Source code for langchain_community.vectorstores.infinispanvs
"""Module providing Infinispan as a VectorStore"""from__future__importannotationsimportjsonimportloggingimportuuidfromtypingimportAny,Iterable,List,Optional,Tuple,Type,castimportrequestsfromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.vectorstoresimportVectorStorelogger=logging.getLogger(__name__)
class InfinispanVS(VectorStore):
    """`Infinispan` VectorStore interface.

    This class exposes the methods needed to present Infinispan as a
    VectorStore. It relies on the Infinispan class (below), which takes care
    of the REST interface with the server.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import InfinispanVS
            from mymodels import RGBEmbeddings
            ...
            vectorDb = InfinispanVS.from_documents(
                docs,
                embedding=RGBEmbeddings(),
                output_fields=["texture", "color"],
                lambda_key=lambda text, meta: str(meta["_key"]),
                lambda_content=lambda item: item["color"],
            )

        or an empty InfinispanVS instance can be created if preliminary setup
        is required before populating the store

        .. code-block:: python

            from langchain_community.vectorstores import InfinispanVS
            from mymodels import RGBEmbeddings
            ...
            ispnVS = InfinispanVS()
            # configure Infinispan here
            # i.e. create cache and schema

            # then populate the store
            vectorDb = InfinispanVS.from_documents(
                docs,
                embedding=RGBEmbeddings(),
                output_fields=["texture", "color"],
                lambda_key=lambda text, meta: str(meta["_key"]),
                lambda_content=lambda item: item["color"],
            )
    """
def schema_builder(self, templ: dict, dimension: int) -> str:
    """Build the protobuf schema for the indexed vector entity.

    The message is named after ``self._entity_name`` and always carries the
    vector field ``self._vectorfield`` as field number 1. Every item of
    ``templ`` adds one optional field, numbered from 2 in iteration order,
    whose protobuf type is derived from the Python type of the item's value.

    Args:
        templ(dict): sample metadata; keys are field names, values are only
            inspected for their type (str, bool, int, float or bytes).
        dimension(int): dimension declared in the ``@Vector`` annotation.

    Returns:
        The protobuf message definition as a string.

    Raises:
        Exception: if a value has a type with no protobuf mapping.
    """
    # Order matters: bool is a subclass of int, so it must be tested before
    # int or every boolean would be declared as int64.
    proto_types = (
        (bool, "bool"),
        (str, "string"),
        (int, "int64"),
        (float, "double"),
        (bytes, "bytes"),
    )
    metadata_proto_tpl = (
        "\n"
        "/**\n"
        " * @Indexed\n"
        " */\n"
        "message %s {\n"
        "/**\n"
        " * @Vector(dimension=%d)\n"
        " */\n"
        "repeated float %s = 1;\n"
    )
    parts = [
        metadata_proto_tpl
        % (
            self._entity_name,
            dimension,
            self._vectorfield,
        )
    ]
    for idx, (f, v) in enumerate(templ.items(), start=2):
        proto_type = next(
            (name for py_type, name in proto_types if isinstance(v, py_type)),
            None,
        )
        if proto_type is None:
            raise Exception(
                "Unable to build proto schema for metadata. "
                "Unhandled type for field: " + f
            )
        parts.append("optional " + proto_type + " " + f + " = " + str(idx) + ";\n")
    parts.append("}\n")
    return "".join(parts)
def schema_create(self, proto: str) -> requests.Response:
    """Deploy the schema for the vector db.

    The schema is registered under the name ``<entity name>.proto``.

    Args:
        proto(str): protobuf schema

    Returns:
        An http Response containing the result of the operation
    """
    schema_name = self._entity_name + ".proto"
    return self.ispn.schema_post(schema_name, proto)
def schema_delete(self) -> requests.Response:
    """Delete the schema for the vector db.

    The schema removed is the one named ``<entity name>.proto``.

    Returns:
        An http Response containing the result of the operation
    """
    schema_name = self._entity_name + ".proto"
    return self.ispn.schema_delete(schema_name)
def cache_create(self, config: str = "") -> requests.Response:
    """Create the cache for the vector db.

    Args:
        config(str): configuration of the cache, as a JSON document. When
            left empty, a default indexed distributed cache is configured
            with ``self._entity_name`` as its only indexed entity.

    Returns:
        An http Response containing the result of the operation
    """
    if config == "":
        # Serialise the default configuration with json.dumps instead of
        # splicing the entity name into a hand-written JSON string, so the
        # name is always correctly quoted and escaped.
        config = json.dumps(
            {
                "distributed-cache": {
                    "owners": "2",
                    "mode": "SYNC",
                    "statistics": True,
                    "encoding": {"media-type": "application/x-protostream"},
                    "indexing": {
                        "enabled": True,
                        "storage": "filesystem",
                        "startup-mode": "AUTO",
                        "indexing-mode": "AUTO",
                        "indexed-entities": [self._entity_name],
                    },
                }
            }
        )
    return self.ispn.cache_post(self._cache_name, config)
def cache_delete(self) -> requests.Response:
    """Delete the cache for the vector db.

    Returns:
        An http Response containing the result of the operation
    """
    target_cache = self._cache_name
    return self.ispn.cache_delete(target_cache)
def cache_clear(self) -> requests.Response:
    """Clear the cache for the vector db.

    Returns:
        An http Response containing the result of the operation
    """
    target_cache = self._cache_name
    return self.ispn.cache_clear(target_cache)
def cache_exists(self) -> bool:
    """Check whether the cache for the vector db exists.

    Returns:
        true if exists
    """
    target_cache = self._cache_name
    return self.ispn.cache_exists(target_cache)
def cache_index_clear(self) -> requests.Response:
    """Clear the index for the vector db.

    Returns:
        An http Response containing the result of the operation
    """
    target_cache = self._cache_name
    return self.ispn.index_clear(target_cache)
def cache_index_reindex(self) -> requests.Response:
    """Rebuild the index for the vector db.

    Returns:
        An http Response containing the result of the operation
    """
    target_cache = self._cache_name
    return self.ispn.index_reindex(target_cache)
def similarity_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]:
    """Return docs most similar to query.

    Args:
        query (str): The text being searched.
        k (int, optional): The amount of results to return. Defaults to 4.
        **kwargs: accepted for interface compatibility; not forwarded.

    Returns:
        The matching documents, with their scores dropped.
    """
    scored = self.similarity_search_with_score(query=query, k=k)
    docs = []
    for doc, _score in scored:
        docs.append(doc)
    return docs
def similarity_search_with_score(
    self, query: str, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
    """Perform a search on a query string and return results with score.

    The query text is embedded with ``self._embedding`` and the resulting
    vector is handed to ``similarity_search_with_score_by_vector``.

    Args:
        query (str): The text being searched.
        k (int, optional): The amount of results to return. Defaults to 4.
        **kwargs: accepted for interface compatibility; not forwarded.

    Returns:
        List[Tuple[Document, float]]
    """
    query_vector = self._embedding.embed_query(query)  # type: ignore
    return self.similarity_search_with_score_by_vector(embedding=query_vector, k=k)
def similarity_search_with_score_by_vector(
    self, embedding: List[float], k: int = 4
) -> List[Tuple[Document, float]]:
    """Return docs most similar to embedding vector.

    Builds a query of the form
    ``select <projection>, score(v) from <entity> v where
    v.<vectorfield> <-> <embedding>~<k>`` and runs it against the cache.
    The projection is the whole entity ``v`` when ``self._output_fields`` is
    None, otherwise the listed fields.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.

    Returns:
        List of pair (Documents, score) most similar to the query vector.
    """
    fields = self._output_fields
    if fields is None:
        projection = "select v"
    else:
        # fields[-1] is indexed explicitly so an empty field list fails with
        # IndexError rather than producing a malformed query.
        columns = ["v." + name for name in fields[:-1]] + ["v." + fields[-1]]
        projection = "select " + ",".join(columns)
    knn_clause = (
        ", score(v) from "
        + self._entity_name
        + " v where v."
        + self._vectorfield
        + " <-> "
        + json.dumps(embedding)
        + "~"
        + str(k)
    )
    response = self.ispn.req_query(projection + knn_clause, self._cache_name)
    return self._query_result_to_docs(json.loads(response.text))
def configure(self, metadata: dict, dimension: int) -> None:
    """Deploy the schema and make sure the cache for the vector db exists.

    The schema is built from ``metadata`` and ``dimension`` with
    ``schema_builder`` and deployed with ``schema_create``. If the cache
    does not exist yet it is created with the default configuration and its
    index is cleared.

    Args:
        metadata(dict): sample metadata used to derive the schema fields.
        dimension(int): dimension of the vector field.

    Raises:
        AssertionError: if the schema cannot be deployed, if the server
            reports a schema error, or if the cache cannot be created.
    """
    schema = self.schema_builder(metadata, dimension)
    output = self.schema_create(schema)
    # Explicit raises instead of ``assert`` statements: asserts are removed
    # when Python runs with -O, which would silently skip these checks. The
    # exception type is kept so existing callers are unaffected.
    if not output.ok:
        raise AssertionError(
            "Unable to create schema. Already exists? "
            "Consider using clear_old=True"
        )
    schema_error = json.loads(output.text)["error"]
    if schema_error is not None:
        raise AssertionError(f"Schema deployment reported an error: {schema_error}")
    if not self.cache_exists():
        output = self.cache_create()
        if not output.ok:
            raise AssertionError(
                "Unable to create cache. Already exists? "
                "Consider using clear_old=True"
            )
        # Ensure index is clean
        # NOTE(review): the index clear is taken to belong to the
        # cache-creation branch; confirm against the original layout.
        self.cache_index_clear()
@classmethod
def from_texts(
    cls: Type[InfinispanVS],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    clear_old: Optional[bool] = True,
    auto_config: Optional[bool] = True,
    **kwargs: Any,
) -> InfinispanVS:
    """Return VectorStore initialized from texts and embeddings.

    Args:
        texts: texts to store. May be empty when no auto-configuration is
            performed, in which case nothing is added to the store.
        embedding: embedding function used for the texts.
        metadatas: optional metadata, one dict per text. When
            ``auto_config`` is set and metadata is provided, the first dict
            is used to build the schema.
        ids: passed to the store constructor.
        clear_old: when true, clears the existing configuration (auto-config
            path) or the existing cache content (otherwise) first.
        auto_config: when true and metadata is provided, schema and cache
            are configured before the texts are added.
        **kwargs: passed to the store constructor.

    Returns:
        The populated InfinispanVS instance.

    Raises:
        IndexError: if auto-configuration is requested with metadata but
            ``texts`` is empty, since the vector dimension is learnt by
            embedding one of the texts.
    """
    infinispanvs = cls(embedding=embedding, ids=ids, **kwargs)
    vec = None
    if auto_config and metadatas:
        if clear_old:
            infinispanvs.config_clear()
        # The schema needs the vector dimension: embed one text to learn it.
        vec = embedding.embed_query(texts[-1])
        metadatas = cast(List[dict], metadatas)
        infinispanvs.configure(metadatas[0], len(vec))
    else:
        if clear_old:
            infinispanvs.cache_clear()
        # Only embed when there is something to embed; indexing an empty
        # list here used to raise IndexError before the emptiness check.
        if texts:
            vec = embedding.embed_query(texts[-1])
    if texts:
        infinispanvs.add_texts(texts, metadatas, vector=vec)
    return infinispanvs
# Value passed as ``timeout`` to the ``requests`` calls issued by the
# Infinispan REST helper below (requests interprets it in seconds).
REST_TIMEOUT = 10
class Infinispan:
    """Helper class for `Infinispan` REST interface.

    This class exposes the Infinispan operations needed to create and set up
    a vector db.

    You need a running Infinispan (15+) server without authentication.
    You can easily start one, see:
    https://github.com/rigazilla/infinispan-vector#run-infinispan
    """
def req_query(
    self, query: str, cache_name: str, local: bool = False
) -> requests.Response:
    """Request a query.

    The query is sent with the POST variant when
    ``self._use_post_for_query`` is set, otherwise with the GET variant.

    Args:
        query(str): query requested
        cache_name(str): name of the target cache
        local(boolean): forwarded unchanged to the query helper (local
            versus clustered query)

    Returns:
        An http Response containing the result set or errors
    """
    send = self._query_post if self._use_post_for_query else self._query_get
    return send(query, cache_name, local)
def post(self, key: str, data: str, cache_name: str) -> requests.Response:
    """Post an entry.

    Args:
        key(str): key of the entry
        data(str): content of the entry in json format
        cache_name(str): target cache

    Returns:
        An http Response containing the result of the operation
    """
    entry_url = "".join(
        (self._default_node, self._cache_url, "/", cache_name, "/", key)
    )
    json_headers = {"Content-Type": "application/json"}
    return requests.post(
        entry_url,
        data,
        headers=json_headers,
        timeout=REST_TIMEOUT,
    )
def put(self, key: str, data: str, cache_name: str) -> requests.Response:
    """Put an entry.

    Args:
        key(str): key of the entry
        data(str): content of the entry in json format
        cache_name(str): target cache

    Returns:
        An http Response containing the result of the operation
    """
    entry_url = "".join(
        (self._default_node, self._cache_url, "/", cache_name, "/", key)
    )
    json_headers = {"Content-Type": "application/json"}
    return requests.put(
        entry_url,
        data,
        headers=json_headers,
        timeout=REST_TIMEOUT,
    )
def get(self, key: str, cache_name: str) -> requests.Response:
    """Get an entry.

    Args:
        key(str): key of the entry
        cache_name(str): target cache

    Returns:
        An http Response containing the entry or errors
    """
    entry_url = "".join(
        (self._default_node, self._cache_url, "/", cache_name, "/", key)
    )
    json_headers = {"Content-Type": "application/json"}
    return requests.get(entry_url, headers=json_headers, timeout=REST_TIMEOUT)
def schema_post(self, name: str, proto: str) -> requests.Response:
    """Deploy a schema.

    Args:
        name(str): name of the schema. Will be used as a key
        proto(str): protobuf schema

    Returns:
        An http Response containing the result of the operation
    """
    schema_url = "".join((self._default_node, self._schema_url, "/", name))
    return requests.post(schema_url, proto, timeout=REST_TIMEOUT)
def cache_post(self, name: str, config: str) -> requests.Response:
    """Create a cache.

    Args:
        name(str): name of the cache.
        config(str): configuration of the cache.

    Returns:
        An http Response containing the result of the operation
    """
    cache_url = "".join((self._default_node, self._cache_url, "/", name))
    json_headers = {"Content-Type": "application/json"}
    return requests.post(
        cache_url,
        config,
        headers=json_headers,
        timeout=REST_TIMEOUT,
    )
def schema_delete(self, name: str) -> requests.Response:
    """Delete a schema.

    Args:
        name(str): name of the schema.

    Returns:
        An http Response containing the result of the operation
    """
    schema_url = "".join((self._default_node, self._schema_url, "/", name))
    return requests.delete(schema_url, timeout=REST_TIMEOUT)
def cache_delete(self, name: str) -> requests.Response:
    """Delete a cache.

    Args:
        name(str): name of the cache.

    Returns:
        An http Response containing the result of the operation
    """
    cache_url = "".join((self._default_node, self._cache_url, "/", name))
    return requests.delete(cache_url, timeout=REST_TIMEOUT)
def cache_clear(self, cache_name: str) -> requests.Response:
    """Clear a cache.

    Args:
        cache_name(str): name of the cache.

    Returns:
        An http Response containing the result of the operation
    """
    clear_url = "".join(
        (self._default_node, self._cache_url, "/", cache_name, "?action=clear")
    )
    return requests.post(clear_url, timeout=REST_TIMEOUT)
def cache_exists(self, cache_name: str) -> bool:
    """Check if a cache exists.

    Args:
        cache_name(str): name of the cache.

    Returns:
        True if cache exists
    """
    # Probe the cache resource itself. The "?action=clear" suffix belongs to
    # the clear operation and must not be part of an existence check.
    api_url = self._default_node + self._cache_url + "/" + cache_name
    return self.resource_exists(api_url)
@staticmethod
def resource_exists(api_url: str) -> bool:
    """Check if a resource exists.

    Issues a HEAD request and reports whether the response is successful.

    Args:
        api_url(str): url of the resource.

    Returns:
        true if resource exists
    """
    head_response = requests.head(api_url, timeout=REST_TIMEOUT)
    exists = head_response.ok
    return exists
def index_clear(self, cache_name: str) -> requests.Response:
    """Clear an index on a cache.

    Args:
        cache_name(str): name of the cache.

    Returns:
        An http Response containing the result of the operation
    """
    clear_url = "".join(
        (
            self._default_node,
            self._cache_url,
            "/",
            cache_name,
            "/search/indexes?action=clear",
        )
    )
    response = requests.post(clear_url, timeout=REST_TIMEOUT)
    return response
def index_reindex(self, cache_name: str) -> requests.Response:
    """Rebuild index on a cache.

    Args:
        cache_name(str): name of the cache.

    Returns:
        An http Response containing the result of the operation
    """
    reindex_url = "".join(
        (
            self._default_node,
            self._cache_url,
            "/",
            cache_name,
            "/search/indexes?action=reindex",
        )
    )
    response = requests.post(reindex_url, timeout=REST_TIMEOUT)
    return response