KafkaChatMessageHistory

class langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)

Chat message history stored in Kafka.

Setup:

Install confluent-kafka-python.

pip install confluent_kafka
Instantiate:
from langchain_community.chat_message_histories import KafkaChatMessageHistory

history = KafkaChatMessageHistory(
    session_id="your_session_id",
    bootstrap_servers="host:port",
)
Add and retrieve messages:
# Add messages
history.add_messages([message1, message2, message3, ...])

# Retrieve messages
message_batch_0 = history.messages

# Retrieve messages that follow message_batch_0
message_batch_1 = history.messages

# Reset to beginning and retrieve messages
messages_from_beginning = history.messages_from_beginning()

Retrieving messages is stateful. Internally, it uses a Kafka consumer to read, and the consumed offset is persisted between calls.

To retrieve messages, you can use the following methods:

  • messages:

    continue consuming chat messages from the last consumed one.

  • messages_from_beginning:

    reset the consumer to the beginning of the chat history and return messages. Optional parameters:

    1. max_message_count: maximum number of messages to return.
    2. max_time_sec: maximum time in seconds to wait for messages.

  • messages_from_latest:

    reset to the end of the chat history and try consuming messages. Optional parameters same as above.

  • messages_from_last_consumed:

    continue from the last consumed message, similar to messages. Optional parameters same as above.

max_message_count and max_time_sec are used to avoid blocking indefinitely when retrieving messages. As a result, a retrieval method may not return all messages. Increase max_message_count and max_time_sec to retrieve the full history.
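Because the consumed offset is persisted, the full history can also be collected in bounded batches rather than with one large call. The following is a minimal sketch, not part of the library: read_all_messages is a hypothetical helper, and it assumes that an empty batch means the history is exhausted. A slow broker could also return an empty batch, so max_time_sec may need tuning.

```python
from typing import Any, List


def read_all_messages(
    history: Any,
    batch_size: int = 100,
    max_time_sec: float = 5.0,
) -> List[Any]:
    """Collect the whole chat history in bounded batches.

    `history` is expected to be a KafkaChatMessageHistory, or any object
    exposing the same two retrieval methods. This helper is hypothetical
    and not part of langchain_community.
    """
    # Reset the consumer to the start of the topic and read the first batch.
    collected = list(
        history.messages_from_beginning(
            max_message_count=batch_size, max_time_sec=max_time_sec
        )
    )
    while True:
        # Continue from the offset persisted by the previous call.
        batch = history.messages_from_last_consumed(
            max_message_count=batch_size, max_time_sec=max_time_sec
        )
        if not batch:
            # Assumption: an empty batch means there is nothing left to read.
            break
        collected.extend(batch)
    return collected
```

Each call stays bounded by batch_size and max_time_sec, so no single call blocks for long.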

Parameters:

Attributes

messages

Retrieve the messages for the session from the Kafka topic, continuing from the last consumed message.

Methods

__init__(session_id, bootstrap_servers[, ...])

session_id: The ID for a single chat session. It is used as the Kafka topic name.

aadd_messages(messages)

Async add a list of messages.

aclear()

Async remove all messages from the store.

add_ai_message(message)

Convenience method for adding an AI message string to the store.

add_message(message)

Add a Message object to the store.

add_messages(messages[, flush_timeout_seconds])

Add messages to the chat history by producing to the Kafka topic.

add_user_message(message)

Convenience method for adding a human message string to the store.

aget_messages()

Async version of getting messages.

clear()

Clear the chat history by deleting the Kafka topic.

close()

Release the resources.

messages_from_beginning([max_message_count, ...])

Retrieve messages from Kafka topic from the beginning.

messages_from_last_consumed([...])

Retrieve messages from Kafka topic from the last consumed message.

messages_from_latest([max_message_count, ...])

Reset to the end offset.

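__init__(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)

Parameters:
  • session_id (str) – The ID for a single chat session. It is used as the Kafka topic name.

  • bootstrap_servers (str) –

  • ttl_ms (int) –

  • replication_factor (int) –

  • partition (int) –

async aadd_messages(messages: Sequence[BaseMessage]) → None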

Async add a list of messages.

Parameters:

messages (Sequence[BaseMessage]) – A sequence of BaseMessage objects to store.

Return type:

None

async aclear() → None

Async remove all messages from the store.

Return type:

None

add_ai_message(message: AIMessage | str) → None

Convenience method for adding an AI message string to the store.

Please note that this is a convenience method. Code should favor the bulk add_messages interface instead to save on round-trips to the underlying persistence layer.

This method may be deprecated in a future release.

Parameters:

message (AIMessage | str) – The AI message to add.

Return type:

None
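The advice to favor add_messages can be followed by buffering messages locally and writing them in one call. This is a minimal sketch under stated assumptions: MessageBatcher is a hypothetical helper, not part of langchain_community, and it only relies on the documented add_messages(messages) method.

```python
from typing import Any, List


class MessageBatcher:
    """Buffer messages and write them with a single add_messages call.

    Hypothetical helper, not part of langchain_community. `history` is any
    object with an add_messages(messages) method, such as
    KafkaChatMessageHistory.
    """

    def __init__(self, history: Any) -> None:
        self._history = history
        self._pending: List[Any] = []

    def add(self, message: Any) -> None:
        # Only buffer locally; nothing is sent to the store yet.
        self._pending.append(message)

    def flush(self) -> int:
        """Send all buffered messages in one call and return how many were sent."""
        if not self._pending:
            return 0
        sent = list(self._pending)
        # One bulk call instead of one call per message.
        self._history.add_messages(sent)
        # Clear the buffer only after the bulk call returned without raising.
        self._pending.clear()
        return len(sent)
```

A caller would invoke add() for each message produced during a turn and flush() once at the end of the turn.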

add_message(message: BaseMessage) → None

Add a Message object to the store.

Parameters:

message (BaseMessage) – A BaseMessage object to store.

Raises:

NotImplementedError – If the sub-class has not implemented an efficient add_messages method.

Return type:

None

add_messages(messages: Sequence[BaseMessage], flush_timeout_seconds: float = 5.0) → None

Add messages to the chat history by producing to the Kafka topic.

Parameters:
  • messages (Sequence[BaseMessage]) – A sequence of BaseMessage objects to store.

  • flush_timeout_seconds (float) – Maximum time in seconds to wait for the produced messages to be flushed.

Return type:

None

add_user_message(message: HumanMessage | str) → None

Convenience method for adding a human message string to the store.

Please note that this is a convenience method. Code should favor the bulk add_messages interface instead to save on round-trips to the underlying persistence layer.

This method may be deprecated in a future release.

Parameters:

message (HumanMessage | str) – The human message to add to the store.

Return type:

None

async aget_messages() → List[BaseMessage]

Async version of getting messages.

Subclasses can override this method to provide an efficient async implementation.

In general, fetching messages may involve IO to the underlying persistence layer.

Return type:

List[BaseMessage]
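Since fetching may involve IO, several sessions can be read concurrently from async code. The sketch below is illustrative only: fetch_sessions is a hypothetical helper, and it relies solely on each history object exposing the aget_messages() coroutine method documented above.

```python
import asyncio
from typing import Any, Dict, List


async def fetch_sessions(histories: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Fetch messages for several sessions concurrently.

    `histories` maps a session ID to a history object exposing the
    aget_messages() coroutine method. Hypothetical helper, not part of
    langchain_community.
    """
    session_ids = list(histories)
    # gather() preserves the order of its arguments in the result list.
    results = await asyncio.gather(
        *(histories[sid].aget_messages() for sid in session_ids)
    )
    return dict(zip(session_ids, results))
```

Retrieval is stateful, so each call advances the consumed offset of its session just as reading messages does.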

clear() → None

Clear the chat history by deleting the Kafka topic.

Return type:

None

close() → None

Release the resources. Currently there is nothing to release.

Return type:

None
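Although close() currently releases nothing, calling it consistently keeps code correct if that changes. One way to do so is contextlib.closing from the standard library. This is a minimal sketch: read_and_close and its make_history argument are hypothetical names used for illustration.

```python
from contextlib import closing
from typing import Any, Callable, List


def read_and_close(make_history: Callable[[], Any]) -> List[Any]:
    """Create a history, read pending messages, and always call close().

    `make_history` is a hypothetical zero-argument factory that returns a
    history object with a `messages` attribute and a close() method.
    """
    # closing() calls history.close() when the block exits, even on error.
    with closing(make_history()) as history:
        return list(history.messages)
```

A factory for the class documented here could be lambda: KafkaChatMessageHistory(session_id="your_session_id", bootstrap_servers="host:port").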

messages_from_beginning(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) → List[BaseMessage]

Retrieve messages from Kafka topic from the beginning. This method resets the consumer to the beginning and consumes messages.

Parameters:
  • max_message_count (int | None) – Maximum number of messages to consume.

  • max_time_sec (float | None) – Time limit in seconds to consume messages.

Returns:

List of messages.

Return type:

List[BaseMessage]
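Both limits accept None in the signature. The sketch below assumes that None disables the corresponding limit, which the signature suggests but this page does not state. Under that assumption, passing None for both could block indefinitely, so the hypothetical wrapper read_from_beginning rejects that combination.

```python
from typing import Any, List, Optional


def read_from_beginning(
    history: Any,
    max_message_count: Optional[int] = None,
    max_time_sec: Optional[float] = 10.0,
) -> List[Any]:
    """Read from the start of the history with at least one limit set.

    Hypothetical wrapper, not part of langchain_community. Assumption:
    passing None for a limit disables that limit.
    """
    if max_message_count is None and max_time_sec is None:
        # Without any limit the read might never return.
        raise ValueError("set max_message_count or max_time_sec")
    return list(
        history.messages_from_beginning(
            max_message_count=max_message_count, max_time_sec=max_time_sec
        )
    )
```

With the defaults shown, the read is bounded only by time, so a long history is not cut off at five messages.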

messages_from_last_consumed(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) → List[BaseMessage]

Retrieve messages from Kafka topic from the last consumed message. Please note this method is stateful. Internally, it uses a Kafka consumer to consume messages and maintains the committed offset.

Parameters:
  • max_message_count (int | None) – Maximum number of messages to consume.

  • max_time_sec (float | None) – Time limit in seconds to consume messages.

Returns:

List of messages.

Return type:

List[BaseMessage]

messages_from_latest(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) → List[BaseMessage]

Reset to the end offset. Try to consume messages if available.

Parameters:
  • max_message_count (int | None) – Maximum number of messages to consume.

  • max_time_sec (float | None) – Time limit in seconds to consume messages.

Returns:

List of messages.

Return type:

List[BaseMessage]
