class BESVectorStore(VectorStore):
    """`Baidu Elasticsearch` vector store.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import BESVectorStore
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            vectorstore = BESVectorStore(
                embedding=OpenAIEmbeddings(),
                index_name="langchain-demo",
                bes_url="http://localhost:9200"
            )

    Args:
        index_name: Name of the Elasticsearch index to create.
        bes_url: URL of the Baidu Elasticsearch instance to connect to.
        user: Username to use when connecting to Elasticsearch.
        password: Password to use when connecting to Elasticsearch.
        embedding: Optional embedding model used to embed texts and queries.
        **kwargs: Optional settings read by the constructor:
            ``query_field`` (default ``"text"``),
            ``vector_query_field`` (default ``"vector"``),
            ``space_type`` (default ``"cosine"``),
            ``index_type`` (default ``"linear"``) and
            ``index_params`` (default ``{}``).

    More information can be obtained from:
    https://cloud.baidu.com/doc/BES/s/8llyn0hh4
    """

    def __init__(
        self,
        index_name: str,
        bes_url: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        **kwargs: Any,
    ) -> None:
        """Store the index configuration and open a client connection.

        Raises:
            ValueError: If ``bes_url`` is None.
        """
        self.embedding = embedding
        self.index_name = index_name
        self.query_field = kwargs.get("query_field", "text")
        self.vector_query_field = kwargs.get("vector_query_field", "vector")
        self.space_type = kwargs.get("space_type", "cosine")
        self.index_type = kwargs.get("index_type", "linear")
        self.index_params = kwargs.get("index_params") or {}

        if bes_url is None:
            raise ValueError("""Please specified a bes connection url.""")
        self.client = BESVectorStore.bes_client(
            bes_url=bes_url, username=user, password=password
        )

    @staticmethod
    def bes_client(
        *,
        bes_url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> "Elasticsearch":
        """Build an Elasticsearch client for ``bes_url`` and verify the connection.

        Basic auth is only configured when both ``username`` and ``password``
        are truthy.

        Raises:
            ImportError: If the ``elasticsearch`` package is not installed.
            Exception: Whatever ``es_client.info()`` raises is logged and
                re-raised.
        """
        try:
            import elasticsearch
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        connection_params: Dict[str, Any] = {"hosts": [bes_url]}
        if username and password:
            connection_params["basic_auth"] = (username, password)

        es_client = elasticsearch.Elasticsearch(**connection_params)
        try:
            # Round-trip to the server so connection problems surface here.
            es_client.info()
        except Exception as e:
            logger.error(f"Error connecting to Elasticsearch: {e}")
            raise
        return es_client

    def _create_index_if_not_exists(self, dims_length: Optional[int] = None) -> None:
        """Create the index if it doesn't already exist.

        Args:
            dims_length: Length of the embedding vectors.

        Raises:
            ValueError: If the index is missing and ``dims_length`` is None.
        """
        if self.client.indices.exists(index=self.index_name):
            logger.info(f"Index {self.index_name} already exists. Skipping creation.")
            return

        if dims_length is None:
            raise ValueError(
                "Cannot create index without specifying dims_length "
                + "when the index doesn't already exist. "
            )

        indexMapping = self._index_mapping(dims_length=dims_length)
        logger.debug(f"Creating index {self.index_name} with mappings {indexMapping}")
        self.client.indices.create(
            index=self.index_name,
            body={
                "settings": {"index": {"knn": True}},
                "mappings": {"properties": indexMapping},
            },
        )

    def _index_mapping(self, dims_length: Union[int, None]) -> Dict:
        """Build the field mapping used when the index is created.

        The mapping shape depends on ``self.index_type``: ``"linear"``,
        ``"hnsw"``, or anything else (which yields a ``model_id`` mapping).
        Extra settings are read from ``self.index_params``.

        Args:
            dims_length: Numeric length of the embedding vectors, or None if
                not using vector-based query.

        Returns:
            Dict: The mapping for the vector field, keyed by
            ``self.vector_query_field``.
        """
        if "linear" == self.index_type:
            field_mapping = {
                "type": "bpack_vector",
                "dims": dims_length,
                "build_index": self.index_params.get("build_index", False),
            }
        elif "hnsw" == self.index_type:
            field_mapping = {
                "type": "bpack_vector",
                "dims": dims_length,
                "index_type": "hnsw",
                "space_type": self.space_type,
                "parameters": {
                    "ef_construction": self.index_params.get(
                        "hnsw_ef_construction", 200
                    ),
                    "m": self.index_params.get("hnsw_m", 4),
                },
            }
        else:
            field_mapping = {
                "type": "bpack_vector",
                "model_id": self.index_params.get("model_id", ""),
            }
        return {self.vector_query_field: field_mapping}

    def delete(
        self,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Optional[bool]:
        """Delete documents from the index.

        Args:
            ids: List of ids of documents to delete
            **kwargs: ``refresh_indices`` (default True) is forwarded to the
                bulk helper as ``refresh``.

        Returns:
            True when a bulk delete was issued, False when ``ids`` was empty.

        Raises:
            ImportError: If the ``elasticsearch`` package is not installed.
            ValueError: If ``ids`` is None.
            BulkIndexError: Logged and re-raised when the bulk delete fails.
        """
        try:
            from elasticsearch.helpers import BulkIndexError, bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        if ids is None:
            raise ValueError("ids must be provided.")

        body = [
            {"_op_type": "delete", "_index": self.index_name, "_id": _id}
            for _id in ids
        ]
        if not body:
            logger.info("No documents to delete")
            return False

        try:
            bulk(
                self.client,
                body,
                refresh=kwargs.get("refresh_indices", True),
                ignore_status=404,
            )
            logger.debug(f"Deleted {len(body)} texts from index")
            return True
        except BulkIndexError as e:
            logger.error(f"Error deleting texts: {e}")
            raise

    def _query_body(
        self,
        query_vector: Union[List[float], None],
        filter: Optional[dict] = None,
        search_params: Optional[Dict] = None,
    ) -> Dict:
        """Build the kNN search request body.

        Args:
            query_vector: Embedding to search with.
            filter: Optional filter added to the kNN clause when non-empty.
            search_params: Optional overrides; reads ``k`` (default 2),
                ``ef`` (default 10, non-linear indexes only) and ``size``
                (default 4).

        Returns:
            The request body with ``size`` and a ``knn`` query on
            ``self.vector_query_field``.
        """
        # None sentinel instead of a shared mutable default.
        params = search_params if search_params is not None else {}

        query_vector_body = {"vector": query_vector, "k": params.get("k", 2)}
        if filter is not None and len(filter) != 0:
            query_vector_body["filter"] = filter

        if "linear" == self.index_type:
            query_vector_body["linear"] = True
        else:
            query_vector_body["ef"] = params.get("ef", 10)

        return {
            "size": params.get("size", 4),
            "query": {"knn": {self.vector_query_field: query_vector_body}},
        }

    def _search(
        self,
        query: Optional[str] = None,
        query_vector: Union[List[float], None] = None,
        filter: Optional[dict] = None,
        custom_query: Optional[Callable[[Dict, Union[str, None]], Dict]] = None,
        search_params: Optional[Dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return searched documents result from BES

        Args:
            query: Text to look up documents similar to.
            query_vector: Embedding to look up documents similar to. It is
                replaced by the embedding of ``query`` when an embedding
                model is configured and ``query`` is not None.
            filter: Array of Baidu ElasticSearch filter clauses to apply to
                the query.
            custom_query: Function to modify the query body before it is sent
                to BES.
            search_params: Optional search overrides passed to
                ``_query_body``.

        Returns:
            List of Documents most similar to the query and score for each
        """
        if self.embedding and query is not None:
            query_vector = self.embedding.embed_query(query)

        query_body = self._query_body(
            query_vector=query_vector, filter=filter, search_params=search_params
        )

        if custom_query is not None:
            query_body = custom_query(query_body, query)
            logger.debug(f"Calling custom_query, Query body now: {query_body}")

        logger.debug(f"Query body: {query_body}")

        # Perform the kNN search on the BES index and return the results.
        response = self.client.search(index=self.index_name, body=query_body)
        logger.debug(f"response={response}")

        return [
            (
                Document(
                    page_content=hit["_source"][self.query_field],
                    metadata=hit["_source"]["metadata"],
                ),
                hit["_score"],
            )
            for hit in response["hits"]["hits"]
        ]

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return documents most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Array of Elasticsearch filter clauses to apply to the query.

        Returns:
            List of Documents most similar to the query,
            in descending order of similarity.
        """
        results = self.similarity_search_with_score(
            query=query, k=k, filter=filter, **kwargs
        )
        return [doc for doc, _ in results]

    def similarity_search_with_score(
        self, query: str, k: int = 4, filter: Optional[dict] = None, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return documents most similar to query, along with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4. Used as the
                request ``size`` unless ``search_params`` already sets one.
            filter: Array of Elasticsearch filter clauses to apply to the query.
            **kwargs: Forwarded to ``_search``; ``search_params`` may carry
                search overrides.

        Returns:
            List of Documents most similar to the query and score for each
        """
        # Copy so the caller's dict is never mutated.
        search_params = dict(kwargs.get("search_params") or {})
        if search_params.get("size") is None:
            search_params["size"] = k

        # Forward the updated params explicitly; otherwise a freshly created
        # dict would be dropped and `k` would have no effect.
        forwarded = {**kwargs, "search_params": search_params}
        return self._search(query=query, filter=filter, **forwarded)

    @classmethod
    def from_documents(
        cls,
        documents: List[Document],
        embedding: Optional[Embeddings] = None,
        **kwargs: Any,
    ) -> "BESVectorStore":
        """Construct BESVectorStore wrapper from documents.

        Args:
            documents: List of documents to add to the Elasticsearch index.
            embedding: Embedding function to use to embed the texts.
                Do not provide if using a strategy
                that doesn't require inference.
            kwargs: create index key words arguments
        """
        vectorStore = BESVectorStore._bes_vector_store(embedding=embedding, **kwargs)
        # Encode the provided texts and add them to the newly created index.
        vectorStore.add_documents(documents)
        return vectorStore

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Optional[Embeddings] = None,
        metadatas: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> "BESVectorStore":
        """Construct BESVectorStore wrapper from raw documents.

        Args:
            texts: List of texts to add to the Elasticsearch index.
            embedding: Embedding function to use to embed the texts.
            metadatas: Optional list of metadatas associated with the texts.
            index_name: Name of the Elasticsearch index to create.
            kwargs: create index key words arguments
        """
        vectorStore = BESVectorStore._bes_vector_store(embedding=embedding, **kwargs)
        # Encode the provided texts and add them to the newly created index.
        vectorStore.add_texts(texts, metadatas=metadatas, **kwargs)
        return vectorStore

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            **kwargs: Reads ``ids`` (document ids; generated when missing or
                None), ``create_index_if_not_exists`` (default True) and
                ``refresh_indices`` (default True).

        Returns:
            List of ids from adding the texts into the vectorstore.

        Raises:
            ImportError: If the ``elasticsearch`` package is not installed.
            BulkIndexError: Logged and re-raised when the bulk index fails.
        """
        try:
            from elasticsearch.helpers import BulkIndexError, bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        # Materialise once: `texts` may be a one-shot iterator and is needed
        # for id generation, embedding and request building.
        texts = list(texts)
        if not texts:
            logger.debug("No texts to add to index")
            return []

        create_index_if_not_exists = kwargs.get("create_index_if_not_exists", True)
        refresh_indices = kwargs.get("refresh_indices", True)
        ids = kwargs.get("ids")
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]

        requests = []
        if self.embedding is not None:
            embeddings = self.embedding.embed_documents(texts)
            dims_length = len(embeddings[0])

            if create_index_if_not_exists:
                self._create_index_if_not_exists(dims_length=dims_length)

            for i, (text, vector) in enumerate(zip(texts, embeddings)):
                metadata = metadatas[i] if metadatas else {}
                requests.append(
                    {
                        "_op_type": "index",
                        "_index": self.index_name,
                        self.query_field: text,
                        self.vector_query_field: vector,
                        "metadata": metadata,
                        "_id": ids[i],
                    }
                )
        else:
            if create_index_if_not_exists:
                self._create_index_if_not_exists()

            for i, text in enumerate(texts):
                metadata = metadatas[i] if metadatas else {}
                requests.append(
                    {
                        "_op_type": "index",
                        "_index": self.index_name,
                        self.query_field: text,
                        "metadata": metadata,
                        "_id": ids[i],
                    }
                )

        if not requests:
            logger.debug("No texts to add to index")
            return []

        try:
            success, failed = bulk(
                self.client, requests, stats_only=True, refresh=refresh_indices
            )
            logger.debug(f"Added {success} and failed to add {failed} texts to index")
            logger.debug(f"added texts {ids} to index")
            return ids
        except BulkIndexError as e:
            logger.error(f"Error adding texts: {e}")
            firstError = e.errors[0].get("index", {}).get("error", {})
            logger.error(f"First error reason: {firstError.get('reason')}")
            raise

    @staticmethod
    def _bes_vector_store(
        embedding: Optional[Embeddings] = None, **kwargs: Any
    ) -> "BESVectorStore":
        """Validate the required kwargs and construct a ``BESVectorStore``.

        Raises:
            ValueError: If ``index_name`` or ``bes_url`` is missing from
                ``kwargs``.
        """
        index_name = kwargs.get("index_name")
        if index_name is None:
            raise ValueError("Please provide an index_name.")

        bes_url = kwargs.get("bes_url")
        if bes_url is None:
            raise ValueError("Please provided a valid bes connection url")

        return BESVectorStore(embedding=embedding, **kwargs)