# Source code for langchain.retrievers.time_weighted_retriever
import datetime
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple

from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import VectorStore
from pydantic import ConfigDict, Field


def _get_hours_passed(time: datetime.datetime, ref_time: datetime.datetime) -> float:
    """Get the hours passed between two datetimes.

    Args:
        time: The later point in time (the minuend).
        ref_time: The reference point in time that is subtracted from ``time``.

    Returns:
        ``time - ref_time`` expressed in hours. The value is negative when
        ``ref_time`` lies after ``time``.
    """
    return (time - ref_time).total_seconds() / 3600
class TimeWeightedVectorStoreRetriever(BaseRetriever):
    """Retriever that combines embedding similarity with recency in retrieving values.

    Every candidate document is scored as
    ``(1.0 - decay_rate) ** hours_since_last_access``, plus the metadata values
    named in ``other_score_keys`` (when present), plus the vector relevance
    (when one is available). The ``k`` best-scoring documents are returned and
    their ``last_accessed_at`` metadata is refreshed.
    """

    vectorstore: VectorStore
    """The vectorstore to store documents and determine salience."""

    search_kwargs: dict = Field(default_factory=lambda: dict(k=100))
    """Keyword arguments to pass to the vectorstore similarity search."""

    # TODO: abstract as a queue
    memory_stream: List[Document] = Field(default_factory=list)
    """The memory_stream of documents to search through."""

    decay_rate: float = Field(default=0.01)
    """The exponential decay factor used as (1.0-decay_rate)**(hrs_passed)."""

    k: int = 4
    """The maximum number of documents to retrieve in a given call."""

    other_score_keys: List[str] = []
    """Other keys in the metadata to factor into the score, e.g. 'importance'."""

    default_salience: Optional[float] = None
    """The salience to assign memories not retrieved from the vector store.

    None assigns no salience to documents not fetched from the vector store.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    def _document_get_date(self, field: str, document: Document) -> datetime.datetime:
        """Return the value of the date field of a document.

        Args:
            field: Metadata key holding the date (e.g. ``"last_accessed_at"``).
            document: Document whose metadata is inspected.

        Returns:
            The stored value. Numeric values (``int`` or ``float``) are treated
            as epoch timestamps and converted with
            ``datetime.datetime.fromtimestamp``; any other stored value is
            returned unchanged. When the key is absent, the current time is
            returned.
        """
        if field in document.metadata:
            value = document.metadata[field]
            # Accept integer epoch seconds as well as floats: returning a raw
            # int would fail later when it is subtracted from a datetime.
            if isinstance(value, (int, float)):
                return datetime.datetime.fromtimestamp(value)
            return value
        return datetime.datetime.now()

    def _get_combined_score(
        self,
        document: Document,
        vector_relevance: Optional[float],
        current_time: datetime.datetime,
    ) -> float:
        """Return the combined score for a document.

        Args:
            document: Document to score.
            vector_relevance: Relevance from the vector store, or ``None`` when
                the document has no relevance to contribute.
            current_time: Reference time used to measure how long ago the
                document was last accessed.

        Returns:
            The recency term ``(1.0 - decay_rate) ** hours_passed`` plus every
            metadata value listed in ``other_score_keys`` that the document
            carries, plus ``vector_relevance`` when it is not ``None``.
        """
        hours_passed = _get_hours_passed(
            current_time,
            self._document_get_date("last_accessed_at", document),
        )
        score = (1.0 - self.decay_rate) ** hours_passed
        for key in self.other_score_keys:
            if key in document.metadata:
                score += document.metadata[key]
        if vector_relevance is not None:
            score += vector_relevance
        return score

    def _index_salient_docs(
        self, docs_and_scores: List[Tuple[Document, float]]
    ) -> Dict[int, Tuple[Document, float]]:
        """Map vector-store hits onto the buffered documents in ``memory_stream``.

        Hits without a ``"buffer_idx"`` metadata entry are skipped. For the
        rest, the document stored in ``memory_stream`` at that index is used
        instead of the fetched copy.
        """
        results: Dict[int, Tuple[Document, float]] = {}
        for fetched_doc, relevance in docs_and_scores:
            if "buffer_idx" in fetched_doc.metadata:
                buffer_idx = fetched_doc.metadata["buffer_idx"]
                doc = self.memory_stream[buffer_idx]
                results[buffer_idx] = (doc, relevance)
        return results

    def get_salient_docs(self, query: str) -> Dict[int, Tuple[Document, float]]:
        """Return documents that are salient to the query.

        Args:
            query: Text passed to the vector store similarity search.

        Returns:
            A mapping from ``buffer_idx`` to ``(buffered document, relevance)``
            for every hit that carries a ``"buffer_idx"`` in its metadata.
        """
        docs_and_scores: List[Tuple[Document, float]]
        docs_and_scores = self.vectorstore.similarity_search_with_relevance_scores(
            query, **self.search_kwargs
        )
        return self._index_salient_docs(docs_and_scores)

    async def aget_salient_docs(self, query: str) -> Dict[int, Tuple[Document, float]]:
        """Return documents that are salient to the query.

        Asynchronous counterpart of ``get_salient_docs``; the vector store is
        queried through ``asimilarity_search_with_relevance_scores``.
        """
        docs_and_scores: List[Tuple[Document, float]]
        docs_and_scores = (
            await self.vectorstore.asimilarity_search_with_relevance_scores(
                query, **self.search_kwargs
            )
        )
        return self._index_salient_docs(docs_and_scores)

    def _get_rescored_docs(
        self, docs_and_scores: Dict[Any, Tuple[Document, Optional[float]]]
    ) -> List[Document]:
        """Rank candidates by combined score and return the top ``k``.

        Args:
            docs_and_scores: Candidates keyed by buffer index, each paired with
                its relevance (or ``None``).

        Returns:
            The buffered documents for the ``k`` best-scoring candidates, best
            first. Each returned document has its ``last_accessed_at`` metadata
            set to the time of this call.
        """
        current_time = datetime.datetime.now()
        rescored_docs = [
            (doc, self._get_combined_score(doc, relevance, current_time))
            for doc, relevance in docs_and_scores.values()
        ]
        rescored_docs.sort(key=lambda x: x[1], reverse=True)
        result = []
        # Ensure frequently accessed memories aren't forgotten
        for doc, _ in rescored_docs[: self.k]:
            # TODO: Update vector store doc once `update` method is exposed.
            buffered_doc = self.memory_stream[doc.metadata["buffer_idx"]]
            buffered_doc.metadata["last_accessed_at"] = current_time
            result.append(buffered_doc)
        return result

    def _get_recent_docs(self) -> Dict[Any, Tuple[Document, Optional[float]]]:
        """Return the last ``k`` buffered documents paired with ``default_salience``."""
        # ``memory_stream[-0:]`` would be the *whole* buffer, so treat k == 0
        # explicitly; the final result is the same but nothing is rescored in vain.
        recent = self.memory_stream[-self.k :] if self.k else []
        return {
            doc.metadata["buffer_idx"]: (doc, self.default_salience) for doc in recent
        }

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Return the top ``k`` documents among recent and salient candidates.

        Args:
            query: Text passed to the vector store similarity search.
            run_manager: Callback manager for this run (not used here).
        """
        docs_and_scores = self._get_recent_docs()
        # If a doc is considered salient, update the salience score
        docs_and_scores.update(self.get_salient_docs(query))
        return self._get_rescored_docs(docs_and_scores)

    async def _aget_relevant_documents(
        self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Asynchronous counterpart of ``_get_relevant_documents``.

        Args:
            query: Text passed to the vector store similarity search.
            run_manager: Callback manager for this run (not used here).
        """
        docs_and_scores = self._get_recent_docs()
        # If a doc is considered salient, update the salience score
        docs_and_scores.update(await self.aget_salient_docs(query))
        return self._get_rescored_docs(docs_and_scores)

    def _prepare_documents(
        self,
        documents: List[Document],
        current_time: Optional[datetime.datetime],
    ) -> List[Document]:
        """Copy, timestamp and buffer ``documents``; return the copies.

        Missing ``last_accessed_at`` / ``created_at`` metadata is filled with
        ``current_time`` (or the present time when it is ``None``). Each copy
        receives a ``buffer_idx`` equal to its position in ``memory_stream``,
        to which the copies are then appended.
        """
        if current_time is None:
            current_time = datetime.datetime.now()
        # Avoid mutating input documents
        dup_docs = [deepcopy(d) for d in documents]
        for buffer_idx, doc in enumerate(dup_docs, start=len(self.memory_stream)):
            doc.metadata.setdefault("last_accessed_at", current_time)
            doc.metadata.setdefault("created_at", current_time)
            doc.metadata["buffer_idx"] = buffer_idx
        self.memory_stream.extend(dup_docs)
        return dup_docs

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Add documents to vectorstore.

        Args:
            documents: Documents to add; they are deep-copied, not mutated.
            **kwargs: An optional ``current_time`` entry supplies the timestamp
                for missing date metadata. All keyword arguments, including
                ``current_time``, are forwarded to the vector store.

        Returns:
            Whatever ``vectorstore.add_documents`` returns.
        """
        dup_docs = self._prepare_documents(documents, kwargs.get("current_time"))
        return self.vectorstore.add_documents(dup_docs, **kwargs)

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
        """Add documents to vectorstore.

        Asynchronous counterpart of ``add_documents``; the copies are stored
        through ``vectorstore.aadd_documents``.
        """
        dup_docs = self._prepare_documents(documents, kwargs.get("current_time"))
        return await self.vectorstore.aadd_documents(dup_docs, **kwargs)