[docs]defis_endpoint_live(url:str,headers:Optional[dict],payload:Any)->bool:""" Check if an endpoint is live by sending a GET request to the specified URL. Args: url (str): The URL of the endpoint to check. Returns: bool: True if the endpoint is live (status code 200), False otherwise. Raises: Exception: If the endpoint returns a non-successful status code or if there is an error querying the endpoint. """try:response=requests.request("POST",url,headers=headers,data=payload)# Check if the status code is 200 (OK)ifresponse.status_code==200:returnTrueelse:# Raise an exception if the status code is not 200raiseException(f"Endpoint returned a non-successful status code: "f"{response.status_code}")exceptrequests.exceptions.RequestExceptionase:# Handle any exceptions (e.g., connection errors)raiseException(f"Error querying the endpoint: {e}")
[docs]@deprecated(since="0.0.37",removal="1.0.0",message=("Directly instantiating a NeMoEmbeddings from langchain-community is ""deprecated. Please use langchain-nvidia-ai-endpoints NVIDIAEmbeddings ""interface."),)classNeMoEmbeddings(BaseModel,Embeddings):"""NeMo embedding models."""batch_size:int=16model:str="NV-Embed-QA-003"api_endpoint_url:str="http://localhost:8088/v1/embeddings"
[docs]@pre_initdefvalidate_environment(cls,values:Dict)->Dict:"""Validate that the end point is alive using the values that are provided."""url=values["api_endpoint_url"]model=values["model"]# Optional: A minimal test payload and headers required by the endpointheaders={"Content-Type":"application/json"}payload=json.dumps({"input":"Hello World","model":model,"input_type":"query",})is_endpoint_live(url,headers,payload)returnvalues
asyncdef_aembedding_func(self,session:Any,text:str,input_type:str)->List[float]:"""Async call out to embedding endpoint. Args: text: The text to embed. Returns: Embeddings for the text. """headers={"Content-Type":"application/json"}asyncwithsession.post(self.api_endpoint_url,json={"input":text,"model":self.model,"input_type":input_type},headers=headers,)asresponse:response.raise_for_status()answer=awaitresponse.text()answer=json.loads(answer)returnanswer["data"][0]["embedding"]def_embedding_func(self,text:str,input_type:str)->List[float]:"""Call out to Cohere's embedding endpoint. Args: text: The text to embed. Returns: Embeddings for the text. """payload=json.dumps({"input":text,"model":self.model,"input_type":input_type,})headers={"Content-Type":"application/json"}response=requests.request("POST",self.api_endpoint_url,headers=headers,data=payload)response_json=json.loads(response.text)embedding=response_json["data"][0]["embedding"]returnembedding
[docs]defembed_documents(self,documents:List[str])->List[List[float]]:"""Embed a list of document texts. Args: texts: The list of texts to embed. Returns: List of embeddings, one for each text. """return[self._embedding_func(text,input_type="passage")fortextindocuments]
[docs]asyncdefaembed_query(self,text:str)->List[float]:"""Call out to NeMo's embedding endpoint async for embedding query text. Args: text: The text to embed. Returns: Embedding for the text. """asyncwithaiohttp.ClientSession()assession:embedding=awaitself._aembedding_func(session,text,"passage")returnembedding
[docs]asyncdefaembed_documents(self,texts:List[str])->List[List[float]]:"""Call out to NeMo's embedding endpoint async for embedding search docs. Args: texts: The list of texts to embed. Returns: List of embeddings, one for each text. """embeddings=[]asyncwithaiohttp.ClientSession()assession:forbatchinrange(0,len(texts),self.batch_size):text_batch=texts[batch:batch+self.batch_size]fortextintext_batch:# Create tasks for all texts in the batchtasks=[self._aembedding_func(session,text,"passage")fortextintext_batch]# Run all tasks concurrentlybatch_results=awaitasyncio.gather(*tasks)# Extend the embeddings list with results from this batchembeddings.extend(batch_results)returnembeddings