class PineconeRerank(BaseDocumentCompressor):
    """Document compressor that uses `Pinecone Rerank API`.

    Documents (``Document`` objects, plain dicts, or anything convertible with
    ``str``) are turned into dict payloads carrying an ``id`` and sent to the
    Pinecone inference rerank endpoint. Errors raised while reranking are
    logged and an empty result list is returned instead of propagating.
    """

    client: Optional[Pinecone] = None
    """Synchronous Pinecone client; created lazily from the API key if unset."""
    async_client: Optional[PineconeAsyncio] = None
    """Asynchronous Pinecone client; created lazily from the API key if unset."""
    top_n: Optional[int] = 3
    """Number of documents to return."""
    model: str = Field(
        default="bge-reranker-v2-m3",
        description="Model to use for reranking. Default is 'bge-reranker-v2-m3'.",
    )
    """Model to use for reranking."""
    pinecone_api_key: Optional[SecretStr] = Field(
        default_factory=secret_from_env("PINECONE_API_KEY", default=None)
    )
    """Pinecone API key. Must be specified directly or via environment variable
    PINECONE_API_KEY."""
    rank_fields: Optional[List[str]] = None
    """Fields to use for reranking when documents are dictionaries."""
    return_documents: bool = True
    """Whether to return the documents in the reranking results."""

    model_config = ConfigDict(
        extra="forbid",
        arbitrary_types_allowed=True,
    )

    def _get_api_key(self) -> Optional[str]:
        """Get the API key from SecretStr or directly."""
        key = self.pinecone_api_key
        if isinstance(key, SecretStr):
            return key.get_secret_value()
        return key

    def _get_sync_client(self) -> Pinecone:
        """Get or create the sync client.

        Raises:
            TypeError: If ``client`` is set but is not a ``Pinecone`` instance.
        """
        if self.client is None:
            self.client = Pinecone(api_key=self._get_api_key())
        elif not isinstance(self.client, Pinecone):
            raise TypeError(
                "The 'client' parameter must be an instance of Pinecone.\n"
                "You may create the Pinecone object like:\n\n"
                "from pinecone import Pinecone\nclient = Pinecone(api_key=...)"
            )
        return self.client

    async def _get_async_client(self) -> PineconeAsyncio:
        """Get or create the async client.

        Raises:
            TypeError: If ``async_client`` is set but is not a
                ``PineconeAsyncio`` instance.
        """
        if self.async_client is None:
            self.async_client = PineconeAsyncio(api_key=self._get_api_key())
        elif not isinstance(self.async_client, PineconeAsyncio):
            raise TypeError(
                "The 'async_client' parameter must be an instance of PineconeAsyncio.\n"
                "You may create the PineconeAsyncio object like:\n\n"
                "from pinecone import PineconeAsyncio\n"
                "async_client = PineconeAsyncio(api_key=...)"
            )
        return self.async_client

    def _document_to_dict(
        self,
        document: Union[str, Document, dict],
        index: int,
    ) -> dict:
        """Convert one input document into the dict payload sent for reranking.

        Args:
            document: A ``Document``, an already-built dict, or any other value
                (converted with ``str``).
            index: Position of the document in the input sequence; used to
                generate the fallback id ``doc_<index>``.

        Returns:
            A dict that always has a non-empty string ``"id"``. The caller's
            dict is never modified.
        """
        fallback_id = f"doc_{index}"

        if isinstance(document, Document):
            meta_id = document.metadata.get("id")
            doc_id = meta_id if isinstance(meta_id, str) and meta_id else fallback_id
            # Metadata is spread first so that it can never overwrite the
            # validated id or replace the page content under "text".
            return {
                **document.metadata,
                "id": doc_id,
                "text": document.page_content,
            }

        if isinstance(document, dict):
            current_id = document.get("id")
            if isinstance(current_id, str) and current_id:
                return document
            # Work on a shallow copy: the input dict belongs to the caller.
            return {**document, "id": fallback_id}

        return {"id": fallback_id, "text": str(document)}

    def _rerank_params(self, model: str, truncate: str) -> dict:
        """Returns the parameters for the rerank API call."""
        # Only include truncate parameter for models that support it
        if model == "cohere-rerank-3.5":
            return {}
        return {"truncate": truncate}

    def _resolve_model(self, model: Optional[str]) -> str:
        """Return the per-call ``model`` if given, else the instance default.

        Raises:
            ValueError: If neither a per-call nor an instance model is set.
        """
        model_to_use = model if model is not None else self.model
        if model_to_use is None:
            # This should never happen due to validator
            raise ValueError("No model specified for reranking")
        return model_to_use

    def _rerank_kwargs(
        self,
        docs: List[dict],
        query: str,
        rank_fields: Optional[List[str]],
        model: Optional[str],
        top_n: Optional[int],
        truncate: str,
    ) -> Dict[str, Any]:
        """Build the keyword arguments shared by the sync and async API calls."""
        model_to_use = self._resolve_model(model)
        return {
            "model": model_to_use,
            "query": query,
            "documents": docs,
            "rank_fields": rank_fields or self.rank_fields or ["text"],
            "top_n": top_n or self.top_n,
            "return_documents": self.return_documents,
            "parameters": self._rerank_params(model=model_to_use, truncate=truncate),
        }

    def _format_results(
        self, rerank_result: Any, docs: List[dict]
    ) -> List[Dict[str, Any]]:
        """Convert the API response into a list of plain result dicts.

        Each result has ``"id"``, ``"index"`` and ``"score"``; ``"document"`` is
        added when ``return_documents`` is set and the response carries one.
        """
        formatted: List[Dict[str, Any]] = []
        for item in rerank_result.data:
            returned_doc = getattr(item, "document", None)
            if returned_doc is not None:
                doc_id = returned_doc.id
            else:
                # NOTE(review): presumably the response omits the document when
                # return_documents is False - confirm against the Pinecone API.
                # The id of the submitted document at the same index is used.
                doc_id = docs[item.index]["id"]

            entry: Dict[str, Any] = {
                "id": doc_id,
                "index": item.index,
                "score": item.score,
            }
            if self.return_documents and returned_doc is not None:
                entry["document"] = returned_doc.to_dict()
            formatted.append(entry)
        return formatted

    def rerank(
        self,
        documents: Sequence[Union[str, Document, dict]],
        query: str,
        *,
        rank_fields: Optional[List[str]] = None,
        model: Optional[str] = None,
        top_n: Optional[int] = None,
        truncate: str = "END",
    ) -> List[Dict[str, Any]]:
        """Returns an ordered list of documents ordered by their relevance to
        the provided query.

        Args:
            documents: Documents to rerank.
            query: Query to rank the documents against.
            rank_fields: Fields to rank on; falls back to ``self.rank_fields``
                and then to ``["text"]``.
            model: Model to use; falls back to ``self.model``.
            top_n: Number of results to request; falls back to ``self.top_n``.
            truncate: Truncation setting, sent for every model except
                ``"cohere-rerank-3.5"``.

        Returns:
            One dict per result with ``id``, ``index``, ``score`` and
            optionally ``document``. An empty list is returned for empty input
            or when any error occurs (the error is logged).
        """
        if not documents:  # to avoid empty API call
            return []

        # Convert documents to dict format
        docs = [
            self._document_to_dict(document=doc, index=i)
            for i, doc in enumerate(documents)
        ]

        try:
            client = self._get_sync_client()
            rerank_result = client.inference.rerank(
                **self._rerank_kwargs(docs, query, rank_fields, model, top_n, truncate)
            )
            return self._format_results(rerank_result, docs)
        except Exception as e:
            # Deliberate best-effort: failures yield no results, not a crash.
            logger.error("Rerank error: %s", e)
            return []

    async def arerank(
        self,
        documents: Sequence[Union[str, Document, dict]],
        query: str,
        *,
        rank_fields: Optional[List[str]] = None,
        model: Optional[str] = None,
        top_n: Optional[int] = None,
        truncate: str = "END",
    ) -> List[Dict[str, Any]]:
        """Async rerank documents using Pinecone's reranking API.

        Takes the same arguments and returns the same result shape as
        ``rerank``, but uses the async client. An empty list is returned for
        empty input or when any error occurs (the error is logged).
        """
        if not documents:  # to avoid empty API call
            return []

        docs = [
            self._document_to_dict(document=doc, index=i)
            for i, doc in enumerate(documents)
        ]

        try:
            client = await self._get_async_client()
            rerank_result = await client.inference.rerank(
                **self._rerank_kwargs(docs, query, rank_fields, model, top_n, truncate)
            )
            return self._format_results(rerank_result, docs)
        except Exception as e:
            # Deliberate best-effort: failures yield no results, not a crash.
            logger.error("Async rerank error: %s", e)
            return []

    def _scored_copies(
        self,
        documents: Sequence[Document],
        reranked_results: List[Dict[str, Any]],
    ) -> List[Document]:
        """Map rerank results back to copies of the input documents.

        Results whose index is missing or out of range are skipped. Each copy
        gets the score stored under ``metadata["relevance_score"]``; the input
        documents are left untouched.
        """
        compressed: List[Document] = []
        for res in reranked_results:
            doc_index = res["index"]
            if doc_index is None or not 0 <= doc_index < len(documents):
                continue
            source = documents[doc_index]
            doc_copy = Document(
                source.page_content, metadata=deepcopy(source.metadata)
            )
            doc_copy.metadata["relevance_score"] = res["score"]
            compressed.append(doc_copy)
        return compressed

    def compress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks: Optional[Callbacks] = None,
    ) -> Sequence[Document]:
        """Compress documents using Pinecone's rerank API.

        Args:
            documents: Documents to rerank.
            query: Query to rank the documents against.
            callbacks: Accepted for interface compatibility; not used here.

        Returns:
            Copies of the selected documents, in the order returned by
            ``rerank``, each with ``metadata["relevance_score"]`` set. Empty if
            there is no input or reranking produced no results.
        """
        if not documents:
            return []
        reranked_results = self.rerank(documents=documents, query=query)
        return self._scored_copies(documents, reranked_results)

    async def acompress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks: Optional[Callbacks] = None,
    ) -> Sequence[Document]:
        """Async compress documents using Pinecone's rerank API.

        Async counterpart of ``compress_documents``; ``callbacks`` is accepted
        for interface compatibility and not used here.
        """
        if not documents:
            return []
        reranked_results = await self.arerank(documents=documents, query=query)
        return self._scored_copies(documents, reranked_results)