KafkaChatMessageHistory
- class langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)
Chat message history stored in Kafka.
- Setup:
Install confluent-kafka-python:
    pip install confluent_kafka
- Instantiate:
    from langchain_community.chat_message_histories import KafkaChatMessageHistory

    history = KafkaChatMessageHistory(
        session_id="your_session_id",
        bootstrap_servers="host:port",
    )
- Add and retrieve messages:
    # Add messages
    history.add_messages([message1, message2, message3, ...])

    # Retrieve messages
    message_batch_0 = history.messages

    # Retrieve messages after message_batch_0
    message_batch_1 = history.messages

    # Reset to beginning and retrieve messages
    messages_from_beginning = history.messages_from_beginning()
Retrieving messages is stateful. Internally, a Kafka consumer reads the topic, and the consumed offset is maintained persistently, so each retrieval continues where the previous one stopped.
To retrieve messages, you can use the following methods:
- messages: continue consuming chat messages from the last consumed one.
- messages_from_beginning: reset the consumer to the beginning of the chat history and return messages. Optional parameters:
  1. max_message_count: maximum number of messages to return.
  2. max_time_sec: maximum time in seconds to wait for messages.
- messages_from_latest: reset to the end of the chat history and try consuming messages. Optional parameters same as above.
- messages_from_last_consumed: continue from the last consumed message, similar to messages. Optional parameters same as above.
max_message_count and max_time_sec are used to avoid blocking indefinitely when retrieving messages. As a result, a single retrieval call may not return all messages. Increase max_message_count and max_time_sec to retrieve the full history.
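The stateful behaviour described above can be pictured with a toy in-memory model. This is only a sketch: OffsetCursorLog is a hypothetical stand-in, not part of langchain_community, and it uses plain strings instead of BaseMessage objects. It leaves out max_time_sec because in-memory reads never block.

```python
from typing import List, Optional


class OffsetCursorLog:
    """Toy stand-in for one topic plus one consumer offset (hypothetical)."""

    def __init__(self) -> None:
        self._log: List[str] = []  # append-only "topic"
        self._offset = 0           # index of the next unread entry

    def add_messages(self, items: List[str]) -> None:
        self._log.extend(items)

    def _consume(self, max_message_count: Optional[int]) -> List[str]:
        end = len(self._log)
        if max_message_count is not None:
            end = min(end, self._offset + max_message_count)
        batch = self._log[self._offset:end]
        self._offset = end  # remember where this read stopped
        return batch

    def messages_from_last_consumed(self, max_message_count: Optional[int] = 5) -> List[str]:
        return self._consume(max_message_count)

    def messages_from_beginning(self, max_message_count: Optional[int] = 5) -> List[str]:
        self._offset = 0  # rewind to the start of the log
        return self._consume(max_message_count)

    def messages_from_latest(self, max_message_count: Optional[int] = 5) -> List[str]:
        self._offset = len(self._log)  # skip everything already stored
        return self._consume(max_message_count)


log = OffsetCursorLog()
log.add_messages(["m1", "m2", "m3"])
first = log.messages_from_last_consumed()   # everything stored so far
second = log.messages_from_last_consumed()  # nothing new since the last read
log.add_messages(["m4"])
third = log.messages_from_last_consumed()   # only the new message
replay = log.messages_from_beginning(max_message_count=2)  # capped replay
rest = log.messages_from_last_consumed()    # continues after the capped replay
tail = log.messages_from_latest()           # nothing after the end offset
```

The capped replay shows why a single call may return only part of the history: the count limit stops the read early, and the next call picks up from that point.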
- Parameters:
session_id (str) – The ID for a single chat session. It is used as the Kafka topic name.
bootstrap_servers (str) – Comma-separated host/port pairs used to establish the connection to the Kafka cluster. See https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
ttl_ms (int) – Time-to-live (milliseconds) for automatic expiration of entries. Default 7 days. -1 for no expiration. It translates to https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
replication_factor (int) – The replication factor for the topic. Default 1.
partition (int) – The number of partitions for the topic. Default 3.
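As a rough illustration of these parameters, the sketch below converts a retention period to the milliseconds that ttl_ms expects and passes every constructor argument explicitly. The helper names retention_ms and build_history, the broker address "localhost:9092", and the one-day retention are assumptions made for the example.

```python
from datetime import timedelta


def retention_ms(period: timedelta) -> int:
    """Convert a retention period to whole milliseconds for ttl_ms."""
    return period // timedelta(milliseconds=1)


SEVEN_DAYS_MS = retention_ms(timedelta(days=7))  # matches the documented default
NO_EXPIRATION = -1                               # documented value for "never expire"


def build_history(
    session_id: str,
    bootstrap_servers: str = "localhost:9092",  # assumed local broker
    keep_for: timedelta = timedelta(days=1),    # assumed retention period
):
    # Imported lazily so retention_ms works without the Kafka dependencies.
    from langchain_community.chat_message_histories import KafkaChatMessageHistory

    return KafkaChatMessageHistory(
        session_id=session_id,
        bootstrap_servers=bootstrap_servers,
        ttl_ms=retention_ms(keep_for),
        replication_factor=1,
        partition=3,
    )
```

Calling build_history requires confluent_kafka to be installed and a reachable broker; retention_ms can be used on its own.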
Attributes
messages
Retrieve the messages for the session from the Kafka topic, continuing from the last consumed message.
Methods
- __init__(session_id, bootstrap_servers[, ...]): session_id is the ID for a single chat session. It is used as the Kafka topic name.
- aadd_messages(messages): Async add a list of messages.
- aclear(): Async remove all messages from the store.
- add_ai_message(message): Convenience method for adding an AI message string to the store.
- add_message(message): Add a Message object to the store.
- add_messages(messages[, flush_timeout_seconds]): Add messages to the chat history by producing to the Kafka topic.
- add_user_message(message): Convenience method for adding a human message string to the store.
- aget_messages(): Async version of getting messages.
- clear(): Clear the chat history by deleting the Kafka topic.
- close(): Release the resources.
- messages_from_beginning([max_message_count, ...]): Retrieve messages from Kafka topic from the beginning.
- messages_from_last_consumed([...]): Retrieve messages from Kafka topic from the last consumed message.
- messages_from_latest([max_message_count, ...]): Reset to the end offset.
- __init__(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)
- Parameters:
session_id (str) – The ID for a single chat session. It is used as the Kafka topic name.
bootstrap_servers (str) – Comma-separated host/port pairs used to establish the connection to the Kafka cluster. See https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
ttl_ms (int) – Time-to-live (milliseconds) for automatic expiration of entries. Default 7 days. -1 for no expiration. It translates to https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
replication_factor (int) – The replication factor for the topic. Default 1.
partition (int) – The number of partitions for the topic. Default 3.
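Because session_id doubles as the Kafka topic name, it may be worth checking it before constructing the history. The sketch below assumes Kafka's usual topic naming rules (ASCII letters, digits, ".", "_" and "-", at most 249 characters, and not "." or ".."). The function name is hypothetical, and this class is not documented as performing such a check itself.

```python
import re

# Assumed Kafka topic naming rules; verify against your broker version.
_LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]+")
_MAX_TOPIC_LENGTH = 249


def is_valid_session_id(session_id: str) -> bool:
    """Return True if session_id looks usable as a Kafka topic name."""
    if session_id in (".", ".."):
        return False
    if len(session_id) > _MAX_TOPIC_LENGTH:
        return False
    # fullmatch also rejects the empty string and any illegal character.
    return _LEGAL_TOPIC.fullmatch(session_id) is not None
```

A session ID such as "user 42" would fail this check because of the space, so an application might map it to something like "user-42" first.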
- async aadd_messages(messages: Sequence[BaseMessage]) -> None
Async add a list of messages.
- Parameters:
messages (Sequence[BaseMessage]) – A sequence of BaseMessage objects to store.
- Return type:
None
- async aclear() -> None
Async remove all messages from the store.
- Return type:
None
- add_ai_message(message: AIMessage | str) -> None
Convenience method for adding an AI message string to the store.
Please note that this is a convenience method. Code should favor the bulk add_messages interface instead to save on round-trips to the underlying persistence layer.
This method may be deprecated in a future release.
- Parameters:
message (AIMessage | str) – The AI message to add.
- Return type:
None
- add_message(message: BaseMessage) -> None
Add a Message object to the store.
- Parameters:
message (BaseMessage) – A BaseMessage object to store.
- Raises:
NotImplementedError – If the sub-class has not implemented an efficient add_messages method.
- Return type:
None
- add_messages(messages: Sequence[BaseMessage], flush_timeout_seconds: float = 5.0) -> None
Add messages to the chat history by producing to the Kafka topic.
- Parameters:
messages (Sequence[BaseMessage]) – The messages to add.
flush_timeout_seconds (float) – Default 5.0.
- Return type:
None
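The convenience methods recommend the bulk add_messages interface to save round-trips. One way to follow that advice is to buffer messages and hand them over in a single call. The sketch below is an illustration under assumptions: BufferedWriter and RecordingHistory are hypothetical helpers, and plain strings stand in for BaseMessage objects so the example runs without a broker.

```python
from typing import Any, List


class BufferedWriter:
    """Collect messages and pass them to add_messages in batches (hypothetical)."""

    def __init__(self, history: Any, batch_size: int = 10) -> None:
        self._history = history
        self._batch_size = batch_size
        self._pending: List[Any] = []

    def add(self, message: Any) -> None:
        self._pending.append(message)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # One add_messages call per batch instead of one call per message.
        if self._pending:
            self._history.add_messages(self._pending)
            self._pending = []


class RecordingHistory:
    """Stand-in that records each add_messages call instead of using Kafka."""

    def __init__(self) -> None:
        self.calls: List[List[Any]] = []

    def add_messages(self, messages: List[Any]) -> None:
        self.calls.append(list(messages))


recorder = RecordingHistory()
writer = BufferedWriter(recorder, batch_size=2)
for text in ["a", "b", "c"]:
    writer.add(text)
writer.flush()  # push the remaining partial batch
```

With a real KafkaChatMessageHistory in place of RecordingHistory, the same writer would issue two add_messages calls for these three messages rather than three.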
- add_user_message(message: HumanMessage | str) -> None
Convenience method for adding a human message string to the store.
Please note that this is a convenience method. Code should favor the bulk add_messages interface instead to save on round-trips to the underlying persistence layer.
This method may be deprecated in a future release.
- Parameters:
message (HumanMessage | str) – The human message to add to the store.
- Return type:
None
- async aget_messages() -> List[BaseMessage]
Async version of getting messages.
Subclasses can override this method to provide an efficient async implementation.
In general, fetching messages may involve IO to the underlying persistence layer.
- Return type:
List[BaseMessage]
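Since fetching messages may block on IO, one common pattern is to run the blocking read in a worker thread so the event loop stays responsive. The sketch below illustrates that pattern only; it is not a description of how aget_messages is implemented. fetch_messages and FakeBlockingHistory are hypothetical names.

```python
import asyncio
from typing import Any, List


async def fetch_messages(history: Any) -> List[Any]:
    """Read history.messages in a worker thread (requires Python 3.9+)."""
    return await asyncio.to_thread(lambda: history.messages)


class FakeBlockingHistory:
    """Stand-in exposing a messages property, used instead of a Kafka-backed history."""

    @property
    def messages(self) -> List[str]:
        return ["hello", "world"]


result = asyncio.run(fetch_messages(FakeBlockingHistory()))
```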
- close() -> None
Release the resources. Nothing to be released at this moment.
- Return type:
None
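Although close() currently has nothing to release, calling it consistently keeps code correct if that changes. A minimal sketch using the standard library's contextlib.closing, which works with any object that has a close() method; FakeClosableHistory is a hypothetical stand-in so the example runs without Kafka.

```python
from contextlib import closing
from typing import Any, List


class FakeClosableHistory:
    """Stand-in with the add_messages and close methods used below."""

    def __init__(self) -> None:
        self.closed = False
        self.stored: List[Any] = []

    def add_messages(self, messages: List[Any]) -> None:
        self.stored.extend(messages)

    def close(self) -> None:
        self.closed = True


history = FakeClosableHistory()
with closing(history) as h:
    h.add_messages(["hi"])
# close() has been called here, even if the body had raised an exception.
```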
- messages_from_beginning(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) -> List[BaseMessage]
Retrieve messages from the Kafka topic from the beginning. This method resets the consumer to the beginning and consumes messages.
- Parameters:
max_message_count (int | None) – Maximum number of messages to consume.
max_time_sec (float | None) – Time limit in seconds to consume messages.
- Returns:
List of messages.
- Return type:
List[BaseMessage]
- messages_from_last_consumed(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) -> List[BaseMessage]
Retrieve messages from the Kafka topic from the last consumed message. Please note this method is stateful. Internally, it uses a Kafka consumer to consume messages and maintains the committed offset.
- Parameters:
max_message_count (int | None) – Maximum number of messages to consume.
max_time_sec (float | None) – Time limit in seconds to consume messages.
- Returns:
List of messages.
- Return type:
List[BaseMessage]
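Because each call returns at most max_message_count messages, reading a long history takes several calls. The sketch below rewinds once with messages_from_beginning and then keeps calling messages_from_last_consumed until a batch comes back empty. It treats an empty batch as the end of the history, which is an assumption: a slow broker could return an empty batch when max_time_sec expires. read_full_history and FakePagedHistory are hypothetical names.

```python
from typing import Any, List


def read_full_history(
    history: Any,
    batch_size: int = 5,
    max_time_sec: float = 5.0,
    max_batches: int = 1000,  # safety cap so the loop always terminates
) -> List[Any]:
    collected: List[Any] = []
    batch = history.messages_from_beginning(
        max_message_count=batch_size, max_time_sec=max_time_sec
    )
    for _ in range(max_batches):
        if not batch:
            break  # assumed to mean "no more messages"
        collected.extend(batch)
        batch = history.messages_from_last_consumed(
            max_message_count=batch_size, max_time_sec=max_time_sec
        )
    return collected


class FakePagedHistory:
    """In-memory stand-in exposing the two retrieval methods used above."""

    def __init__(self, items: Any) -> None:
        self._items = list(items)
        self._pos = 0

    def _take(self, count: int) -> List[Any]:
        batch = self._items[self._pos:self._pos + count]
        self._pos += len(batch)
        return batch

    def messages_from_beginning(self, max_message_count: int = 5, max_time_sec: float = 5.0) -> List[Any]:
        self._pos = 0
        return self._take(max_message_count)

    def messages_from_last_consumed(self, max_message_count: int = 5, max_time_sec: float = 5.0) -> List[Any]:
        return self._take(max_message_count)


everything = read_full_history(FakePagedHistory(range(12)), batch_size=5)
```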
- messages_from_latest(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) -> List[BaseMessage]
Reset to the end offset. Try to consume messages if available.
- Parameters:
max_message_count (int | None) – Maximum number of messages to consume.
max_time_sec (float | None) – Time limit in seconds to consume messages.
- Returns:
List of messages.
- Return type:
List[BaseMessage]