[docs]@deprecated("0.0.27",alternative="Use langchain-elasticsearch package",pending=True)classElasticsearchChatMessageHistory(BaseChatMessageHistory):"""Chat message history that stores history in Elasticsearch. Args: es_url: URL of the Elasticsearch instance to connect to. es_cloud_id: Cloud ID of the Elasticsearch instance to connect to. es_user: Username to use when connecting to Elasticsearch. es_password: Password to use when connecting to Elasticsearch. es_api_key: API key to use when connecting to Elasticsearch. es_connection: Optional pre-existing Elasticsearch connection. ensure_ascii: Used to escape ASCII symbols in json.dumps. Defaults to True. index: Name of the index to use. session_id: Arbitrary key that is used to store the messages of a single chat session. """
[docs]def__init__(self,index:str,session_id:str,*,es_connection:Optional["Elasticsearch"]=None,es_url:Optional[str]=None,es_cloud_id:Optional[str]=None,es_user:Optional[str]=None,es_api_key:Optional[str]=None,es_password:Optional[str]=None,ensure_ascii:Optional[bool]=True,):self.index:str=indexself.session_id:str=session_idself.ensure_ascii=ensure_ascii# Initialize Elasticsearch client from passed client arg or connection infoifes_connectionisnotNone:self.client=es_connection.options(headers={"user-agent":self.get_user_agent()})elifes_urlisnotNoneores_cloud_idisnotNone:self.client=ElasticsearchChatMessageHistory.connect_to_elasticsearch(es_url=es_url,username=es_user,password=es_password,cloud_id=es_cloud_id,api_key=es_api_key,)else:raiseValueError("""Either provide a pre-existing Elasticsearch connection, \ or valid credentials for creating a new connection.""")ifself.client.indices.exists(index=index):logger.debug(f"Chat history index {index} already exists, skipping creation.")else:logger.debug(f"Creating index {index} for storing chat history.")self.client.indices.create(index=index,mappings={"properties":{"session_id":{"type":"keyword"},"created_at":{"type":"date"},"history":{"type":"text"},}},)
[docs]@staticmethoddefconnect_to_elasticsearch(*,es_url:Optional[str]=None,cloud_id:Optional[str]=None,api_key:Optional[str]=None,username:Optional[str]=None,password:Optional[str]=None,)->"Elasticsearch":try:importelasticsearchexceptImportError:raiseImportError("Could not import elasticsearch python package. ""Please install it with `pip install elasticsearch`.")ifes_urlandcloud_id:raiseValueError("Both es_url and cloud_id are defined. Please provide only one.")connection_params:Dict[str,Any]={}ifes_url:connection_params["hosts"]=[es_url]elifcloud_id:connection_params["cloud_id"]=cloud_idelse:raiseValueError("Please provide either elasticsearch_url or cloud_id.")ifapi_key:connection_params["api_key"]=api_keyelifusernameandpassword:connection_params["basic_auth"]=(username,password)es_client=elasticsearch.Elasticsearch(**connection_params,headers={"user-agent":ElasticsearchChatMessageHistory.get_user_agent()},)try:es_client.info()exceptExceptionaserr:logger.error(f"Error connecting to Elasticsearch: {err}")raiseerrreturnes_client
@propertydefmessages(self)->List[BaseMessage]:"""Retrieve the messages from Elasticsearch"""try:fromelasticsearchimportApiErrorresult=self.client.search(index=self.index,query={"term":{"session_id":self.session_id}},sort="created_at:asc",)exceptApiErroraserr:logger.error(f"Could not retrieve messages from Elasticsearch: {err}")raiseerrifresultandlen(result["hits"]["hits"])>0:items=[json.loads(document["_source"]["history"])fordocumentinresult["hits"]["hits"]]else:items=[]returnmessages_from_dict(items)@messages.setterdefmessages(self,messages:List[BaseMessage])->None:raiseNotImplementedError("Direct assignment to 'messages' is not allowed."" Use the 'add_messages' instead.")
[docs]defadd_message(self,message:BaseMessage)->None:"""Add a message to the chat session in Elasticsearch"""try:fromelasticsearchimportApiErrorself.client.index(index=self.index,document={"session_id":self.session_id,"created_at":round(time()*1000),"history":json.dumps(message_to_dict(message),ensure_ascii=bool(self.ensure_ascii),),},refresh=True,)exceptApiErroraserr:logger.error(f"Could not add message to Elasticsearch: {err}")raiseerr
[docs]defclear(self)->None:"""Clear session memory in Elasticsearch"""try:fromelasticsearchimportApiErrorself.client.delete_by_query(index=self.index,query={"term":{"session_id":self.session_id}},refresh=True,)exceptApiErroraserr:logger.error(f"Could not clear session memory in Elasticsearch: {err}")raiseerr