[docs]classNVIDIARerank(BaseDocumentCompressor):""" LangChain Document Compressor that uses the NVIDIA NeMo Retriever Reranking API. """classConfig:validate_assignment=True_client:_NVIDIAClient=PrivateAttr(_NVIDIAClient)_default_batch_size:int=32_default_model_name:str="nvidia/nv-rerankqa-mistral-4b-v3"_default_base_url:str="https://integrate.api.nvidia.com/v1"base_url:str=Field(description="Base url for model listing an invocation",)top_n:int=Field(5,ge=0,description="The number of documents to return.")model:Optional[str]=Field(description="The model to use for reranking.")truncate:Optional[Literal["NONE","END"]]=Field(description=("Truncate input text if it exceeds the model's maximum token length. ""Default is model dependent and is likely to raise error if an ""input is too long."),)max_batch_size:int=Field(_default_batch_size,ge=1,description="The maximum batch size.")_base_url_var="NVIDIA_BASE_URL"@root_validator(pre=True)def_validate_base_url(cls,values:Dict[str,Any])->Dict[str,Any]:values["base_url"]=(values.get(cls._base_url_var.lower())orvalues.get("base_url")oros.getenv(cls._base_url_var)orcls._default_base_url)returnvaluesdef__init__(self,**kwargs:Any):""" Create a new NVIDIARerank document compressor. This class provides access to a NVIDIA NIM for reranking. By default, it connects to a hosted NIM, but can be configured to connect to a local NIM using the `base_url` parameter. An API key is required to connect to the hosted NIM. Args: model (str): The model to use for reranking. nvidia_api_key (str): The API key to use for connecting to the hosted NIM. api_key (str): Alternative to nvidia_api_key. base_url (str): The base URL of the NIM to connect to. truncate (str): "NONE", "END", truncate input text if it exceeds the model's context length. Default is model dependent and is likely to raise an error if an input is too long. API Key: - The recommended way to provide the API key is through the `NVIDIA_API_KEY` environment variable. Base URL: - Connect to a self-hosted model with NVIDIA NIM using the `base_url` arg to link to the local host at localhost:8000: ranker = NVIDIARerank(base_url="http://localhost:8000/v1") Example: >>> from langchain_nvidia_ai_endpoints import NVIDIARerank >>> from langchain_core.documents import Document >>> query = "What is the GPU memory bandwidth of H100 SXM?" >>> passages = [ "The Hopper GPU is paired with the Grace CPU using NVIDIA's ultra-fast chip-to-chip interconnect, delivering 900GB/s of bandwidth, 7X faster than PCIe Gen5. This innovative design will deliver up to 30X higher aggregate system memory bandwidth to the GPU compared to today's fastest servers and up to 10X higher performance for applications running terabytes of data.", "A100 provides up to 20X higher performance over the prior generation and can be partitioned into seven GPU instances to dynamically adjust to shifting demands. The A100 80GB debuts the world's fastest memory bandwidth at over 2 terabytes per second (TB/s) to run the largest models and datasets.", "Accelerated servers with H100 deliver the compute power—along with 3 terabytes per second (TB/s) of memory bandwidth per GPU and scalability with NVLink and NVSwitch™.", ] >>> client = NVIDIARerank( model="nvidia/nv-rerankqa-mistral-4b-v3", api_key="$API_KEY_REQUIRED_IF_EXECUTING_OUTSIDE_NGC" ) >>> response = client.compress_documents( query=query, documents=[Document(page_content=passage) for passage in passages] ) >>> print(f"Most relevant: {response[0].page_content}\n" f"Least relevant: {response[-1].page_content}" ) Most relevant: Accelerated servers with H100 deliver the compute power—along with 3 terabytes per second (TB/s) of memory bandwidth per GPU and scalability with NVLink and NVSwitch™. Least relevant: A100 provides up to 20X higher performance over the prior generation and can be partitioned into seven GPU instances to dynamically adjust to shifting demands. The A100 80GB debuts the world's fastest memory bandwidth at over 2 terabytes per second (TB/s) to run the largest models and datasets. """super().__init__(**kwargs)self._client=_NVIDIAClient(base_url=self.base_url,model_name=self.model,default_hosted_model_name=self._default_model_name,api_key=kwargs.get("nvidia_api_key",kwargs.get("api_key",None)),infer_path="{base_url}/ranking",cls=self.__class__.__name__,)# todo: only store the model in one place# the model may be updated to a newer name during initializationself.model=self._client.model_name@propertydefavailable_models(self)->List[Model]:""" Get a list of available models that work with NVIDIARerank. """returnself._client.get_available_models(self.__class__.__name__)
[docs]@classmethoddefget_available_models(cls,**kwargs:Any,)->List[Model]:""" Get a list of available models that work with NVIDIARerank. """returncls(**kwargs).available_models
# todo: batching when len(documents) > endpoint's max batch sizedef_rank(self,documents:List[str],query:str)->List[Ranking]:payload={"model":self.model,"query":{"text":query},"passages":[{"text":passage}forpassageindocuments],}ifself.truncate:payload["truncate"]=self.truncateresponse=self._client.get_req(payload=payload)ifresponse.status_code!=200:response.raise_for_status()# todo: handle errorsrankings=response.json()["rankings"]# todo: callback supportreturn[Ranking(**ranking)forrankinginrankings[:self.top_n]]
[docs]defcompress_documents(self,documents:Sequence[Document],query:str,callbacks:Optional[Callbacks]=None,)->Sequence[Document]:""" Compress documents using the NVIDIA NeMo Retriever Reranking microservice API. Args: documents: A sequence of documents to compress. query: The query to use for compressing the documents. callbacks: Callbacks to run during the compression process. Returns: A sequence of compressed documents. """iflen(documents)==0orself.top_n<1:return[]defbatch(ls:list,size:int)->Generator[List[Document],None,None]:foriinrange(0,len(ls),size):yieldls[i:i+size]doc_list=list(documents)results=[]fordoc_batchinbatch(doc_list,self.max_batch_size):rankings=self._rank(query=query,documents=[d.page_contentfordindoc_batch])forrankinginrankings:assert(0<=ranking.index<len(doc_batch)),"invalid response from server: index out of range"doc=doc_batch[ranking.index]doc.metadata["relevance_score"]=ranking.logitresults.append(doc)# if we batched, we need to sort the resultsiflen(doc_list)>self.max_batch_size:results.sort(key=lambdax:x.metadata["relevance_score"],reverse=True)returnresults[:self.top_n]