# Source code for langchain_community.embeddings.infinity_local
"""written under MIT Licence, Michael Feil 2023."""importasynciofromloggingimportgetLoggerfromtypingimportAny,List,Optionalfromlangchain_core.embeddingsimportEmbeddingsfrompydanticimportBaseModel,ConfigDict,model_validatorfromtyping_extensionsimportSelf__all__=["InfinityEmbeddingsLocal"]logger=getLogger(__name__)
[docs]classInfinityEmbeddingsLocal(BaseModel,Embeddings):"""Optimized Infinity embedding models. https://github.com/michaelfeil/infinity This class deploys a local Infinity instance to embed text. The class requires async usage. Infinity is a class to interact with Embedding Models on https://github.com/michaelfeil/infinity Example: .. code-block:: python from langchain_community.embeddings import InfinityEmbeddingsLocal async with InfinityEmbeddingsLocal( model="BAAI/bge-small-en-v1.5", revision=None, device="cpu", ) as embedder: embeddings = await engine.aembed_documents(["text1", "text2"]) """model:str"Underlying model id from huggingface, e.g. BAAI/bge-small-en-v1.5"revision:Optional[str]=None"Model version, the commit hash from huggingface"batch_size:int=32"Internal batch size for inference, e.g. 32"device:str="auto""Device to use for inference, e.g. 'cpu' or 'cuda', or 'mps'"backend:str="torch""Backend for inference, e.g. 'torch' (recommended for ROCm/Nvidia)"" or 'optimum' for onnx/tensorrt"model_warmup:bool=True"Warmup the model with the max batch size."engine:Any=None#: :meta private:"""Infinity's AsyncEmbeddingEngine."""# LLM call kwargsmodel_config=ConfigDict(extra="forbid",protected_namespaces=(),)@model_validator(mode="after")defvalidate_environment(self)->Self:"""Validate that api key and python package exists in environment."""try:frominfinity_embimportAsyncEmbeddingEngine# type: ignoreexceptImportError:raiseImportError("Please install the ""`pip install 'infinity_emb[optimum,torch]>=0.0.24'` ""package to use the InfinityEmbeddingsLocal.")self.engine=AsyncEmbeddingEngine(model_name_or_path=self.model,device=self.device,revision=self.revision,model_warmup=self.model_warmup,batch_size=self.batch_size,engine=self.backend,)returnselfasyncdef__aenter__(self)->None:"""start the background worker. recommended usage is with the async with statement. 
async with InfinityEmbeddingsLocal( model="BAAI/bge-small-en-v1.5", revision=None, device="cpu", ) as embedder: embeddings = await engine.aembed_documents(["text1", "text2"]) """awaitself.engine.__aenter__()asyncdef__aexit__(self,*args:Any)->None:"""stop the background worker, required to free references to the pytorch model."""awaitself.engine.__aexit__(*args)
[docs]asyncdefaembed_documents(self,texts:List[str])->List[List[float]]:"""Async call out to Infinity's embedding endpoint. Args: texts: The list of texts to embed. Returns: List of embeddings, one for each text. """ifnotself.engine.running:logger.warning("Starting Infinity engine on the fly. This is not recommended.""Please start the engine before using it.")asyncwithself:# spawning threadpool for multithreaded encode, tokenizationembeddings,_=awaitself.engine.embed(texts)# stopping threadpool on exitlogger.warning("Stopped infinity engine after usage.")else:embeddings,_=awaitself.engine.embed(texts)returnembeddings
[docs]asyncdefaembed_query(self,text:str)->List[float]:"""Async call out to Infinity's embedding endpoint. Args: text: The text to embed. Returns: Embeddings for the text. """embeddings=awaitself.aembed_documents([text])returnembeddings[0]
[docs]defembed_documents(self,texts:List[str])->List[List[float]]:""" This method is async only. """logger.warning("This method is async only. ""Please use the async version `await aembed_documents`.")returnasyncio.run(self.aembed_documents(texts))
[docs]defembed_query(self,text:str)->List[float]:""" """logger.warning("This method is async only."" Please use the async version `await aembed_query`.")returnasyncio.run(self.aembed_query(text))