"""Deprecated as of LangChain v0.3.4 and will be removed in LangChain v1.0.0."""importloggingfromabcimportABC,abstractmethodfromcollections.abcimportIterablefromitertoolsimportislicefromtypingimportTYPE_CHECKING,Any,Optionalfromlangchain_core._apiimportdeprecatedfromlangchain_core.language_modelsimportBaseLanguageModelfromlangchain_core.messagesimportBaseMessage,get_buffer_stringfromlangchain_core.promptsimportBasePromptTemplatefrompydanticimportBaseModel,ConfigDict,Fieldfromlangchain.chains.llmimportLLMChainfromlangchain.memory.chat_memoryimportBaseChatMemoryfromlangchain.memory.promptimport(ENTITY_EXTRACTION_PROMPT,ENTITY_SUMMARIZATION_PROMPT,)fromlangchain.memory.utilsimportget_prompt_input_keyifTYPE_CHECKING:importsqlite3logger=logging.getLogger(__name__)
[docs]@deprecated(since="0.3.1",removal="1.0.0",message=("Please see the migration guide at: ""https://python.langchain.com/docs/versions/migrating_memory/"),)classBaseEntityStore(BaseModel,ABC):"""Abstract base class for Entity store."""
[docs]@abstractmethoddefget(self,key:str,default:Optional[str]=None)->Optional[str]:"""Get entity value from store."""
[docs]@abstractmethoddefset(self,key:str,value:Optional[str])->None:"""Set entity value in store."""
[docs]@abstractmethoddefdelete(self,key:str)->None:"""Delete entity value from store."""
[docs]@abstractmethoddefexists(self,key:str)->bool:"""Check if entity exists in store."""
[docs]@abstractmethoddefclear(self)->None:"""Delete all entities from store."""
[docs]@deprecated(since="0.3.1",removal="1.0.0",message=("Please see the migration guide at: ""https://python.langchain.com/docs/versions/migrating_memory/"),)classInMemoryEntityStore(BaseEntityStore):"""In-memory Entity store."""store:dict[str,Optional[str]]={}
[docs]@deprecated(since="0.3.1",removal="1.0.0",message=("Please see the migration guide at: ""https://python.langchain.com/docs/versions/migrating_memory/"),)classUpstashRedisEntityStore(BaseEntityStore):"""Upstash Redis backed Entity store. Entities get a TTL of 1 day by default, and that TTL is extended by 3 days every time the entity is read back. """def__init__(self,session_id:str="default",url:str="",token:str="",key_prefix:str="memory_store",ttl:Optional[int]=60*60*24,recall_ttl:Optional[int]=60*60*24*3,*args:Any,**kwargs:Any,):try:fromupstash_redisimportRedisexceptImportErrorase:msg=("Could not import upstash_redis python package. ""Please install it with `pip install upstash_redis`.")raiseImportError(msg)fromesuper().__init__(*args,**kwargs)try:self.redis_client=Redis(url=url,token=token)exceptExceptionasexc:error_msg="Upstash Redis instance could not be initiated"logger.exception(error_msg)raiseRuntimeError(error_msg)fromexcself.session_id=session_idself.key_prefix=key_prefixself.ttl=ttlself.recall_ttl=recall_ttlorttl@propertydeffull_key_prefix(self)->str:returnf"{self.key_prefix}:{self.session_id}"
[docs]defget(self,key:str,default:Optional[str]=None)->Optional[str]:res=(self.redis_client.getex(f"{self.full_key_prefix}:{key}",ex=self.recall_ttl)ordefaultor"")logger.debug("Upstash Redis MEM get '%s:%s': '%s'",self.full_key_prefix,key,res)returnres
[docs]defset(self,key:str,value:Optional[str])->None:ifnotvalue:returnself.delete(key)self.redis_client.set(f"{self.full_key_prefix}:{key}",value,ex=self.ttl)logger.debug("Redis MEM set '%s:%s': '%s' EX %s",self.full_key_prefix,key,value,self.ttl,)returnNone
[docs]@deprecated(since="0.3.1",removal="1.0.0",message=("Please see the migration guide at: ""https://python.langchain.com/docs/versions/migrating_memory/"),)classRedisEntityStore(BaseEntityStore):"""Redis-backed Entity store. Entities get a TTL of 1 day by default, and that TTL is extended by 3 days every time the entity is read back. """redis_client:Anysession_id:str="default"key_prefix:str="memory_store"ttl:Optional[int]=60*60*24recall_ttl:Optional[int]=60*60*24*3def__init__(self,session_id:str="default",url:str="redis://localhost:6379/0",key_prefix:str="memory_store",ttl:Optional[int]=60*60*24,recall_ttl:Optional[int]=60*60*24*3,*args:Any,**kwargs:Any,):try:importredisexceptImportErrorase:msg=("Could not import redis python package. ""Please install it with `pip install redis`.")raiseImportError(msg)fromesuper().__init__(*args,**kwargs)try:fromlangchain_community.utilities.redisimportget_clientexceptImportErrorase:msg=("Could not import langchain_community.utilities.redis.get_client. ""Please install it with `pip install langchain-community`.")raiseImportError(msg)frometry:self.redis_client=get_client(redis_url=url,decode_responses=True)exceptredis.exceptions.ConnectionError:logger.exception("Redis client could not connect")self.session_id=session_idself.key_prefix=key_prefixself.ttl=ttlself.recall_ttl=recall_ttlorttl@propertydeffull_key_prefix(self)->str:returnf"{self.key_prefix}:{self.session_id}"
[docs]defget(self,key:str,default:Optional[str]=None)->Optional[str]:res=(self.redis_client.getex(f"{self.full_key_prefix}:{key}",ex=self.recall_ttl)ordefaultor"")logger.debug("REDIS MEM get '%s:%s': '%s'",self.full_key_prefix,key,res)returnres
[docs]defset(self,key:str,value:Optional[str])->None:ifnotvalue:returnself.delete(key)self.redis_client.set(f"{self.full_key_prefix}:{key}",value,ex=self.ttl)logger.debug("REDIS MEM set '%s:%s': '%s' EX %s",self.full_key_prefix,key,value,self.ttl,)returnNone
[docs]defclear(self)->None:# iterate a list in batches of size batch_sizedefbatched(iterable:Iterable[Any],batch_size:int)->Iterable[Any]:iterator=iter(iterable)whilebatch:=list(islice(iterator,batch_size)):yieldbatchforkeybatchinbatched(self.redis_client.scan_iter(f"{self.full_key_prefix}:*"),500,):self.redis_client.delete(*keybatch)
[docs]@deprecated(since="0.3.1",removal="1.0.0",message=("Please see the migration guide at: ""https://python.langchain.com/docs/versions/migrating_memory/"),)classSQLiteEntityStore(BaseEntityStore):"""SQLite-backed Entity store with safe query construction."""session_id:str="default"table_name:str="memory_store"conn:Any=Nonemodel_config=ConfigDict(arbitrary_types_allowed=True,)def__init__(self,session_id:str="default",db_file:str="entities.db",table_name:str="memory_store",*args:Any,**kwargs:Any,):super().__init__(*args,**kwargs)try:importsqlite3exceptImportErrorase:msg=("Could not import sqlite3 python package. ""Please install it with `pip install sqlite3`.")raiseImportError(msg)frome# Basic validation to prevent obviously malicious table/session namesifnottable_name.isidentifier()ornotsession_id.isidentifier():# Since we validate here, we can safely suppress the S608 bandit warningmsg="Table name and session ID must be valid Python identifiers."raiseValueError(msg)self.conn=sqlite3.connect(db_file)self.session_id=session_idself.table_name=table_nameself._create_table_if_not_exists()@propertydeffull_table_name(self)->str:returnf"{self.table_name}_{self.session_id}"def_execute_query(self,query:str,params:tuple=())->"sqlite3.Cursor":"""Executes a query with proper connection handling."""withself.conn:returnself.conn.execute(query,params)def_create_table_if_not_exists(self)->None:"""Creates the entity table if it doesn't exist, using safe quoting."""# Use standard SQL double quotes for the table name identifiercreate_table_query=f""" CREATE TABLE IF NOT EXISTS "{self.full_table_name}" ( key TEXT PRIMARY KEY, value TEXT ) """self._execute_query(create_table_query)
[docs]defget(self,key:str,default:Optional[str]=None)->Optional[str]:"""Retrieves a value, safely quoting the table name."""# `?` placeholder is used for the value to prevent SQL injection# Ignore S608 since we validate for malicious table/session names in `__init__`query=f'SELECT value FROM "{self.full_table_name}" WHERE key = ?'# noqa: S608cursor=self._execute_query(query,(key,))result=cursor.fetchone()returnresult[0]ifresultisnotNoneelsedefault
[docs]defset(self,key:str,value:Optional[str])->None:"""Inserts or replaces a value, safely quoting the table name."""ifnotvalue:returnself.delete(key)# Ignore S608 since we validate for malicious table/session names in `__init__`query=("INSERT OR REPLACE INTO "# noqa: S608f'"{self.full_table_name}" (key, value) VALUES (?, ?)')self._execute_query(query,(key,value))returnNone
[docs]defdelete(self,key:str)->None:"""Deletes a key-value pair, safely quoting the table name."""# Ignore S608 since we validate for malicious table/session names in `__init__`query=f'DELETE FROM "{self.full_table_name}" WHERE key = ?'# noqa: S608self._execute_query(query,(key,))
[docs]defexists(self,key:str)->bool:"""Checks for the existence of a key, safely quoting the table name."""# Ignore S608 since we validate for malicious table/session names in `__init__`query=f'SELECT 1 FROM "{self.full_table_name}" WHERE key = ? LIMIT 1'# noqa: S608cursor=self._execute_query(query,(key,))returncursor.fetchone()isnotNone
[docs]defclear(self)->None:# Ignore S608 since we validate for malicious table/session names in `__init__`query=f""" DELETE FROM {self.full_table_name} """# noqa: S608withself.conn:self.conn.execute(query)
[docs]@deprecated(since="0.3.1",removal="1.0.0",message=("Please see the migration guide at: ""https://python.langchain.com/docs/versions/migrating_memory/"),)classConversationEntityMemory(BaseChatMemory):"""Entity extractor & summarizer memory. Extracts named entities from the recent chat history and generates summaries. With a swappable entity store, persisting entities across conversations. Defaults to an in-memory entity store, and can be swapped out for a Redis, SQLite, or other entity store. """human_prefix:str="Human"ai_prefix:str="AI"llm:BaseLanguageModelentity_extraction_prompt:BasePromptTemplate=ENTITY_EXTRACTION_PROMPTentity_summarization_prompt:BasePromptTemplate=ENTITY_SUMMARIZATION_PROMPT# Cache of recently detected entity names, if any# It is updated when load_memory_variables is called:entity_cache:list[str]=[]# Number of recent message pairs to consider when updating entities:k:int=3chat_history_key:str="history"# Store to manage entity-related data:entity_store:BaseEntityStore=Field(default_factory=InMemoryEntityStore)@propertydefbuffer(self)->list[BaseMessage]:"""Access chat memory messages."""returnself.chat_memory.messages@propertydefmemory_variables(self)->list[str]:"""Will always return list of memory variables. :meta private: """return["entities",self.chat_history_key]
[docs]defload_memory_variables(self,inputs:dict[str,Any])->dict[str,Any]:""" Returns chat history and all generated entities with summaries if available, and updates or clears the recent entity cache. New entity name can be found when calling this method, before the entity summaries are generated, so the entity cache values may be empty if no entity descriptions are generated yet. """# Create an LLMChain for predicting entity names from the recent chat history:chain=LLMChain(llm=self.llm,prompt=self.entity_extraction_prompt)ifself.input_keyisNone:prompt_input_key=get_prompt_input_key(inputs,self.memory_variables)else:prompt_input_key=self.input_key# Extract an arbitrary window of the last message pairs from# the chat history, where the hyperparameter k is the# number of message pairs:buffer_string=get_buffer_string(self.buffer[-self.k*2:],human_prefix=self.human_prefix,ai_prefix=self.ai_prefix,)# Generates a comma-separated list of named entities,# e.g. "Jane, White House, UFO"# or "NONE" if no named entities are extracted:output=chain.predict(history=buffer_string,input=inputs[prompt_input_key],)# If no named entities are extracted, assigns an empty list.ifoutput.strip()=="NONE":entities=[]else:# Make a list of the extracted entities:entities=[w.strip()forwinoutput.split(",")]# Make a dictionary of entities with summary if exists:entity_summaries={}forentityinentities:entity_summaries[entity]=self.entity_store.get(entity,"")# Replaces the entity name cache with the most recently discussed entities,# or if no entities were extracted, clears the cache:self.entity_cache=entities# Should we return as message objects or as a string?ifself.return_messages:# Get last `k` pair of chat messages:buffer:Any=self.buffer[-self.k*2:]else:# Reuse the string we made earlier:buffer=buffer_stringreturn{self.chat_history_key:buffer,"entities":entity_summaries,}
[docs]defsave_context(self,inputs:dict[str,Any],outputs:dict[str,str])->None:""" Save context from this conversation history to the entity store. Generates a summary for each entity in the entity cache by prompting the model, and saves these summaries to the entity store. """super().save_context(inputs,outputs)ifself.input_keyisNone:prompt_input_key=get_prompt_input_key(inputs,self.memory_variables)else:prompt_input_key=self.input_key# Extract an arbitrary window of the last message pairs from# the chat history, where the hyperparameter k is the# number of message pairs:buffer_string=get_buffer_string(self.buffer[-self.k*2:],human_prefix=self.human_prefix,ai_prefix=self.ai_prefix,)input_data=inputs[prompt_input_key]# Create an LLMChain for predicting entity summarization from the contextchain=LLMChain(llm=self.llm,prompt=self.entity_summarization_prompt)# Generate new summaries for entities and save them in the entity storeforentityinself.entity_cache:# Get existing summary if it existsexisting_summary=self.entity_store.get(entity,"")output=chain.predict(summary=existing_summary,entity=entity,history=buffer_string,input=input_data,)# Save the updated summary to the entity storeself.entity_store.set(entity,output.strip())