Source code for langchain_community.vectorstores.infinispanvs
"""Module providing Infinispan as a VectorStore"""from__future__importannotationsimportjsonimportloggingimportuuidimportwarningsfromtypingimportAny,Iterable,List,Optional,Tuple,Type,Union,castfromhttpximportResponsefromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.vectorstoresimportVectorStorelogger=logging.getLogger(__name__)
[docs]classInfinispanVS(VectorStore):"""`Infinispan` VectorStore interface. This class exposes the method to present Infinispan as a VectorStore. It relies on the Infinispan class (below) which takes care of the REST interface with the server. Example: ... code-block:: python from langchain_community.vectorstores import InfinispanVS from mymodels import RGBEmbeddings ... vectorDb = InfinispanVS.from_documents(docs, embedding=RGBEmbeddings(), output_fields=["texture", "color"], lambda_key=lambda text,meta: str(meta["_key"]), lambda_content=lambda item: item["color"]) or an empty InfinispanVS instance can be created if preliminary setup is required before populating the store ... code-block:: python from langchain_community.vectorstores import InfinispanVS from mymodels import RGBEmbeddings ... ispnVS = InfinispanVS() # configure Infinispan here # i.e. create cache and schema # then populate the store vectorDb = InfinispanVS.from_documents(docs, embedding=RGBEmbeddings(), output_fields: ["texture", "color"], lambda_key: lambda text,meta: str(meta["_key"]), lambda_content: lambda item: item["color"]) """
[docs]def__init__(self,embedding:Optional[Embeddings]=None,ids:Optional[List[str]]=None,**kwargs:Any,):""" Parameters ---------- cache_name: str Embeddings cache name. Default "vector" entity_name: str Protobuf entity name for the embeddings. Default "vector" text_field: str Protobuf field name for text. Default "text" vector_field: str Protobuf field name for vector. Default "vector" lambda_content: lambda Lambda returning the content part of an item. Default returns text_field lambda_metadata: lambda Lambda returning the metadata part of an item. Default returns items fields excepts text_field, vector_field, _type output_fields: List[str] List of fields to be returned from item, if None return all fields. Default None kwargs: Any Rest of arguments passed to Infinispan. See docs"""self.ispn=Infinispan(**kwargs)self._configuration=kwargsself._cache_name=str(self._configuration.get("cache_name","vector"))self._entity_name=str(self._configuration.get("entity_name","vector"))self._embedding=embeddingself._textfield=self._configuration.get("textfield","")ifself._textfield=="":self._textfield=self._configuration.get("text_field","text")else:warnings.warn("`textfield` is deprecated. Please use `text_field` param.",DeprecationWarning,)self._vectorfield=self._configuration.get("vectorfield","")ifself._vectorfield=="":self._vectorfield=self._configuration.get("vector_field","vector")else:warnings.warn("`vectorfield` is deprecated. Please use `vector_field` param.",DeprecationWarning,)self._to_content=self._configuration.get("lambda_content",lambdaitem:self._default_content(item))self._to_metadata=self._configuration.get("lambda_metadata",lambdaitem:self._default_metadata(item))self._output_fields=self._configuration.get("output_fields")self._ids=ids
[docs]defschema_builder(self,templ:dict,dimension:int)->str:metadata_proto_tpl="""/*** @Indexed*/message %s {/*** @Vector(dimension=%d)*/repeated float %s = 1;"""metadata_proto=metadata_proto_tpl%(self._entity_name,dimension,self._vectorfield,)idx=2forf,vintempl.items():ifisinstance(v,str):metadata_proto+="optional string "+f+" = "+str(idx)+";\n"elifisinstance(v,int):metadata_proto+="optional int64 "+f+" = "+str(idx)+";\n"elifisinstance(v,float):metadata_proto+="optional double "+f+" = "+str(idx)+";\n"elifisinstance(v,bytes):metadata_proto+="optional bytes "+f+" = "+str(idx)+";\n"elifisinstance(v,bool):metadata_proto+="optional bool "+f+" = "+str(idx)+";\n"else:raiseException("Unable to build proto schema for metadata. ""Unhandled type for field: "+f)idx+=1metadata_proto+="}\n"returnmetadata_proto
[docs]defschema_create(self,proto:str)->Response:"""Deploy the schema for the vector db Args: proto(str): protobuf schema Returns: An http Response containing the result of the operation """returnself.ispn.schema_post(self._entity_name+".proto",proto)
[docs]defschema_delete(self)->Response:"""Delete the schema for the vector db Returns: An http Response containing the result of the operation """returnself.ispn.schema_delete(self._entity_name+".proto")
[docs]defcache_create(self,config:str="")->Response:"""Create the cache for the vector db Args: config(str): configuration of the cache. Returns: An http Response containing the result of the operation """ifconfig=="":config=(''' { "distributed-cache": { "owners": "2", "mode": "SYNC", "statistics": true, "encoding": { "media-type": "application/x-protostream" }, "indexing": { "enabled": true, "storage": "filesystem", "startup-mode": "AUTO", "indexing-mode": "AUTO", "indexed-entities": [ "'''+self._entity_name+"""" ] } }}""")returnself.ispn.cache_post(self._cache_name,config)
[docs]defcache_delete(self)->Response:"""Delete the cache for the vector db Returns: An http Response containing the result of the operation """returnself.ispn.cache_delete(self._cache_name)
[docs]defcache_clear(self)->Response:"""Clear the cache for the vector db Returns: An http Response containing the result of the operation """returnself.ispn.cache_clear(self._cache_name)
[docs]defcache_exists(self)->bool:"""Checks if the cache exists Returns: true if exists """returnself.ispn.cache_exists(self._cache_name)
[docs]defcache_index_clear(self)->Response:"""Clear the index for the vector db Returns: An http Response containing the result of the operation """returnself.ispn.index_clear(self._cache_name)
[docs]defcache_index_reindex(self)->Response:"""Rebuild the for the vector db Returns: An http Response containing the result of the operation """returnself.ispn.index_reindex(self._cache_name)
[docs]defsimilarity_search(self,query:str,k:int=4,**kwargs:Any)->List[Document]:"""Return docs most similar to query."""documents=self.similarity_search_with_score(query=query,k=k)return[docfordoc,_indocuments]
[docs]defsimilarity_search_with_score(self,query:str,k:int=4,**kwargs:Any)->List[Tuple[Document,float]]:"""Perform a search on a query string and return results with score. Args: query (str): The text being searched. k (int, optional): The amount of results to return. Defaults to 4. Returns: List[Tuple[Document, float]] """embed=self._embedding.embed_query(query)# type: ignoredocuments=self.similarity_search_with_score_by_vector(embedding=embed,k=k)returndocuments
[docs]defsimilarity_search_with_score_by_vector(self,embedding:List[float],k:int=4)->List[Tuple[Document,float]]:"""Return docs most similar to embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. Returns: List of pair (Documents, score) most similar to the query vector. """ifself._output_fieldsisNone:query_str=("select v, score(v) from "+self._entity_name+" v where v."+self._vectorfield+" <-> "+json.dumps(embedding)+"~"+str(k))else:query_proj="select "forfieldinself._output_fields[:-1]:query_proj=query_proj+"v."+field+","query_proj=query_proj+"v."+self._output_fields[-1]query_str=(query_proj+", score(v) from "+self._entity_name+" v where v."+self._vectorfield+" <-> "+json.dumps(embedding)+"~"+str(k))query_res=self.ispn.req_query(query_str,self._cache_name)result=json.loads(query_res.text)returnself._query_result_to_docs(result)
[docs]defconfigure(self,metadata:dict,dimension:int)->None:schema=self.schema_builder(metadata,dimension)output=self.schema_create(schema)assertoutput.status_code==self.ispn.Codes.OK,("Unable to create schema. Already exists? ")"Consider using clear_old=True"assertjson.loads(output.text)["error"]isNoneifnotself.cache_exists():output=self.cache_create()assertoutput.status_code==self.ispn.Codes.OK,("Unable to create cache. Already exists? ")"Consider using clear_old=True"# Ensure index is cleanself.cache_index_clear()
[docs]@classmethoddeffrom_texts(cls:Type[InfinispanVS],texts:List[str],embedding:Embeddings,metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,clear_old:Optional[bool]=True,auto_config:Optional[bool]=True,**kwargs:Any,)->InfinispanVS:"""Return VectorStore initialized from texts and embeddings. In addition to parameters described by the super method, this implementation provides other configuration params if different configuration from default is needed. Parameters ---------- ids : List[str] Additional list of keys associated to the embedding. If not provided UUIDs will be generated clear_old : bool Whether old data must be deleted. Default True auto_config: bool Whether to do a complete server setup (caches, protobuf definition...). Default True kwargs: Any Rest of arguments passed to InfinispanVS. See docs"""infinispanvs=cls(embedding=embedding,ids=ids,**kwargs)ifauto_configandlen(metadatasor[])>0:ifclear_old:infinispanvs.config_clear()vec=embedding.embed_query(texts[len(texts)-1])metadatas=cast(List[dict],metadatas)infinispanvs.configure(metadatas[0],len(vec))else:ifclear_old:infinispanvs.cache_clear()vec=embedding.embed_query(texts[len(texts)-1])iftexts:infinispanvs.add_texts(texts,metadatas,vector=vec)returninfinispanvs
REST_TIMEOUT=10
[docs]classInfinispan:"""Helper class for `Infinispan` REST interface. This class exposes the Infinispan operations needed to create and set up a vector db. You need a running Infinispan (15+) server without authentication. You can easily start one, see: https://github.com/rigazilla/infinispan-vector#run-infinispan """
[docs]def__init__(self,schema:str="http",user:str="",password:str="",hosts:List[str]=["127.0.0.1:11222"],cache_url:str="/rest/v2/caches",schema_url:str="/rest/v2/schemas",use_post_for_query:bool=True,http2:bool=True,verify:bool=True,**kwargs:Any,):""" Parameters ---------- schema: str Schema for HTTP request: "http" or "https". Default "http" user, password: str User and password if auth is required. Default None hosts: List[str] List of server addresses. Default ["127.0.0.1:11222"] cache_url: str URL endpoint for cache API. Default "/rest/v2/caches" schema_url: str URL endpoint for schema API. Default "/rest/v2/schemas" use_post_for_query: bool Whether POST method should be used for query. Default True http2: bool Whether HTTP/2 protocol should be used. `pip install "httpx[http2]"` is needed for HTTP/2. Default True verify: bool Whether TLS certificate must be verified. Default True """try:importhttpxexceptImportError:raiseImportError("Could not import httpx python package. ""Please install it with `pip install httpx`"'or `pip install "httpx[http2]"` if you need HTTP/2.')self.Codes=httpx.codesself._configuration=kwargsself._schema=schemaself._user=userself._password=passwordself._host=hosts[0]self._default_node=self._schema+"://"+self._hostself._cache_url=cache_urlself._schema_url=schema_urlself._use_post_for_query=use_post_for_queryself._http2=http2ifself._userandself._password:ifself._schema=="http":auth:Union[Tuple[str,str],httpx.DigestAuth]=httpx.DigestAuth(username=self._user,password=self._password)else:auth=(self._user,self._password)self._h2c=httpx.Client(http2=self._http2,http1=notself._http2,auth=auth,verify=verify,)else:self._h2c=httpx.Client(http2=self._http2,http1=notself._http2,verify=verify,)
[docs]defreq_query(self,query:str,cache_name:str,local:bool=False)->Response:"""Request a query Args: query(str): query requested cache_name(str): name of the target cache local(boolean): whether the query is local to clustered Returns: An http Response containing the result set or errors """ifself._use_post_for_query:returnself._query_post(query,cache_name,local)returnself._query_get(query,cache_name,local)
[docs]defpost(self,key:str,data:str,cache_name:str)->Response:"""Post an entry Args: key(str): key of the entry data(str): content of the entry in json format cache_name(str): target cache Returns: An http Response containing the result of the operation """api_url=self._default_node+self._cache_url+"/"+cache_name+"/"+keyresponse=self._h2c.post(api_url,content=data,headers={"Content-Type":"application/json"},timeout=REST_TIMEOUT,)returnresponse
[docs]defput(self,key:str,data:str,cache_name:str)->Response:"""Put an entry Args: key(str): key of the entry data(str): content of the entry in json format cache_name(str): target cache Returns: An http Response containing the result of the operation """api_url=self._default_node+self._cache_url+"/"+cache_name+"/"+keyresponse=self._h2c.put(api_url,content=data,headers={"Content-Type":"application/json"},timeout=REST_TIMEOUT,)returnresponse
[docs]defget(self,key:str,cache_name:str)->Response:"""Get an entry Args: key(str): key of the entry cache_name(str): target cache Returns: An http Response containing the entry or errors """api_url=self._default_node+self._cache_url+"/"+cache_name+"/"+keyresponse=self._h2c.get(api_url,headers={"Content-Type":"application/json"},timeout=REST_TIMEOUT)returnresponse
[docs]defschema_post(self,name:str,proto:str)->Response:"""Deploy a schema Args: name(str): name of the schema. Will be used as a key proto(str): protobuf schema Returns: An http Response containing the result of the operation """api_url=self._default_node+self._schema_url+"/"+nameresponse=self._h2c.post(api_url,content=proto,timeout=REST_TIMEOUT)returnresponse
[docs]defcache_post(self,name:str,config:str)->Response:"""Create a cache Args: name(str): name of the cache. config(str): configuration of the cache. Returns: An http Response containing the result of the operation """api_url=self._default_node+self._cache_url+"/"+nameresponse=self._h2c.post(api_url,content=config,headers={"Content-Type":"application/json"},timeout=REST_TIMEOUT,)returnresponse
[docs]defschema_delete(self,name:str)->Response:"""Delete a schema Args: name(str): name of the schema. Returns: An http Response containing the result of the operation """api_url=self._default_node+self._schema_url+"/"+nameresponse=self._h2c.delete(api_url,timeout=REST_TIMEOUT)returnresponse
[docs]defcache_delete(self,name:str)->Response:"""Delete a cache Args: name(str): name of the cache. Returns: An http Response containing the result of the operation """api_url=self._default_node+self._cache_url+"/"+nameresponse=self._h2c.delete(api_url,timeout=REST_TIMEOUT)returnresponse
[docs]defcache_clear(self,cache_name:str)->Response:"""Clear a cache Args: cache_name(str): name of the cache. Returns: An http Response containing the result of the operation """api_url=(self._default_node+self._cache_url+"/"+cache_name+"?action=clear")response=self._h2c.post(api_url,timeout=REST_TIMEOUT)returnresponse
[docs]defcache_exists(self,cache_name:str)->bool:"""Check if a cache exists Args: cache_name(str): name of the cache. Returns: True if cache exists """api_url=(self._default_node+self._cache_url+"/"+cache_name+"?action=clear")returnself.resource_exists(api_url)
[docs]defresource_exists(self,api_url:str)->bool:"""Check if a resource exists Args: api_url(str): url of the resource. Returns: true if resource exists """response=self._h2c.head(api_url,timeout=REST_TIMEOUT)returnresponse.status_code==self.Codes.OK
[docs]defindex_clear(self,cache_name:str)->Response:"""Clear an index on a cache Args: cache_name(str): name of the cache. Returns: An http Response containing the result of the operation """api_url=(self._default_node+self._cache_url+"/"+cache_name+"/search/indexes?action=clear")returnself._h2c.post(api_url,timeout=REST_TIMEOUT)
[docs]defindex_reindex(self,cache_name:str)->Response:"""Rebuild index on a cache Args: cache_name(str): name of the cache. Returns: An http Response containing the result of the operation """api_url=(self._default_node+self._cache_url+"/"+cache_name+"/search/indexes?action=reindex")returnself._h2c.post(api_url,timeout=REST_TIMEOUT)