Source code for langchain_community.embeddings.infinity
"""written under MIT Licence, Michael Feil 2023."""importasynciofromconcurrent.futuresimportThreadPoolExecutorfromtypingimportAny,Callable,Dict,List,Optional,Tupleimportaiohttpimportnumpyasnpimportrequestsfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.utilsimportget_from_dict_or_envfrompydanticimportBaseModel,ConfigDict,model_validator__all__=["InfinityEmbeddings"]
class InfinityEmbeddings(BaseModel, Embeddings):
    """Self-hosted embedding models for `infinity` package.

    See https://github.com/michaelfeil/infinity
    This also works for text-embeddings-inference and other
    self-hosted openai-compatible servers.

    Infinity is a package to interact with Embedding Models on https://github.com/michaelfeil/infinity


    Example:
        .. code-block:: python

            from langchain_community.embeddings import InfinityEmbeddings
            InfinityEmbeddings(
                model="BAAI/bge-small",
                infinity_api_url="http://localhost:7997",
            )
    """

    model: str
    "Underlying Infinity model id."

    infinity_api_url: str = "http://localhost:7997"
    """Endpoint URL to use."""

    client: Any = None  #: :meta private:
    """Infinity client."""

    model_config = ConfigDict(
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict) -> Any:
        """Resolve the endpoint URL and build the HTTP client.

        The URL is taken, in order of precedence, from the ``infinity_api_url``
        argument, the ``INFINITY_API_URL`` environment variable, or the field's
        declared default. The resolved URL is then used to construct
        ``TinyAsyncOpenAIInfinityEmbeddingClient``, which is stored in ``client``
        (any ``client`` value passed in is overwritten).

        Args:
            values: Raw constructor input (this is a ``mode="before"`` validator).

        Returns:
            A new dict with ``infinity_api_url`` and ``client`` populated.
        """
        # Copy so the caller's input mapping is not mutated in place.
        values = dict(values)
        # A "before" validator runs before field defaults are applied, so the
        # declared default must be passed explicitly; otherwise
        # get_from_dict_or_env raises when neither the argument nor the env
        # var is provided.
        values["infinity_api_url"] = get_from_dict_or_env(
            values,
            "infinity_api_url",
            "INFINITY_API_URL",
            default=cls.model_fields["infinity_api_url"].default,
        )

        values["client"] = TinyAsyncOpenAIInfinityEmbeddingClient(
            host=values["infinity_api_url"],
        )
        return values

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Call out to Infinity's embedding endpoint.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text.
        """
        embeddings = self.client.embed(
            model=self.model,
            texts=texts,
        )
        return embeddings

    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """Async call out to Infinity's embedding endpoint.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text.
        """
        embeddings = await self.client.aembed(
            model=self.model,
            texts=texts,
        )
        return embeddings

    def embed_query(self, text: str) -> List[float]:
        """Call out to Infinity's embedding endpoint.

        Args:
            text: The text to embed.

        Returns:
            Embeddings for the text.
        """
        return self.embed_documents([text])[0]

    async def aembed_query(self, text: str) -> List[float]:
        """Async call out to Infinity's embedding endpoint.

        Args:
            text: The text to embed.

        Returns:
            Embeddings for the text.
        """
        embeddings = await self.aembed_documents([text])
        return embeddings[0]
class TinyAsyncOpenAIInfinityEmbeddingClient:  #: :meta private:
    """Helper tool to embed Infinity.

    It is not a part of Langchain's stable API,
    direct use discouraged.

    Example:
        .. code-block:: python

            mini_client = TinyAsyncOpenAIInfinityEmbeddingClient(
                host="http://localhost:7997",
            )
            embeds = mini_client.embed(
                model="BAAI/bge-small",
                texts=["doc1", "doc2"],
            )
            # or
            embeds = await mini_client.aembed(
                model="BAAI/bge-small",
                texts=["doc1", "doc2"],
            )
    """

    def __init__(
        self,
        host: str = "http://localhost:7797/v1",
        aiosession: Optional["aiohttp.ClientSession"] = None,
    ) -> None:
        """Create the client.

        Args:
            host: Base URL of the server; requests are POSTed to
                ``f"{host}/embeddings"``.
            aiosession: Optional session reused by ``aembed``. When given, it is
                not closed by this client (the caller owns it). When omitted,
                ``aembed`` opens and closes its own session per call.

        Raises:
            ValueError: If ``host`` is None or shorter than 3 characters.
        """
        self.host = host
        self.aiosession = aiosession

        if self.host is None or len(self.host) < 3:
            raise ValueError(" param `host` must be set to a valid url")
        # Maximum number of texts sent in one HTTP request.
        self._batch_size = 128

    @staticmethod
    def _permute(
        texts: List[str], sorter: Callable = len
    ) -> Tuple[List[str], Callable]:
        """Sort texts longest-first and return a function that undoes the sort.

        Grouping texts of similar length into the same batch reduces padding
        work on the server. Adapted from
        https://github.com/UKPLab/sentence-transformers/blob/
        c5f93f70eca933c78695c5bc686ceda59651ae3b/sentence_transformers/SentenceTransformer.py#L156

        Args:
            texts: Texts to reorder.
            sorter: Key function; texts are ordered by descending ``sorter(text)``.
                Defaults to ``len``.

        Returns:
            The reordered texts, and a callable that maps any list aligned with
            the reordered texts back to the original order.

        Example:
            ```
            texts = ["one","three","four"]
            perm_texts, undo = self._permute(texts)
            texts == undo(perm_texts)
            ```
        """

        if len(texts) == 1:
            # special case query
            return texts, lambda t: t
        length_sorted_idx = np.argsort([-sorter(sen) for sen in texts])
        texts_sorted = [texts[idx] for idx in length_sorted_idx]

        # argsort of a permutation yields its inverse permutation.
        return texts_sorted, lambda unsorted_embeddings: [  # noqa: E731
            unsorted_embeddings[idx] for idx in np.argsort(length_sorted_idx)
        ]

    def _batch(self, texts: List[str]) -> List[List[str]]:
        """Split texts into consecutive batches of at most ``self._batch_size``.

        Args:
            texts: List of sentences.

        Returns:
            Batches of sentences, preserving order.
        """
        if len(texts) == 1:
            # special case query
            return [texts]
        batches = []
        for start_index in range(0, len(texts), self._batch_size):
            batches.append(texts[start_index : start_index + self._batch_size])
        return batches

    @staticmethod
    def _unbatch(batch_of_texts: List[List[Any]]) -> List[Any]:
        """Flatten a list of batches back into a single list, preserving order."""
        if len(batch_of_texts) == 1 and len(batch_of_texts[0]) == 1:
            # special case query
            return batch_of_texts[0]
        texts = []
        for sublist in batch_of_texts:
            texts.extend(sublist)
        return texts

    def _kwargs_post_request(self, model: str, texts: List[str]) -> Dict[str, Any]:
        """Build the keyword arguments for an embeddings POST request.

        Used by both the sync (``requests.post``) and async (``session.post``)
        paths.

        Args:
            model: Name of the embedding model.
            texts: Texts to embed in this request.

        Returns:
            Keyword arguments with ``url``, ``headers`` and ``json`` entries.
        """
        return dict(
            url=f"{self.host}/embeddings",
            headers={
                # "accept": "application/json",
                "content-type": "application/json",
            },
            json=dict(
                input=texts,
                model=model,
            ),
        )

    def _sync_request_embed(
        self, model: str, batch_texts: List[str]
    ) -> List[List[float]]:
        """POST one batch synchronously and return its embeddings.

        Raises:
            Exception: If the server responds with a non-200 status.
        """
        response = requests.post(
            **self._kwargs_post_request(model=model, texts=batch_texts)
        )
        if response.status_code != 200:
            raise Exception(
                f"Infinity returned an unexpected response with status "
                f"{response.status_code}: {response.text}"
            )
        return [e["embedding"] for e in response.json()["data"]]

    def embed(self, model: str, texts: List[str]) -> List[List[float]]:
        """Embed texts with ``model`` using blocking HTTP requests.

        Texts are sorted longest-first and split into batches. If there is more
        than one batch, the batches are sent from a thread pool of 32 workers.
        The results are then restored to the input order.

        Args:
            model: Name of the embedding model.
            texts: List of sentences to embed.

        Returns:
            One vector per input sentence, in input order.
        """
        if not texts:
            # Nothing to embed; avoid spinning up a thread pool.
            return []
        perm_texts, unpermute_func = self._permute(texts)
        perm_texts_batched = self._batch(perm_texts)

        # Request
        map_args = (
            self._sync_request_embed,
            [model] * len(perm_texts_batched),
            perm_texts_batched,
        )
        if len(perm_texts_batched) == 1:
            embeddings_batch_perm = list(map(*map_args))
        else:
            with ThreadPoolExecutor(32) as p:
                embeddings_batch_perm = list(p.map(*map_args))

        embeddings_perm = self._unbatch(embeddings_batch_perm)
        embeddings = unpermute_func(embeddings_perm)
        return embeddings

    async def _async_request(
        self, session: "aiohttp.ClientSession", kwargs: Dict[str, Any]
    ) -> List[List[float]]:
        """POST one batch on ``session`` and return its embeddings.

        Raises:
            Exception: If the server responds with a non-200 status.
        """
        async with session.post(**kwargs) as response:
            if response.status != 200:
                # On aiohttp, ``text`` is a coroutine method; interpolating it
                # unawaited would print a bound-method repr, not the body.
                body = await response.text()
                raise Exception(
                    f"Infinity returned an unexpected response with status "
                    f"{response.status}: {body}"
                )
            embedding = (await response.json())["data"]
            return [e["embedding"] for e in embedding]

    async def _agather(
        self,
        session: "aiohttp.ClientSession",
        model: str,
        batches: List[List[str]],
    ) -> List[List[List[float]]]:
        """Send one request per batch concurrently on ``session``, in batch order."""
        return await asyncio.gather(
            *[
                self._async_request(
                    session=session,
                    kwargs=self._kwargs_post_request(model=model, texts=batch),
                )
                for batch in batches
            ]
        )

    async def aembed(self, model: str, texts: List[str]) -> List[List[float]]:
        """Embed texts with ``model`` using concurrent async HTTP requests.

        Uses ``self.aiosession`` when one was provided; it is left open. Otherwise
        a temporary session is opened with ``trust_env=True`` and at most 32
        connections, and closed again afterwards.

        Args:
            model: Name of the embedding model.
            texts: List of sentences to embed.

        Returns:
            One vector per input sentence, in input order.
        """
        if not texts:
            # Nothing to embed; avoid opening an HTTP session.
            return []
        perm_texts, unpermute_func = self._permute(texts)
        perm_texts_batched = self._batch(perm_texts)

        # Request
        if self.aiosession is not None:
            embeddings_batch_perm = await self._agather(
                self.aiosession, model, perm_texts_batched
            )
        else:
            async with aiohttp.ClientSession(
                trust_env=True, connector=aiohttp.TCPConnector(limit=32)
            ) as session:
                embeddings_batch_perm = await self._agather(
                    session, model, perm_texts_batched
                )

        embeddings_perm = self._unbatch(embeddings_batch_perm)
        embeddings = unpermute_func(embeddings_perm)
        return embeddings