Source code for langchain_community.vectorstores.aperturedb
# System imports
from __future__ import annotations

import logging
import time
import uuid
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type

# Third-party imports
import numpy as np

# Local imports
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.indexing.base import UpsertResponse
from langchain_core.vectorstores import VectorStore
from typing_extensions import override

# Configure some defaults
ENGINE = "HNSW"
METRIC = "CS"
DESCRIPTOR_SET = "langchain"
BATCHSIZE = 1000
PROPERTY_PREFIX = "lc_"  # Prefix for properties that are in the client metadata
TEXT_PROPERTY = "text"  # Property name for the text
UNIQUEID_PROPERTY = "uniqueid"  # Property name for the unique id
@override
def __init__(
    self,
    embeddings: Embeddings,
    descriptor_set: str = DESCRIPTOR_SET,
    dimensions: Optional[int] = None,
    engine: Optional[str] = None,
    metric: Optional[str] = None,
    log_level: int = logging.WARN,
    properties: Optional[Dict] = None,
    **kwargs: Any,
) -> None:
    """Create a vectorstore backed by ApertureDB

    A single ApertureDB instance can support many vectorstores,
    distinguished by 'descriptor_set' name.  The descriptor set is created
    if it does not exist.  Different descriptor sets can use different
    engines and metrics, be supplied by different embedding models, and
    have different dimensions.

    See ApertureDB documentation on `AddDescriptorSet`
    https://docs.aperturedata.io/query_language/Reference/descriptor_commands/desc_set_commands/AddDescriptorSet
    for more information on the engine and metric options.

    Args:
        embeddings (Embeddings): Embeddings object
        descriptor_set (str, optional): Descriptor set name.
            Defaults to "langchain".
        dimensions (Optional[int], optional): Number of dimensions of the
            embeddings. Defaults to None.
        engine (str, optional): Engine to use.
            Defaults to "HNSW" for new descriptorsets.
        metric (str, optional): Metric to use.
            Defaults to "CS" for new descriptorsets.
        log_level (int, optional): Logging level. Defaults to logging.WARN.
        properties (Optional[Dict], optional): Properties to attach to the
            descriptor set (stored with the PROPERTY_PREFIX on the server).
            Defaults to None.

    Raises:
        ImportError: If the `aperturedb` package is not installed.
        ValueError: If no embedding function is provided.
    """
    # ApertureDB imports.  Guard once up front; the original code repeated
    # this exact try/import a second time further down, which was redundant.
    try:
        from aperturedb.Utils import Utils, create_connector
    except ImportError:
        raise ImportError(
            "ApertureDB is not installed. Please install it using "
            "'pip install aperturedb'"
        )

    super().__init__(**kwargs)
    self.logger = logging.getLogger(__name__)
    self.logger.setLevel(log_level)
    self.descriptor_set = descriptor_set
    self.embedding_function = embeddings
    self.dimensions = dimensions
    self.engine = engine
    self.metric = metric
    self.properties = properties

    if embeddings is None:
        self.logger.fatal("No embedding function provided.")
        raise ValueError("No embedding function provided.")

    # Connect and verify the server is reachable before any further work.
    self.connection = create_connector()
    self.utils = Utils(self.connection)
    try:
        self.utils.status()
    except Exception:
        self.logger.exception("Failed to connect to ApertureDB")
        raise

    self._find_or_add_descriptor_set()
def _find_or_add_descriptor_set(self) -> None:
    """Check whether the descriptor set exists; create it if it does not.

    When the set already exists, the locally configured engine, metric and
    dimensions are reconciled against the server's values (server wins when
    the local value is None; mismatches are logged as errors).  When it does
    not exist, the set is created with the configured (or default) values
    and supporting indexes are added.

    NOTE: in the original code this docstring was placed *after* the first
    statement, so Python did not treat it as a docstring at all; it has been
    moved to the top of the function.
    """
    descriptor_set = self.descriptor_set
    find_ds_query = [
        {
            "FindDescriptorSet": {
                "with_name": descriptor_set,
                "engines": True,
                "metrics": True,
                "dimensions": True,
                "results": {"all_properties": True},
            }
        }
    ]
    r, b = self.connection.query(find_ds_query)
    assert self.connection.last_query_ok(), r
    n_entities = (
        len(r[0]["FindDescriptorSet"]["entities"])
        if "entities" in r[0]["FindDescriptorSet"]
        else 0
    )
    assert n_entities <= 1, "Multiple descriptor sets with the same name"

    if n_entities == 1:  # Descriptor set exists already
        e = r[0]["FindDescriptorSet"]["entities"][0]
        self.logger.info(f"Descriptor set {descriptor_set} already exists")

        engines = e["_engines"]
        assert len(engines) == 1, "Only one engine is supported"
        if self.engine is None:
            self.engine = engines[0]
        elif self.engine != engines[0]:
            self.logger.error(f"Engine mismatch: {self.engine} != {engines[0]}")

        metrics = e["_metrics"]
        assert len(metrics) == 1, "Only one metric is supported"
        if self.metric is None:
            self.metric = metrics[0]
        elif self.metric != metrics[0]:
            self.logger.error(f"Metric mismatch: {self.metric} != {metrics[0]}")

        dimensions = e["_dimensions"]
        if self.dimensions is None:
            self.dimensions = dimensions
        elif self.dimensions != dimensions:
            self.logger.error(
                f"Dimensions mismatch: {self.dimensions} != {dimensions}"
            )

        # Recover client metadata stored with the reserved prefix.
        self.properties = {
            k[len(PROPERTY_PREFIX):]: v
            for k, v in e.items()
            if k.startswith(PROPERTY_PREFIX)
        }
    else:
        self.logger.info(
            f"Descriptor set {descriptor_set} does not exist. Creating it"
        )
        if self.engine is None:
            self.engine = ENGINE
        if self.metric is None:
            self.metric = METRIC
        if self.dimensions is None:
            # Probe the embedding model once to discover its dimensionality.
            self.dimensions = len(self.embedding_function.embed_query("test"))

        properties = (
            {PROPERTY_PREFIX + k: v for k, v in self.properties.items()}
            if self.properties is not None
            else None
        )

        self.utils.add_descriptorset(
            name=descriptor_set,
            dim=self.dimensions,
            engine=self.engine,
            metric=self.metric,
            properties=properties,
        )

        # Create indexes
        self.utils.create_entity_index("_Descriptor", "_create_txn")
        self.utils.create_entity_index("_DescriptorSet", "_name")
        self.utils.create_entity_index("_Descriptor", UNIQUEID_PROPERTY)
@override
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
    """Remove descriptors from the vectorstore by unique id.

    Args:
        ids: List of ids to delete from the vectorstore.

    Returns:
        True if the deletion was successful, False otherwise
    """
    assert ids is not None, "ids must be provided"
    delete_query = [
        {
            "DeleteDescriptor": {
                "set": self.descriptor_set,
                "constraints": {UNIQUEID_PROPERTY: ["in", ids]},
            }
        }
    ]
    success, _ = self.utils.execute(delete_query)
    return success
@override
def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
    """Fetch documents from the vectorstore by their unique ids.

    Args:
        ids: List of ids to find in the vectorstore.

    Returns:
        documents: List of Document objects found in the vectorstore.
    """
    find_query = [
        {
            "FindDescriptor": {
                "set": self.descriptor_set,
                "constraints": {UNIQUEID_PROPERTY: ["in", ids]},
                "results": {"all_properties": True},
            }
        }
    ]
    response, _ = self.utils.execute(find_query)
    entities = response[0]["FindDescriptor"].get("entities", [])
    return [self._descriptor_to_document(entity) for entity in entities]
[docs]@overridedefsimilarity_search(self,query:str,k:int=4,*args:Any,**kwargs:Any)->List[Document]:"""Search for documents similar to the query using the vectorstore Args: query: Query string to search for. k: Number of results to return. Returns: List of Document objects ordered by decreasing similarity to the query. """assertself.embedding_functionisnotNone,"Embedding function is not set"embedding=self.embedding_function.embed_query(query)returnself.similarity_search_by_vector(embedding,k,*args,**kwargs)
def _descriptor_to_document(self, d: dict) -> Document:
    """Convert an ApertureDB descriptor entity into a langchain Document.

    Properties carrying the reserved prefix become metadata; the text and
    unique-id properties become page_content and id respectively.
    """
    metadata = {
        key[len(PROPERTY_PREFIX):]: value
        for key, value in d.items()
        if key.startswith(PROPERTY_PREFIX)
    }
    return Document(
        page_content=d[TEXT_PROPERTY],
        metadata=metadata,
        id=d[UNIQUEID_PROPERTY],
    )

def _similarity_search_with_score_by_vector(
    self, embedding: List[float], k: int = 4, vectors: bool = False
) -> List[Tuple[Document, float]]:
    """Return the k nearest documents to the embedding, with distances.

    Note: the `vectors` parameter is currently unused (kept for interface
    compatibility); distances are always requested.
    """
    from aperturedb.Descriptors import Descriptors

    neighbors = Descriptors(self.connection)
    start_time = time.time()
    neighbors.find_similar(
        set=self.descriptor_set, vector=embedding, k_neighbors=k, distances=True
    )
    self.logger.info(
        f"ApertureDB similarity search took {time.time() - start_time} seconds"
    )
    return [(self._descriptor_to_document(d), d["_distance"]) for d in neighbors]
@override
def similarity_search_by_vector(
    self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Document]:
    """Returns the k most similar documents to the given embedding vector

    Args:
        embedding: The embedding vector to search for
        k: The number of similar documents to return

    Returns:
        List of Document objects ordered by decreasing similarity to the query.
    """
    from aperturedb.Descriptors import Descriptors

    neighbors = Descriptors(self.connection)
    started = time.time()
    neighbors.find_similar(
        set=self.descriptor_set, vector=embedding, k_neighbors=k
    )
    self.logger.info(
        f"ApertureDB similarity search took {time.time() - started} seconds"
    )
    return [self._descriptor_to_document(d) for d in neighbors]
[docs]@overridedefmax_marginal_relevance_search(self,query:str,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,**kwargs:Any,)->List[Document]:"""Returns similar documents to the query that also have diversity This algorithm balances relevance and diversity in the search results. Args: query: Query string to search for. k: Number of results to return. fetch_k: Number of results to fetch. lambda_mult: Lambda multiplier for MMR. Returns: List of Document objects ordered by decreasing similarity/diversty. """self.logger.info(f"Max Marginal Relevance search for query: {query}")embedding=self.embedding_function.embed_query(query)returnself.max_marginal_relevance_search_by_vector(embedding,k,fetch_k,lambda_mult,**kwargs)
@override
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    **kwargs: Any,
) -> List[Document]:
    """Returns similar documents to the vector that also have diversity

    This algorithm balances relevance and diversity in the search results.

    Args:
        embedding: Embedding vector to search for.
        k: Number of results to return.
        fetch_k: Number of results to fetch.
        lambda_mult: Lambda multiplier for MMR.

    Returns:
        List of Document objects ordered by decreasing similarity/diversty.
    """
    from aperturedb.Descriptors import Descriptors

    neighbors = Descriptors(self.connection)
    started = time.time()
    neighbors.find_similar_mmr(
        set=self.descriptor_set,
        vector=embedding,
        k_neighbors=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
    )
    self.logger.info(
        f"ApertureDB similarity search mmr took {time.time() - started} seconds"
    )
    return [self._descriptor_to_document(d) for d in neighbors]
[docs]@classmethod@overridedeffrom_texts(cls:Type[ApertureDB],texts:List[str],embedding:Embeddings,metadatas:Optional[List[dict]]=None,**kwargs:Any,)->ApertureDB:"""Creates a new vectorstore from a list of texts Args: texts: List of text strings embedding: Embeddings object as for constructing the vectorstore metadatas: Optional list of metadatas associated with the texts. kwargs: Additional arguments to pass to the constructor """store=cls(embeddings=embedding,**kwargs)store.add_texts(texts,metadatas)returnstore
[docs]@classmethod@overridedeffrom_documents(cls:Type[ApertureDB],documents:List[Document],embedding:Embeddings,**kwargs:Any,)->ApertureDB:"""Creates a new vectorstore from a list of documents Args: documents: List of Document objects embedding: Embeddings object as for constructing the vectorstore metadatas: Optional list of metadatas associated with the texts. kwargs: Additional arguments to pass to the constructor """store=cls(embeddings=embedding,**kwargs)store.add_documents(documents)returnstore
@classmethod
def delete_vectorstore(class_, descriptor_set: str) -> None:
    """Deletes a vectorstore and all its data from the database

    Args:
        descriptor_set: The name of the descriptor set to delete
    """
    from aperturedb.Utils import Utils, create_connector

    connection = create_connector()
    Utils(connection).remove_descriptorset(descriptor_set)
@classmethod
def list_vectorstores(class_) -> List[Dict[str, Any]]:
    """Returns a list of all vectorstores in the database

    Returns:
        List of descriptor sets with properties

    Note: the original return annotation was `-> None`, which contradicted
    the actual return value; it has been corrected.
    """
    from aperturedb.Utils import create_connector

    db = create_connector()
    query = [
        {
            "FindDescriptorSet": {
                # Return all properties
                "results": {"all_properties": True},
                "engines": True,
                "metrics": True,
                "dimensions": True,
            }
        }
    ]
    response, _ = db.query(query)
    assert db.last_query_ok(), response
    return response[0]["FindDescriptorSet"]["entities"]
def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
    """Add or update documents in the vectorstore.

    Args:
        documents: Documents to add to the vectorstore.
        kwargs: Additional keyword arguments.
            If kwargs contains ids and documents contain ids,
            the ids in the kwargs will receive precedence.

    Returns:
        List of IDs of the added texts.

    Raises:
        ValueError: If the number of ids does not match the number of
            documents.
    """
    if "ids" in kwargs:
        ids = kwargs.pop("ids")
        if ids and len(ids) != len(documents):
            # BUG FIX: this message was a plain string in the original, so
            # the {len(...)} placeholders were emitted literally; it must be
            # an f-string to interpolate the counts.
            raise ValueError(
                "The number of ids must match the number of documents. "
                f"Got {len(ids)} ids and {len(documents)} documents."
            )
        # Rebuild each document with the caller-supplied id taking precedence.
        documents_ = []
        for id_, document in zip(ids, documents):
            doc_with_id = Document(
                page_content=document.page_content,
                metadata=document.metadata,
                id=id_,
            )
            documents_.append(doc_with_id)
    else:
        documents_ = documents

    # If upsert has been implemented, we can use it to add documents
    return self.upsert(documents_, **kwargs)["succeeded"]
def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse:
    """Insert or update items

    Updating documents is dependent on the documents' `id` attribute.

    Args:
        items: List of Document objects to upsert

    Returns:
        UpsertResponse object with succeeded and failed
    """
    # For now, simply delete and add.  We could do something more efficient
    # to update metadata, but we don't support changing the embedding of a
    # descriptor.
    from aperturedb.ParallelLoader import ParallelLoader

    # Delete any existing descriptors that carry one of the incoming ids.
    stale_ids: List[str] = [
        item.id for item in items if hasattr(item, "id") and item.id is not None
    ]
    if stale_ids:
        self.delete(stale_ids)

    texts = [doc.page_content for doc in items]
    metadatas = [
        doc.metadata if getattr(doc, "metadata", None) is not None else {}
        for doc in items
    ]
    embeddings = self.embedding_function.embed_documents(texts)
    # Keep caller-provided ids; mint a fresh UUID for documents without one.
    ids: List[str] = [
        doc.id if hasattr(doc, "id") and doc.id is not None else str(uuid.uuid4())
        for doc in items
    ]

    data = []
    for text, embedding, metadata, unique_id in zip(
        texts, embeddings, metadatas, ids
    ):
        properties = {PROPERTY_PREFIX + k: v for k, v in metadata.items()}
        properties[TEXT_PROPERTY] = text
        properties[UNIQUEID_PROPERTY] = unique_id
        command = {
            "AddDescriptor": {
                "set": self.descriptor_set,
                "properties": properties,
            }
        }
        # One (query, blobs) pair per descriptor; the embedding travels as a
        # little float32 byte blob alongside the AddDescriptor command.
        data.append(([command], [np.array(embedding, dtype=np.float32).tobytes()]))

    loader = ParallelLoader(self.connection)
    loader.ingest(data, batchsize=BATCHSIZE)
    return UpsertResponse(succeeded=ids, failed=[])