[docs]classElasticsearchChatMessageHistory(BaseChatMessageHistory):"""Chat message history that stores history in Elasticsearch. Args: es_url: URL of the Elasticsearch instance to connect to. es_cloud_id: Cloud ID of the Elasticsearch instance to connect to. es_user: Username to use when connecting to Elasticsearch. es_password: Password to use when connecting to Elasticsearch. es_api_key: API key to use when connecting to Elasticsearch. es_connection: Optional pre-existing Elasticsearch connection. esnsure_ascii: Used to escape ASCII symbols in json.dumps. Defaults to True. index: Name of the index to use. session_id: Arbitrary key that is used to store the messages of a single chat session. """
[docs]def__init__(self,index:str,session_id:str,*,es_connection:Optional["Elasticsearch"]=None,es_url:Optional[str]=None,es_cloud_id:Optional[str]=None,es_user:Optional[str]=None,es_api_key:Optional[str]=None,es_password:Optional[str]=None,esnsure_ascii:Optional[bool]=True,):self.index:str=indexself.session_id:str=session_idself.ensure_ascii=esnsure_ascii# Initialize Elasticsearch client from passed client arg or connection infoifes_connectionisnotNone:self.client=es_connectionelifes_urlisnotNoneores_cloud_idisnotNone:try:self.client=create_elasticsearch_client(url=es_url,username=es_user,password=es_password,cloud_id=es_cloud_id,api_key=es_api_key,)exceptExceptionaserr:logger.error(f"Error connecting to Elasticsearch: {err}")raiseerrelse:raiseValueError("""Either provide a pre-existing Elasticsearch connection, \ or valid credentials for creating a new connection.""")self.client=with_user_agent_header(self.client,"langchain-py-ms")ifself.client.indices.exists(index=index):logger.debug(f"Chat history index {index} already exists, skipping creation.")else:logger.debug(f"Creating index {index} for storing chat history.")self.client.indices.create(index=index,mappings={"properties":{"session_id":{"type":"keyword"},"created_at":{"type":"date"},"history":{"type":"text"},}},)
@propertydefmessages(self)->List[BaseMessage]:# type: ignore[override]"""Retrieve the messages from Elasticsearch"""try:fromelasticsearchimportApiErrorresult=self.client.search(index=self.index,query={"term":{"session_id":self.session_id}},sort="created_at:asc",)exceptApiErroraserr:logger.error(f"Could not retrieve messages from Elasticsearch: {err}")raiseerrifresultandlen(result["hits"]["hits"])>0:items=[json.loads(document["_source"]["history"])fordocumentinresult["hits"]["hits"]]else:items=[]returnmessages_from_dict(items)
[docs]defadd_message(self,message:BaseMessage)->None:"""Add a message to the chat session in Elasticsearch"""try:fromelasticsearchimportApiErrorself.client.index(index=self.index,document={"session_id":self.session_id,"created_at":round(time()*1000),"history":json.dumps(message_to_dict(message),ensure_ascii=bool(self.ensure_ascii),),},refresh=True,)exceptApiErroraserr:logger.error(f"Could not add message to Elasticsearch: {err}")raiseerr
[docs]defclear(self)->None:"""Clear session memory in Elasticsearch"""try:fromelasticsearchimportApiErrorself.client.delete_by_query(index=self.index,query={"term":{"session_id":self.session_id}},refresh=True,)exceptApiErroraserr:logger.error(f"Could not clear session memory in Elasticsearch: {err}")raiseerr