Source code for langchain_community.document_compressors.infinity_rerank
from__future__importannotationsfromcopyimportdeepcopyfromtypingimportTYPE_CHECKING,Any,Dict,List,Optional,Sequence,Unionfromlangchain.retrievers.document_compressors.baseimportBaseDocumentCompressorfromlangchain_core.callbacks.managerimportCallbacksfromlangchain_core.documentsimportDocumentfrompydanticimportConfigDict,model_validatorifTYPE_CHECKING:frominfinity_client.api.defaultimportrerankfrominfinity_client.clientimportClientfrominfinity_client.modelsimportRerankInputelse:# Avoid pydantic annotation issues when actually instantiating# while keeping this import optionaltry:frominfinity_client.api.defaultimportrerankfrominfinity_client.clientimportClientfrominfinity_client.modelsimportRerankInputexceptImportError:passDEFAULT_MODEL_NAME="BAAI/bge-reranker-base"DEFAULT_BASE_URL="http://localhost:7997"
[docs]classInfinityRerank(BaseDocumentCompressor):"""Document compressor that uses `Infinity Rerank API`."""client:Optional[Client]=None"""Infinity client to use for compressing documents."""model:Optional[str]=None"""Model to use for reranking."""top_n:Optional[int]=3"""Number of documents to return."""model_config=ConfigDict(populate_by_name=True,arbitrary_types_allowed=True,extra="forbid",)@model_validator(mode="before")@classmethoddefvalidate_environment(cls,values:Dict)->Any:"""Validate that python package exists in environment."""if"client"invalues:returnvalueselse:try:frominfinity_client.clientimportClientexceptImportError:raiseImportError("Could not import infinity_client python package. ""Please install it with `pip install infinity_client`.")values["model"]=values.get("model",DEFAULT_MODEL_NAME)values["client"]=Client(base_url=DEFAULT_BASE_URL)returnvalues
[docs]defrerank(self,documents:Sequence[Union[str,Document,dict]],query:str,*,model:Optional[str]=None,top_n:Optional[int]=-1,)->List[Dict[str,Any]]:"""Returns an ordered list of documents ordered by their relevance to the provided query. Args: query: The query to use for reranking. documents: A sequence of documents to rerank. model: The model to use for re-ranking. Default to self.model. top_n : The number of results to return. If None returns all results. Defaults to self.top_n. max_chunks_per_doc : The maximum number of chunks derived from a document. """# noqa: E501iflen(documents)==0:# to avoid empty api callreturn[]docs=[doc.page_contentifisinstance(doc,Document)elsedocfordocindocuments]model=modelorself.modelinput=RerankInput(query=query,documents=docs,model=model,)results=rerank.sync(client=self.client,body=input)ifhasattr(results,"results"):results=getattr(results,"results")result_dicts=[]forresinresults:result_dicts.append({"index":res.index,"relevance_score":res.relevance_score,})result_dicts.sort(key=lambdax:x["relevance_score"],reverse=True)top_n=top_nif(top_nisNoneortop_n>0)elseself.top_nreturnresult_dicts[:top_n]
[docs]defcompress_documents(self,documents:Sequence[Document],query:str,callbacks:Optional[Callbacks]=None,)->Sequence[Document]:""" Compress documents using Infinity's rerank API. Args: documents: A sequence of documents to compress. query: The query to use for compressing the documents. callbacks: Callbacks to run during the compression process. Returns: A sequence of compressed documents. """compressed=[]forresinself.rerank(documents,query):doc=documents[res["index"]]doc_copy=Document(doc.page_content,metadata=deepcopy(doc.metadata))doc_copy.metadata["relevance_score"]=res["relevance_score"]compressed.append(doc_copy)returncompressed