class AlibabaCloudOpenSearchSettings:
    """Alibaba Cloud OpenSearch client configuration.

    Attributes:
        endpoint (str): The endpoint of the OpenSearch instance; you can find
            it in the console of Alibaba Cloud OpenSearch.
        instance_id (str): The identity of the OpenSearch instance; you can
            find it in the console of Alibaba Cloud OpenSearch.
        username (str): The username specified when purchasing the instance.
        password (str): The password specified when purchasing the instance.
            After the instance is created, you can modify it in the console.
        tablename (str): The table name specified during instance
            configuration.
        field_name_mapping (Dict): Field name mapping between the OpenSearch
            vector store and the OpenSearch instance configuration table,
            e.g.::

                {
                    'id': 'The id field name map of index document.',
                    'document': 'The text field name map of index document.',
                    'embedding': 'In the embedding field of the opensearch
                        instance, the values must be in float type and
                        separated by separator, default is comma.',
                    'metadata_field_x': 'Metadata field mapping includes the
                        mapped field name and operator in the mapping value,
                        separated by a comma between the mapped field name
                        and the operator.',
                }

        protocol (str): Communication protocol between the SDK and server;
            default is http.
        namespace (str): The instance data will be partitioned based on the
            "namespace" field. If the namespace is enabled, you need to
            specify the namespace field name during initialization; otherwise
            the queries cannot be executed correctly.
        embedding_field_separator (str): Delimiter specified for writing
            vector field data; default is comma.
        output_fields: Field list returned when invoking OpenSearch; by
            default it is the value list of the field mapping field.
    """
def create_metadata(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Build a metadata dict from a document's fields.

    Args:
        fields: The fields of the document. The fields must be a dict.

    Returns:
        metadata: The metadata of the document — every entry of ``fields``
            except the reserved ``id``, ``document`` and ``embedding`` keys.
    """
    reserved_keys = ("id", "document", "embedding")
    return {
        key: value for key, value in fields.items() if key not in reserved_keys
    }
def __init__(
    self,
    embedding: Embeddings,
    config: AlibabaCloudOpenSearchSettings,
    **kwargs: Any,
) -> None:
    """Initialize the vector store with an embedding function and instance config.

    Args:
        embedding: Embedding function used to vectorize texts and queries.
        config: Alibaba Cloud OpenSearch instance configuration.

    Raises:
        ImportError: If the ``alibabacloud-ha3engine-vector`` package is not
            installed.
    """
    try:
        from alibabacloud_ha3engine_vector import client, models
        from alibabacloud_tea_util import models as util_models
    except ImportError:
        raise ImportError(
            "Could not import alibaba cloud opensearch python package. "
            "Please install it with `pip install alibabacloud-ha3engine-vector`."
        )
    self.config = config
    self.embedding = embedding
    # SDK runtime options; timeouts are presumably milliseconds — TODO confirm
    # against the alibabacloud_tea_util documentation.
    self.runtime = util_models.RuntimeOptions(
        connect_timeout=5000,
        read_timeout=10000,
        autoretry=False,
        ignore_ssl=False,
        max_idle_conns=50,
    )
    # One shared client per store instance, built from the instance settings.
    self.ha3_engine_client = client.Client(
        models.Config(
            endpoint=config.endpoint,
            instance_id=config.instance_id,
            protocol=config.protocol,
            access_user_name=config.username,
            access_pass_word=config.password,
        )
    )
    # Extra HTTP headers sent with push/query requests (empty by default).
    self.options_headers: Dict[str, str] = {}
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    **kwargs: Any,
) -> List[str]:
    """Insert documents into the instance.

    Args:
        texts: The text segments to be inserted into the vector storage,
            should not be empty.
        metadatas: Metadata information.

    Returns:
        id_list: List of document IDs (sha1 hex digests of the texts).

    Raises:
        Exception: Re-raised if pushing the documents to the instance fails.
    """
    from alibabacloud_ha3engine_vector import models

    # Materialize the iterable exactly once: a generator passed as `texts`
    # would otherwise be exhausted by the first pass, leaving the embedding
    # and metadata passes with nothing.
    text_list = list(texts)
    id_list = [sha1(t.encode("utf-8")).hexdigest() for t in text_list]
    embeddings = self.embedding.embed_documents(text_list)
    metadatas = metadatas or [{} for _ in text_list]
    field_name_map = self.config.field_name_mapping

    def _upsert(push_doc_list: List[Dict]) -> List[str]:
        # Push the prepared documents; returns their IDs on success.
        if not push_doc_list:
            return []
        try:
            push_request = models.PushDocumentsRequest(
                self.options_headers, push_doc_list
            )
            push_response = self.ha3_engine_client.push_documents(
                self.config.opt_table_name, field_name_map["id"], push_request
            )
            json_response = json.loads(push_response.body)
            if json_response["status"] == "OK":
                return [
                    push_doc["fields"][field_name_map["id"]]
                    for push_doc in push_doc_list
                ]
            return []
        except Exception as e:
            # logger.error(msg, e) would try to %-format a message with no
            # placeholder and lose it; logger.exception keeps the message
            # and records the traceback.
            logger.exception(
                f"add doc to endpoint:{self.config.endpoint} "
                f"instance_id:{self.config.instance_id} failed."
            )
            raise e

    add_doc_list = []
    for idx, doc_id in enumerate(id_list):
        # Defensive bounds checks in case the embedder or caller returned
        # fewer items than texts.
        embedding = embeddings[idx] if idx < len(embeddings) else None
        metadata = metadatas[idx] if idx < len(metadatas) else None
        text = text_list[idx] if idx < len(text_list) else None

        add_doc_fields: Dict[str, Any] = {
            field_name_map["id"]: doc_id,
            field_name_map["document"]: text,
        }
        if embedding is not None:
            add_doc_fields[field_name_map["embedding"]] = (
                self.config.embedding_field_separator.join(
                    str(unit) for unit in embedding
                )
            )
        if metadata is not None:
            for md_key, md_value in metadata.items():
                # Mapping value may be "field,operator"; only the field name
                # (before the comma) is used when writing.
                add_doc_fields[field_name_map[md_key].split(",")[0]] = md_value
        add_doc_list.append({"fields": add_doc_fields, "cmd": "add"})

    return _upsert(add_doc_list)
def similarity_search(
    self,
    query: str,
    k: int = 4,
    search_filter: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Perform similarity retrieval based on text.

    Args:
        query: Text to vectorize for retrieval; should not be empty.
        k: top n.
        search_filter: Additional filtering conditions.

    Returns:
        document_list: List of documents.
    """
    query_embedding = self.embedding.embed_query(query)
    raw_response = self.inner_embedding_query(
        embedding=query_embedding, search_filter=search_filter, k=k
    )
    return self.create_results(raw_response)
def similarity_search_with_relevance_scores(
    self,
    query: str,
    k: int = 4,
    search_filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Perform similarity retrieval based on text, with scores.

    Args:
        query: Text to vectorize for retrieval; should not be empty.
        k: top n.
        search_filter: Additional filtering conditions.

    Returns:
        document_list: List of (document, score) tuples.
    """
    query_embedding: List[float] = self.embedding.embed_query(query)
    raw_response = self.inner_embedding_query(
        embedding=query_embedding, search_filter=search_filter, k=k
    )
    return self.create_results_with_score(raw_response)
def similarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    search_filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Document]:
    """Perform retrieval directly using vectors.

    Args:
        embedding: Query vector.
        k: top n.
        search_filter: Additional filtering conditions.

    Returns:
        document_list: List of documents.
    """
    raw_response = self.inner_embedding_query(
        embedding=embedding, search_filter=search_filter, k=k
    )
    return self.create_results(raw_response)
def inner_embedding_query(
    self,
    embedding: List[float],
    search_filter: Optional[Dict[str, Any]] = None,
    k: int = 4,
) -> Dict[str, Any]:
    """Run a vector query against the OpenSearch instance.

    Args:
        embedding: Query vector.
        search_filter: Optional metadata conditions; each key is looked up in
            ``config.field_name_mapping`` whose value must be
            ``"<field>,<operator>"``.
        k: top n.

    Returns:
        Parsed JSON response dict, or ``{}`` when the query fails or the
        server reports an error.
    """
    from alibabacloud_ha3engine_vector.models import QueryRequest

    def create_filter(md_key: str, md_value: Any) -> str:
        # Use .get so an unmapped filter key yields an empty clause instead
        # of a KeyError (which the broad except below would otherwise turn
        # into a misleading "query failed" log).
        md_filter_expr = self.config.field_name_mapping.get(md_key)
        if md_filter_expr is None:
            return ""
        expr = md_filter_expr.split(",")
        if len(expr) != 2:
            logger.error(
                f"filter {md_filter_expr} express is not correct, "
                f"must contain mapping field and operator."
            )
            return ""
        md_filter_key = expr[0].strip()
        md_filter_operator = expr[1].strip()
        if isinstance(md_value, numbers.Number):
            return f"{md_filter_key}{md_filter_operator}{md_value}"
        # Non-numeric values must be quoted in the filter expression.
        return f'{md_filter_key}{md_filter_operator}"{md_value}"'

    def generate_filter_query() -> str:
        if search_filter is None:
            return ""
        return " AND ".join(
            create_filter(md_key, md_value)
            for md_key, md_value in search_filter.items()
        )

    try:
        request = QueryRequest(
            table_name=self.config.table_name,
            namespace=self.config.namespace,
            vector=embedding,
            include_vector=True,
            output_fields=self.config.output_fields,
            filter=generate_filter_query(),
            top_k=k,
        )
        query_result = self.ha3_engine_client.query(request)
        json_response = json.loads(query_result.body)
        if (
            "errorCode" in json_response
            and "errorMsg" in json_response
            and len(json_response["errorMsg"]) > 0
        ):
            logger.error(
                f"query {self.config.endpoint}{self.config.instance_id} "
                f"failed:{json_response['errorMsg']}."
            )
        else:
            return json_response
    except Exception:
        # logger.error(msg, e) would %-format a message with no placeholder
        # and lose it; logger.exception keeps message and traceback.
        logger.exception(
            f"query instance endpoint:{self.config.endpoint} "
            f"instance_id:{self.config.instance_id} failed."
        )
    return {}
def create_inverse_metadata(self, fields: Dict[str, Any]) -> Dict[str, Any]:
    """Create metadata from fields, mapping instance field names back.

    Args:
        fields: The fields of the document. The fields must be a dict.

    Returns:
        metadata: The metadata of the document, keyed by the original
            (un-mapped) field names via ``config.inverse_field_name_mapping``;
            the reserved ``id``/``document``/``embedding`` entries are skipped.
    """
    inverse_map = self.config.inverse_field_name_mapping
    reserved_keys = ("id", "document", "embedding")
    return {
        inverse_map[key]: value
        for key, value in fields.items()
        if key not in reserved_keys
    }
def create_results_with_score(
    self, json_result: Dict[str, Any]
) -> List[Tuple[Document, float]]:
    """Parse the returned results, keeping scores.

    Args:
        json_result: Results from an OpenSearch query.

    Returns:
        query_result_list: List of (document, score) tuples.
    """
    document_field = self.config.field_name_mapping["document"]
    scored_documents: List[Tuple[Document, float]] = []
    for hit in json_result["result"]:
        hit_fields = hit["fields"]
        document = Document(
            page_content=hit_fields[document_field],
            metadata=self.create_inverse_metadata(hit_fields),
        )
        scored_documents.append((document, float(hit["score"])))
    return scored_documents
def delete_documents_with_texts(self, texts: List[str]) -> bool:
    """Delete documents based on their page content.

    Args:
        texts: List of document page content.

    Returns:
        Whether the deletion was successful or not.
    """
    # Document IDs are the sha1 hex digests of the page content (the same
    # scheme add_texts uses when inserting).
    doc_ids = [sha1(text.encode("utf-8")).hexdigest() for text in texts]
    return self.delete_documents_with_document_id(doc_ids)
def delete_documents_with_document_id(self, id_list: List[str]) -> bool:
    """Delete documents based on their IDs.

    Args:
        id_list: List of document IDs.

    Returns:
        Whether the deletion was successful or not.

    Raises:
        Exception: Re-raised if the delete request fails.
    """
    # Nothing to delete counts as success.
    if id_list is None or len(id_list) == 0:
        return True

    from alibabacloud_ha3engine_vector import models

    delete_doc_list = [
        {
            "fields": {self.config.field_name_mapping["id"]: doc_id},
            "cmd": "delete",
        }
        for doc_id in id_list
    ]
    delete_request = models.PushDocumentsRequest(
        self.options_headers, delete_doc_list
    )
    try:
        delete_response = self.ha3_engine_client.push_documents(
            self.config.opt_table_name,
            self.config.field_name_mapping["id"],
            delete_request,
        )
        json_response = json.loads(delete_response.body)
        return json_response["status"] == "OK"
    except Exception as e:
        # logger.error(msg, e) would %-format a message that has no
        # placeholder and lose it; logger.exception keeps the message and
        # records the traceback.
        logger.exception(
            f"delete doc from :{self.config.endpoint} "
            f"instance_id:{self.config.instance_id} failed."
        )
        raise e
@classmethod
def from_texts(
    cls,
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    config: Optional[AlibabaCloudOpenSearchSettings] = None,
    **kwargs: Any,
) -> "AlibabaCloudOpenSearch":
    """Create an Alibaba Cloud OpenSearch vector store instance from texts.

    Args:
        texts: The text segments to be inserted into the vector storage,
            should not be empty.
        embedding: Embedding function.
        config: Alibaba OpenSearch instance configuration.
        metadatas: Metadata information.

    Returns:
        AlibabaCloudOpenSearch: Alibaba cloud opensearch vector store
            instance.

    Raises:
        Exception: If texts, embedding, or config is missing.
    """
    # Guard clauses: all three inputs are required.
    if texts is None or len(texts) == 0:
        raise Exception("the inserted text segments, should not be empty.")
    if embedding is None:
        raise Exception("the embeddings should not be empty.")
    if config is None:
        raise Exception("config should not be none.")

    store = cls(embedding, config, **kwargs)
    store.add_texts(texts=texts, metadatas=metadatas)
    return store
@classmethod
def from_documents(
    cls,
    documents: List[Document],
    embedding: Embeddings,
    config: Optional[AlibabaCloudOpenSearchSettings] = None,
    **kwargs: Any,
) -> "AlibabaCloudOpenSearch":
    """Create an Alibaba Cloud OpenSearch vector store from documents.

    Args:
        documents: Documents to be inserted into the vector storage,
            should not be empty.
        embedding: Embedding function.
        config: Alibaba OpenSearch instance configuration.

    Returns:
        AlibabaCloudOpenSearch: Alibaba cloud opensearch vector store
            instance.

    Raises:
        Exception: If documents, embedding, or config is missing.
    """
    # Guard clauses: all three inputs are required.
    if documents is None or len(documents) == 0:
        raise Exception("the inserted documents, should not be empty.")
    if embedding is None:
        raise Exception("the embeddings should not be empty.")
    if config is None:
        raise Exception("config can't be none")

    # Split the documents into the parallel lists from_texts expects.
    texts = [doc.page_content for doc in documents]
    metadatas = [doc.metadata for doc in documents]
    return cls.from_texts(
        texts=texts,
        embedding=embedding,
        metadatas=metadatas,
        config=config,
        **kwargs,
    )