[docs]@beta(message="Introduced in version 0.2.29. Underlying abstraction subject to change.")classInMemoryDocumentIndex(DocumentIndex):"""In memory document index. This is an in-memory document index that stores documents in a dictionary. It provides a simple search API that returns documents by the number of counts the given query appears in the document. .. versionadded:: 0.2.29 """store:Dict[str,Document]=Field(default_factory=dict)top_k:int=4
[docs]defupsert(self,items:Sequence[Document],/,**kwargs:Any)->UpsertResponse:"""Upsert items into the index."""ok_ids=[]foriteminitems:ifitem.idisNone:id_=str(uuid.uuid4())item_=item.copy()item_.id=id_else:item_=itemid_=item.idself.store[id_]=item_ok_ids.append(cast(str,item_.id))returnUpsertResponse(succeeded=ok_ids,failed=[])
[docs]defdelete(self,ids:Optional[List[str]]=None,**kwargs:Any)->DeleteResponse:"""Delete by ID."""ifidsisNone:raiseValueError("IDs must be provided for deletion")ok_ids=[]forid_inids:ifid_inself.store:delself.store[id_]ok_ids.append(id_)returnDeleteResponse(succeeded=ok_ids,num_deleted=len(ok_ids),num_failed=0,failed=[])
[docs]defget(self,ids:Sequence[str],/,**kwargs:Any)->List[Document]:"""Get by ids."""found_documents=[]forid_inids:ifid_inself.store:found_documents.append(self.store[id_])returnfound_documents