"""Astra DB - based chat message history, based on astrapy."""
from __future__ import annotations
import json
import time
from operator import itemgetter
from typing import TYPE_CHECKING, Sequence
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
)
from typing_extensions import override
from langchain_astradb.utils.astradb import (
COMPONENT_NAME_CHATMESSAGEHISTORY,
SetupMode,
_AstraDBCollectionEnvironment,
)
if TYPE_CHECKING:
from astrapy.authentication import TokenProvider
from astrapy.db import AstraDB, AsyncAstraDB
DEFAULT_COLLECTION_NAME = "langchain_message_store"
[docs]
class AstraDBChatMessageHistory(BaseChatMessageHistory):
[docs]
def __init__(
self,
*,
session_id: str,
collection_name: str = DEFAULT_COLLECTION_NAME,
token: str | TokenProvider | None = None,
api_endpoint: str | None = None,
namespace: str | None = None,
environment: str | None = None,
setup_mode: SetupMode = SetupMode.SYNC,
pre_delete_collection: bool = False,
ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
astra_db_client: AstraDB | None = None,
async_astra_db_client: AsyncAstraDB | None = None,
) -> None:
"""Chat message history that stores history in Astra DB.
Args:
session_id: arbitrary key that is used to store the messages
of a single chat session.
collection_name: name of the Astra DB collection to create/use.
token: API token for Astra DB usage, either in the form of a string
or a subclass of `astrapy.authentication.TokenProvider`.
If not provided, the environment variable
ASTRA_DB_APPLICATION_TOKEN is inspected.
api_endpoint: full URL to the API endpoint, such as
`https://<DB-ID>-us-east1.apps.astra.datastax.com`. If not provided,
the environment variable ASTRA_DB_API_ENDPOINT is inspected.
namespace: namespace (aka keyspace) where the collection is created.
If not provided, the environment variable ASTRA_DB_KEYSPACE is
inspected. Defaults to the database's "default namespace".
environment: a string specifying the environment of the target Data API.
If omitted, defaults to "prod" (Astra DB production).
Other values are in `astrapy.constants.Environment` enum class.
setup_mode: mode used to create the Astra DB collection (SYNC, ASYNC or
OFF).
pre_delete_collection: whether to delete the collection
before creating it. If False and the collection already exists,
the collection will be used as is.
ext_callers: one or more caller identities to identify Data API calls
in the User-Agent header. This is a list of (name, version) pairs,
or just strings if no version info is provided, which, if supplied,
becomes the leading part of the User-Agent string in all API requests
related to this component.
astra_db_client:
*DEPRECATED starting from version 0.3.5.*
*Please use 'token', 'api_endpoint' and optionally 'environment'.*
you can pass an already-created 'astrapy.db.AstraDB' instance
(alternatively to 'token', 'api_endpoint' and 'environment').
async_astra_db_client:
*DEPRECATED starting from version 0.3.5.*
*Please use 'token', 'api_endpoint' and optionally 'environment'.*
you can pass an already-created 'astrapy.db.AsyncAstraDB' instance
(alternatively to 'token', 'api_endpoint' and 'environment').
"""
self.astra_env = _AstraDBCollectionEnvironment(
collection_name=collection_name,
token=token,
api_endpoint=api_endpoint,
keyspace=namespace,
environment=environment,
setup_mode=setup_mode,
pre_delete_collection=pre_delete_collection,
ext_callers=ext_callers,
component_name=COMPONENT_NAME_CHATMESSAGEHISTORY,
astra_db_client=astra_db_client,
async_astra_db_client=async_astra_db_client,
)
self.collection = self.astra_env.collection
self.async_collection = self.astra_env.async_collection
self.session_id = session_id
self.collection_name = collection_name
@property
def messages(self) -> list[BaseMessage]:
"""Retrieve all session messages from DB."""
self.astra_env.ensure_db_setup()
message_blobs = [
doc["body_blob"]
for doc in sorted(
self.collection.find(
filter={
"session_id": self.session_id,
},
projection={
"timestamp": True,
"body_blob": True,
},
),
key=itemgetter("timestamp"),
)
]
items = [json.loads(message_blob) for message_blob in message_blobs]
return messages_from_dict(items)
@messages.setter
def messages(self, _: list[BaseMessage]) -> None:
msg = "Use add_messages instead"
raise NotImplementedError(msg)
[docs]
@override
async def aget_messages(self) -> list[BaseMessage]:
await self.astra_env.aensure_db_setup()
docs = self.async_collection.find(
filter={
"session_id": self.session_id,
},
projection={
"timestamp": True,
"body_blob": True,
},
)
sorted_docs = sorted(
[doc async for doc in docs],
key=itemgetter("timestamp"),
)
message_blobs = [doc["body_blob"] for doc in sorted_docs]
items = [json.loads(message_blob) for message_blob in message_blobs]
return messages_from_dict(items)
[docs]
@override
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
self.astra_env.ensure_db_setup()
docs = [
{
"timestamp": time.time(),
"session_id": self.session_id,
"body_blob": json.dumps(message_to_dict(message)),
}
for message in messages
]
self.collection.insert_many(docs)
[docs]
@override
async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
await self.astra_env.aensure_db_setup()
docs = [
{
"timestamp": time.time(),
"session_id": self.session_id,
"body_blob": json.dumps(message_to_dict(message)),
}
for message in messages
]
await self.async_collection.insert_many(docs)
[docs]
@override
def clear(self) -> None:
self.astra_env.ensure_db_setup()
self.collection.delete_many(filter={"session_id": self.session_id})
[docs]
@override
async def aclear(self) -> None:
await self.astra_env.aensure_db_setup()
await self.async_collection.delete_many(filter={"session_id": self.session_id})