class EcloudESVectorStore(VectorStore):
    """`ecloud Elasticsearch` vector store.

    Example:
        .. code-block:: python

            from langchain.vectorstores import EcloudESVectorStore
            from langchain.embeddings.openai import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            vectorstore = EcloudESVectorStore(
                embedding=embeddings,
                index_name="langchain-demo",
                es_url="http://localhost:9200"
            )

    Args:
        index_name: Name of the Elasticsearch index to create.
        es_url: URL of the ecloud Elasticsearch instance to connect to.
        user: Username to use when connecting to Elasticsearch.
        password: Password to use when connecting to Elasticsearch.
    """
    def __init__(
        self,
        index_name: str,
        es_url: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        **kwargs: Optional[dict],
    ) -> None:
        self.embedding = embedding
        self.index_name = index_name
        self.text_field = kwargs.get("text_field", "text")
        self.vector_field = kwargs.get("vector_field", "vector")
        self.vector_type = kwargs.get("vector_type", "knn_dense_float_vector")
        self.vector_params = kwargs.get("vector_params") or {}
        self.model = self.vector_params.get("model", "")
        self.index_settings = kwargs.get("index_settings") or {}

        # Strip index-related keywords so only client options reach es_client.
        key_list = [
            "text_field",
            "vector_field",
            "vector_type",
            "vector_params",
            "index_settings",
        ]
        for key in key_list:
            kwargs.pop(key, None)

        if es_url is not None:
            self.client = EcloudESVectorStore.es_client(
                es_url=es_url, username=user, password=password, **kwargs
            )
        else:
            raise ValueError("Please specify an es connection url.")
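    # A minimal sketch of the extra keyword arguments consumed by __init__ above
    # (all values are illustrative; an LSH-mapped dense index is assumed):
    #
    #     store = EcloudESVectorStore(
    #         index_name="langchain-demo",
    #         es_url="http://localhost:9200",
    #         user="elastic",
    #         password="changeme",
    #         embedding=OpenAIEmbeddings(),
    #         text_field="text",
    #         vector_field="vector",
    #         vector_type="knn_dense_float_vector",
    #         vector_params={"model": "lsh", "similarity": "cosine", "L": 99, "k": 1},
    #         index_settings={"number_of_shards": 1},
    #     )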
    @staticmethod
    def es_client(
        *,
        es_url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        **kwargs: Optional[dict],
    ) -> "Elasticsearch":
        try:
            import elasticsearch
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        connection_params: Dict[str, Any] = {"hosts": [es_url]}
        if username and password:
            connection_params["http_auth"] = (username, password)

        connection_params.update(kwargs)
        es_client = elasticsearch.Elasticsearch(**connection_params)
        try:
            es_client.info()
        except Exception as e:
            logger.error(f"Error connecting to Elasticsearch: {e}")
            raise e
        return es_client
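    # A short sketch of calling es_client directly (credentials are hypothetical;
    # extra keyword arguments are forwarded to elasticsearch.Elasticsearch, so
    # request_timeout is only valid if the installed client version accepts it):
    #
    #     client = EcloudESVectorStore.es_client(
    #         es_url="http://localhost:9200",
    #         username="elastic",
    #         password="changeme",
    #         request_timeout=30,
    #     )
    #     print(client.info())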
    def _create_index_if_not_exists(self, dims_length: Optional[int] = None) -> None:
        """Create the index if it doesn't already exist.

        Args:
            dims_length: Length of the embedding vectors.
        """
        if self.client.indices.exists(index=self.index_name):
            logger.info(f"Index {self.index_name} already exists. Skipping creation.")
        else:
            if dims_length is None:
                raise ValueError(
                    "Cannot create index without specifying dims_length "
                    "when the index doesn't already exist."
                )
            indexMapping = self._index_mapping(dims_length=dims_length)
            logger.debug(
                f"Creating index {self.index_name} with mappings {indexMapping}"
            )
            self.client.indices.create(
                index=self.index_name,
                body={
                    "settings": {"index.knn": True, **self.index_settings},
                    "mappings": {"properties": indexMapping},
                },
            )

    def _index_mapping(self, dims_length: Union[int, None]) -> Dict:
        """Build the vector-field mapping used when the index is created.

        Args:
            dims_length: Numeric length of the embedding vectors,
                or None if not using vector-based query.

        Returns:
            Dict: The Elasticsearch mappings for the configured vector model.
        """
        model = self.vector_params.get("model", "")
        if model == "lsh":
            mapping: Dict[Any, Any] = {
                self.vector_field: {
                    "type": self.vector_type,
                    "knn": {
                        "dims": dims_length,
                        "model": "lsh",
                        "similarity": self.vector_params.get("similarity", "cosine"),
                        "L": self.vector_params.get("L", 99),
                        "k": self.vector_params.get("k", 1),
                    },
                }
            }
            if mapping[self.vector_field]["knn"]["similarity"] == "l2":
                mapping[self.vector_field]["knn"]["w"] = self.vector_params.get("w", 3)
            return mapping
        elif model == "permutation_lsh":
            return {
                self.vector_field: {
                    "type": self.vector_type,
                    "knn": {
                        "dims": dims_length,
                        "model": "permutation_lsh",
                        "k": self.vector_params.get("k", 10),
                        "similarity": self.vector_params.get("similarity", "cosine"),
                        "repeating": self.vector_params.get("repeating", True),
                    },
                }
            }
        else:
            return {
                self.vector_field: {
                    "type": self.vector_type,
                    "knn": {"dims": dims_length},
                }
            }
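    # For illustration, with vector_params={"model": "lsh", "similarity": "l2"}
    # and dims_length=1536 (a hypothetical embedding size), _index_mapping
    # produces a mapping of roughly this shape:
    #
    #     {
    #         "vector": {
    #             "type": "knn_dense_float_vector",
    #             "knn": {
    #                 "dims": 1536,
    #                 "model": "lsh",
    #                 "similarity": "l2",
    #                 "L": 99,
    #                 "k": 1,
    #                 "w": 3,
    #             },
    #         }
    #     }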
    def delete(
        self,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Optional[bool]:
        """Delete documents from the index.

        Args:
            ids: List of ids of documents to delete.
        """
        try:
            from elasticsearch.helpers import BulkIndexError, bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        if ids is None:
            raise ValueError("ids must be provided.")

        body = []
        for _id in ids:
            body.append({"_op_type": "delete", "_index": self.index_name, "_id": _id})

        if len(body) > 0:
            try:
                bulk(
                    self.client,
                    body,
                    refresh=kwargs.get("refresh_indices", True),
                    ignore_status=404,
                )
                logger.debug(f"Deleted {len(body)} texts from index")
                return True
            except BulkIndexError as e:
                logger.error(f"Error deleting texts: {e}")
                raise e
        else:
            logger.info("No documents to delete")
            return False
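    # A minimal deletion sketch (the ids are hypothetical; refresh_indices
    # defaults to True for delete):
    #
    #     store.delete(ids=["doc-1", "doc-2"])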
    @staticmethod
    def get_dense_specific_model_similarity_params(
        search_params: Dict[str, Any],
    ) -> Dict:
        model = search_params.get("model", "exact")
        similarity = search_params.get("similarity", "cosine")
        specific_params = {"model": model, "similarity": similarity}
        if model != "exact":
            if model not in ("lsh", "permutation_lsh"):
                raise ValueError(
                    f"vector type knn_dense_float_vector doesn't support model {model}"
                )
            if similarity not in ("cosine", "l2"):
                raise ValueError(
                    f"model {model} doesn't support similarity {similarity}"
                )
            specific_params["candidates"] = search_params.get(
                "candidates", search_params.get("size", 4)
            )
            if model == "lsh" and similarity == "l2":
                specific_params["probes"] = search_params.get("probes", 0)
        else:
            if similarity not in ("cosine", "l2"):
                raise ValueError(
                    f"model exact doesn't support similarity {similarity}"
                )
        return specific_params
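    # A worked example of the dense helper above (the input values are
    # illustrative):
    #
    #     EcloudESVectorStore.get_dense_specific_model_similarity_params(
    #         {"model": "lsh", "similarity": "l2", "size": 10, "probes": 2}
    #     )
    #     # -> {"model": "lsh", "similarity": "l2", "candidates": 10, "probes": 2}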
    @staticmethod
    def get_sparse_specific_model_similarity_params(
        search_params: Dict[str, Any],
    ) -> Dict:
        model = search_params.get("model", "exact")
        similarity = search_params.get("similarity", "hamming")
        specific_params = {"model": model, "similarity": similarity}
        if model != "exact":
            if model not in ("lsh",):
                raise ValueError(
                    f"vector type knn_sparse_bool_vector doesn't support model {model}"
                )
            if similarity not in ("hamming", "jaccard"):
                raise ValueError(
                    f"model {model} doesn't support similarity {similarity}"
                )
            specific_params["candidates"] = search_params.get(
                "candidates", search_params.get("size", 4)
            )
        else:
            if similarity not in ("hamming", "jaccard"):
                raise ValueError(
                    f"model exact doesn't support similarity {similarity}"
                )
        return specific_params
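    # And the sparse counterpart (again with illustrative inputs):
    #
    #     EcloudESVectorStore.get_sparse_specific_model_similarity_params(
    #         {"model": "lsh", "similarity": "jaccard", "size": 10}
    #     )
    #     # -> {"model": "lsh", "similarity": "jaccard", "candidates": 10}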
    def _search(
        self,
        query: Optional[str] = None,
        query_vector: Union[List[float], None] = None,
        filter: Optional[dict] = None,
        custom_query: Optional[Callable[[Dict, Union[str, None]], Dict]] = None,
        search_params: Dict = {},
    ) -> List[Tuple[Document, float]]:
        """Return documents from ecloud Elasticsearch most similar to the query.

        Args:
            query: Text to look up documents similar to.
            query_vector: Embedding to look up documents similar to.
            filter: Array of ecloud Elasticsearch filter clauses to apply to the query.
            custom_query: Function to modify the query body before it is sent to ES.

        Returns:
            List of Documents most similar to the query and score for each.
        """
        if self.embedding and query is not None:
            query_vector = self.embedding.embed_query(query)

        query_body = self._query_body(
            query_vector=query_vector, filter=filter, search_params=search_params
        )

        if custom_query is not None:
            query_body = custom_query(query_body, query)
            logger.debug(f"Calling custom_query, Query body now: {query_body}")

        logger.debug(f"Query body: {query_body}")

        # Perform the kNN search on the ES index and return the results.
        response = self.client.search(index=self.index_name, body=query_body)
        logger.debug(f"response={response}")

        hits = [hit for hit in response["hits"]["hits"]]
        docs_and_scores = [
            (
                Document(
                    page_content=hit["_source"][
                        search_params.get("text_field", self.text_field)
                    ],
                    metadata=hit["_source"]["metadata"],
                ),
                hit["_score"],
            )
            for hit in hits
        ]
        return docs_and_scores
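    # A sketch of a custom_query hook (the hook name and the _source tweak are
    # illustrative): it receives the generated query body and the original query
    # text, and must return the body that is sent to Elasticsearch.
    #
    #     def drop_vectors(query_body: Dict, query: Optional[str]) -> Dict:
    #         query_body["_source"] = {"excludes": ["vector"]}
    #         return query_body
    #
    #     docs = store.similarity_search("what is ecloud ES?", custom_query=drop_vectors)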
    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return documents most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Array of Elasticsearch filter clauses to apply to the query.

        Returns:
            List of Documents most similar to the query,
            in descending order of similarity.
        """
        results = self.similarity_search_with_score(
            query=query, k=k, filter=filter, **kwargs
        )
        return [doc for doc, _ in results]
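    # A minimal query sketch (the store is assumed to already contain documents):
    #
    #     docs = store.similarity_search("what is ecloud Elasticsearch?", k=4)
    #     print(docs[0].page_content)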
    def similarity_search_with_score(
        self, query: str, k: int, filter: Optional[dict] = None, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return documents most similar to query, along with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return.
            filter: Array of Elasticsearch filter clauses to apply to the query.

        Returns:
            List of Documents most similar to the query and score for each.
        """
        search_params: Dict[str, Any] = kwargs.get("search_params") or {}
        if len(search_params) == 0:
            kwargs["search_params"] = {"size": k}
        elif search_params.get("size") is None:
            search_params["size"] = k
            kwargs["search_params"] = search_params
        return self._search(query=query, filter=filter, **kwargs)
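    # For example, search_params can pin the query model explicitly (the values
    # are illustrative and how they are applied depends on _query_body; "size"
    # falls back to k when omitted):
    #
    #     results = store.similarity_search_with_score(
    #         "what is ecloud Elasticsearch?",
    #         k=4,
    #         search_params={"model": "lsh", "similarity": "cosine", "candidates": 10},
    #     )
    #     for doc, score in results:
    #         print(score, doc.page_content[:80])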
    @classmethod
    def from_documents(
        cls,
        documents: List[Document],
        embedding: Optional[Embeddings] = None,
        **kwargs: Any,
    ) -> "EcloudESVectorStore":
        """Construct EcloudESVectorStore wrapper from documents.

        Args:
            documents: List of documents to add to the Elasticsearch index.
            embedding: Embedding function to use to embed the texts.
                Do not provide if using a strategy that doesn't require inference.
            kwargs: Keyword arguments used to create the index.
        """
        vectorStore = EcloudESVectorStore._es_vector_store(
            embedding=embedding, **kwargs
        )
        # Encode the provided texts and add them to the newly created index.
        vectorStore.add_documents(documents)
        return vectorStore
    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Optional[Embeddings] = None,
        metadatas: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> "EcloudESVectorStore":
        """Construct EcloudESVectorStore wrapper from raw documents.

        Args:
            texts: List of texts to add to the Elasticsearch index.
            embedding: Embedding function to use to embed the texts.
            metadatas: Optional list of metadatas associated with the texts.
            index_name: Name of the Elasticsearch index to create.
            kwargs: Keyword arguments used to create the index.
        """
        vectorStore = cls._es_vector_store(embedding=embedding, **kwargs)
        # Encode the provided texts and add them to the newly created index.
        vectorStore.add_texts(texts, metadatas=metadatas, **kwargs)
        return vectorStore
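    # A minimal sketch of building a store from raw texts (the URL and index
    # name are placeholders):
    #
    #     store = EcloudESVectorStore.from_texts(
    #         texts=["hello world", "hello ecloud"],
    #         embedding=OpenAIEmbeddings(),
    #         metadatas=[{"source": "a"}, {"source": "b"}],
    #         index_name="langchain-demo",
    #         es_url="http://localhost:9200",
    #     )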
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        try:
            from elasticsearch.helpers import BulkIndexError, bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        embeddings = []
        create_index_if_not_exists = kwargs.get("create_index_if_not_exists", True)
        ids = kwargs.get("ids", [str(uuid.uuid4()) for _ in texts])
        refresh_indices = kwargs.get("refresh_indices", False)
        requests = []

        if self.embedding is not None:
            embeddings = self.embedding.embed_documents(list(texts))
            dims_length = len(embeddings[0])

            if create_index_if_not_exists:
                self._create_index_if_not_exists(dims_length=dims_length)

            for i, (text, vector) in enumerate(zip(texts, embeddings)):
                metadata = metadatas[i] if metadatas else {}
                doc = {
                    "_op_type": "index",
                    "_index": self.index_name,
                    self.text_field: text,
                    "metadata": metadata,
                    "_id": ids[i],
                }
                if self.vector_type == "knn_dense_float_vector":
                    # Dense vectors are indexed as a plain float array.
                    doc[self.vector_field] = vector
                elif self.vector_type == "knn_sparse_bool_vector":
                    # Sparse boolean vectors are indexed as the indices of the
                    # true elements plus the total vector length.
                    doc[self.vector_field] = {
                        "true_indices": vector,
                        "total_indices": len(vector),
                    }
                requests.append(doc)
        else:
            # No embedding function: index the raw texts only.
            if create_index_if_not_exists:
                self._create_index_if_not_exists()

            for i, text in enumerate(texts):
                metadata = metadatas[i] if metadatas else {}
                requests.append(
                    {
                        "_op_type": "index",
                        "_index": self.index_name,
                        self.text_field: text,
                        "metadata": metadata,
                        "_id": ids[i],
                    }
                )

        if len(requests) > 0:
            try:
                success, failed = bulk(
                    self.client, requests, stats_only=True, refresh=refresh_indices
                )
                logger.debug(
                    f"Added {success} and failed to add {failed} texts to index"
                )
                logger.debug(f"added texts {ids} to index")
                if refresh_indices:
                    self.client.indices.refresh(index=self.index_name)
                return ids
            except BulkIndexError as e:
                logger.error(f"Error adding texts: {e}")
                firstError = e.errors[0].get("index", {}).get("error", {})
                logger.error(f"First error reason: {firstError.get('reason')}")
                raise e
        else:
            logger.debug("No texts to add to index")
            return []
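    # A short sketch of adding more texts to an existing store (the ids are
    # hypothetical; refresh_indices defaults to False here):
    #
    #     new_ids = store.add_texts(
    #         ["third document"],
    #         metadatas=[{"source": "c"}],
    #         ids=["doc-3"],
    #         refresh_indices=True,
    #     )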
    @staticmethod
    def _es_vector_store(
        embedding: Optional[Embeddings] = None, **kwargs: Any
    ) -> "EcloudESVectorStore":
        index_name = kwargs.get("index_name")
        if index_name is None:
            raise ValueError("Please provide an index_name.")

        es_url = kwargs.get("es_url")
        if es_url is None:
            raise ValueError("Please provide a valid es connection url.")

        return EcloudESVectorStore(embedding=embedding, **kwargs)