class CosmosDBChatMessageHistory(BaseChatMessageHistory):
    """Chat message history backed by Azure CosmosDB.

    Each session is stored as a single Cosmos item whose ``id`` is the session
    ID, whose partition key value is the user ID, and whose ``"messages"`` field
    holds the serialized message list.
    """

    # Single source for the error raised by every lazy ``azure.cosmos`` import
    # in this class (the sentences are separated by an explicit space).
    _MISSING_DEPENDENCY_MSG = (
        "You must install the azure-cosmos package to use the "
        "CosmosDBChatMessageHistory. "
        "Please install it with `pip install azure-cosmos`."
    )

    def __init__(
        self,
        cosmos_endpoint: str,
        cosmos_database: str,
        cosmos_container: str,
        session_id: str,
        user_id: str,
        credential: Any = None,
        connection_string: Optional[str] = None,
        ttl: Optional[int] = None,
        cosmos_client_kwargs: Optional[dict] = None,
    ):
        """Initializes a new instance of the CosmosDBChatMessageHistory class.

        Make sure to call prepare_cosmos or use the context manager to make
        sure your database is ready.

        Either a credential or a connection string must be provided. When both
        are given, the credential takes precedence.

        :param cosmos_endpoint: The connection endpoint for the Azure Cosmos DB
            account.
        :param cosmos_database: The name of the database to use.
        :param cosmos_container: The name of the container to use.
        :param session_id: The session ID to use, can be overwritten while
            loading.
        :param user_id: The user ID to use, can be overwritten while loading.
        :param credential: The credential to use to authenticate to Azure
            Cosmos DB.
        :param connection_string: The connection string to use to authenticate.
        :param ttl: The time to live (in seconds) to use for documents in the
            container.
        :param cosmos_client_kwargs: Additional kwargs to pass to the
            CosmosClient.
        :raises ImportError: If the ``azure-cosmos`` package is not installed.
        :raises ValueError: If neither a credential nor a connection string is
            set.
        """
        self.cosmos_endpoint = cosmos_endpoint
        self.cosmos_database = cosmos_database
        self.cosmos_container = cosmos_container
        self.credential = credential
        self.conn_string = connection_string
        self.session_id = session_id
        self.user_id = user_id
        self.ttl = ttl
        self.messages: List[BaseMessage] = []

        try:
            from azure.cosmos import (  # pylint: disable=import-outside-toplevel
                CosmosClient,
            )
        except ImportError as exc:
            raise ImportError(self._MISSING_DEPENDENCY_MSG) from exc

        # Normalise once so both construction paths share the same extras.
        client_kwargs = cosmos_client_kwargs or {}
        if self.credential:
            self._client = CosmosClient(
                url=self.cosmos_endpoint,
                credential=self.credential,
                user_agent=USER_AGENT,
                **client_kwargs,
            )
        elif self.conn_string:
            self._client = CosmosClient.from_connection_string(
                conn_str=self.conn_string,
                user_agent=USER_AGENT,
                **client_kwargs,
            )
        else:
            raise ValueError("Either a connection string or a credential must be set.")

        # Populated by prepare_cosmos(); every storage operation checks it.
        self._container: Optional[ContainerProxy] = None

    def prepare_cosmos(self) -> None:
        """Prepare the CosmosDB client.

        Creates the database and the container (partitioned on ``/user_id``,
        with ``ttl`` as the default TTL) if they do not exist, then loads any
        messages already stored for the current session.

        Use this function or the context manager to make sure your database is
        ready.

        :raises ImportError: If the ``azure-cosmos`` package is not installed.
        """
        try:
            from azure.cosmos import (  # pylint: disable=import-outside-toplevel
                PartitionKey,
            )
        except ImportError as exc:
            raise ImportError(self._MISSING_DEPENDENCY_MSG) from exc

        database = self._client.create_database_if_not_exists(self.cosmos_database)
        self._container = database.create_container_if_not_exists(
            self.cosmos_container,
            partition_key=PartitionKey("/user_id"),
            default_ttl=self.ttl,
        )
        self.load_messages()

    def load_messages(self) -> None:
        """Retrieve the messages from Cosmos.

        Reads the item identified by ``session_id`` / ``user_id`` and, when it
        holds a non-empty ``"messages"`` list, replaces ``self.messages`` with
        the deserialized content. If the item cannot be read, the in-memory
        messages are left untouched and no exception is raised.

        :raises ValueError: If the container has not been initialized.
        :raises ImportError: If the ``azure-cosmos`` package is not installed.
        """
        if not self._container:
            raise ValueError("Container not initialized")

        try:
            from azure.cosmos.exceptions import (  # pylint: disable=import-outside-toplevel
                CosmosHttpResponseError,
                CosmosResourceNotFoundError,
            )
        except ImportError as exc:
            raise ImportError(self._MISSING_DEPENDENCY_MSG) from exc

        try:
            item = self._container.read_item(
                item=self.session_id, partition_key=self.user_id
            )
        except CosmosResourceNotFoundError:
            # Expected for a brand-new session: nothing has been stored yet.
            logger.info("no session found")
            return
        except CosmosHttpResponseError as exc:
            # Still best-effort (no raise), but do not disguise a real failure
            # (auth, throttling, ...) as a missing session.
            logger.warning("could not load session from Cosmos DB: %s", exc)
            return

        stored = item.get("messages")
        if stored:
            self.messages = messages_from_dict(stored)

    def add_message(self, message: BaseMessage) -> None:
        """Add a self-created message to the store.

        The message is appended to the in-memory list and the whole session is
        then written back to Cosmos.

        :param message: The message to append.
        :raises ValueError: If the container has not been initialized.
        """
        self.messages.append(message)
        self.upsert_messages()

    def upsert_messages(self) -> None:
        """Update the cosmosdb item.

        Writes the full in-memory message list as one item keyed by
        ``session_id`` with ``user_id`` as an item field.

        :raises ValueError: If the container has not been initialized.
        """
        if not self._container:
            raise ValueError("Container not initialized")

        self._container.upsert_item(
            body={
                "id": self.session_id,
                "user_id": self.user_id,
                "messages": messages_to_dict(self.messages),
            }
        )

    def clear(self) -> None:
        """Clear session memory from this memory and cosmos.

        The in-memory list is always emptied. The stored item is deleted only
        when the container has been initialized; a session that was never
        persisted is treated as already cleared.

        :raises ImportError: If the ``azure-cosmos`` package is not installed.
        """
        self.messages = []
        if not self._container:
            return

        try:
            from azure.cosmos.exceptions import (  # pylint: disable=import-outside-toplevel
                CosmosResourceNotFoundError,
            )
        except ImportError as exc:
            raise ImportError(self._MISSING_DEPENDENCY_MSG) from exc

        try:
            self._container.delete_item(
                item=self.session_id, partition_key=self.user_id
            )
        except CosmosResourceNotFoundError:
            # Nothing was stored for this session, so there is nothing to
            # delete; clearing must not fail in that case.
            logger.info("no session found to delete")