Source code for langchain_community.embeddings.infinity_local
"""written under MIT Licence, Michael Feil 2023."""importasynciofromloggingimportgetLoggerfromtypingimportAny,Dict,List,Optionalfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.pydantic_v1importBaseModel,root_validator__all__=["InfinityEmbeddingsLocal"]logger=getLogger(__name__)
class InfinityEmbeddingsLocal(BaseModel, Embeddings):
    """Optimized Infinity embedding models.

    https://github.com/michaelfeil/infinity
    This class deploys a local Infinity instance to embed text.
    The class requires async usage.

    Infinity is a class to interact with embedding models hosted on
    https://github.com/michaelfeil/infinity

    Example:
        .. code-block:: python

            from langchain_community.embeddings import InfinityEmbeddingsLocal

            embedder = InfinityEmbeddingsLocal(
                model="BAAI/bge-small-en-v1.5",
                revision=None,
                device="cpu",
            )
            async with embedder:
                embeddings = await embedder.aembed_documents(["text1", "text2"])
    """

    model: str
    "Underlying model id from huggingface, e.g. BAAI/bge-small-en-v1.5"

    revision: Optional[str] = None
    "Model version, the commit hash from huggingface"

    batch_size: int = 32
    "Internal batch size for inference, e.g. 32"

    device: str = "auto"
    "Device to use for inference, e.g. 'cpu', 'cuda' or 'mps'"

    backend: str = "torch"
    "Backend for inference, e.g. 'torch' (recommended for ROCm/Nvidia) or 'optimum' for onnx/tensorrt"

    model_warmup: bool = True
    "Warmup the model with the max batch size."

    engine: Any = None  #: :meta private:
    """Infinity's AsyncEmbeddingEngine."""

    # LLM call kwargs
    class Config:
        extra = "forbid"

    @root_validator(pre=False, skip_on_failure=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the infinity_emb python package is installed."""
        try:
            from infinity_emb import AsyncEmbeddingEngine  # type: ignore
        except ImportError:
            raise ImportError(
                "Please install the `infinity_emb` package to use "
                "InfinityEmbeddingsLocal: "
                "`pip install 'infinity_emb[optimum,torch]>=0.0.24'`"
            )
        logger.debug(f"Using InfinityEmbeddingsLocal with kwargs {values}")

        values["engine"] = AsyncEmbeddingEngine(
            model_name_or_path=values["model"],
            device=values["device"],
            revision=values["revision"],
            model_warmup=values["model_warmup"],
            batch_size=values["batch_size"],
            engine=values["backend"],
        )
        return values

    async def __aenter__(self) -> None:
        """Start the background worker.

        Recommended usage is with the async with statement:

            embedder = InfinityEmbeddingsLocal(
                model="BAAI/bge-small-en-v1.5",
                revision=None,
                device="cpu",
            )
            async with embedder:
                embeddings = await embedder.aembed_documents(["text1", "text2"])
        """
        await self.engine.__aenter__()

    async def __aexit__(self, *args: Any) -> None:
        """Stop the background worker,
        required to free references to the pytorch model."""
        await self.engine.__aexit__(*args)
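    # Note (editor's sketch, not in the original source): __aenter__ returns
    # None, so bind the instance first and use a bare `async with embedder:`
    # rather than `async with InfinityEmbeddingsLocal(...) as embedder:`,
    # which would bind None:
    #
    #     embedder = InfinityEmbeddingsLocal(model="BAAI/bge-small-en-v1.5")
    #     async with embedder:
    #         vectors = await embedder.aembed_documents(["text1", "text2"])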
    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """Async call out to Infinity's embedding endpoint.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text.
        """
        if not self.engine.running:
            logger.warning(
                "Starting Infinity engine on the fly. This is not recommended. "
                "Please start the engine before using it."
            )
            async with self:
                # spawning threadpool for multithreaded encode, tokenization
                embeddings, _ = await self.engine.embed(texts)
            # stopping threadpool on exit
            logger.warning("Stopped infinity engine after usage.")
        else:
            embeddings, _ = await self.engine.embed(texts)
        return embeddings
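    # Note (editor's sketch, not in the original source): calling
    # aembed_documents while the engine is not running takes the discouraged
    # on-the-fly branch above, which starts and stops the worker around a
    # single call:
    #
    #     vectors = await InfinityEmbeddingsLocal(
    #         model="BAAI/bge-small-en-v1.5",
    #     ).aembed_documents(["text1", "text2"])
    #
    # For repeated calls, enter the context manager once instead.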
    async def aembed_query(self, text: str) -> List[float]:
        """Async call out to Infinity's embedding endpoint.

        Args:
            text: The text to embed.

        Returns:
            Embeddings for the text.
        """
        embeddings = await self.aembed_documents([text])
        return embeddings[0]
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        This method is async only.
        """
        logger.warning(
            "This method is async only. "
            "Please use the async version `await aembed_documents`."
        )
        return asyncio.run(self.aembed_documents(texts))
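    # Note (editor's sketch, not in the original source): asyncio.run() raises
    # a RuntimeError if an event loop is already running in the current thread
    # (e.g. in a Jupyter notebook or inside another coroutine), so this sync
    # wrapper and embed_query below only work from plain synchronous code.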
    def embed_query(self, text: str) -> List[float]:
        """
        This method is async only.
        """
        logger.warning(
            "This method is async only. "
            "Please use the async version `await aembed_query`."
        )
        return asyncio.run(self.aembed_query(text))
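
# ---------------------------------------------------------------------------
# Illustrative usage (editor's sketch, not part of the original module).
# Assumes `infinity_emb[optimum,torch]>=0.0.24` is installed and that the
# model id below can be fetched from Hugging Face.


async def _example() -> None:
    embedder = InfinityEmbeddingsLocal(
        model="BAAI/bge-small-en-v1.5",
        device="cpu",
    )
    # Entering the context manager starts the background worker; leaving it
    # frees the references to the pytorch model.
    async with embedder:
        vectors = await embedder.aembed_documents(["text1", "text2"])
        query_vector = await embedder.aembed_query("text1")
    print(len(vectors), len(query_vector))


if __name__ == "__main__":
    asyncio.run(_example())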