class ContextualCompressionRetriever(BaseRetriever):
    """Retriever that wraps a base retriever and compresses the results.

    Each query is first sent to ``base_retriever``. When that call yields a
    non-empty result, the documents are handed to ``base_compressor`` together
    with the query, and the compressor's output is returned as a list. When
    the base retriever yields nothing, the compressor is not called and an
    empty list is returned.
    """

    base_compressor: BaseDocumentCompressor
    """Compressor for compressing retrieved documents."""

    base_retriever: RetrieverLike
    """Base Retriever to use for getting relevant documents."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
        **kwargs: Any,
    ) -> List[Document]:
        """Retrieve documents for ``query`` and compress them.

        Args:
            query: String to find relevant documents for.
            run_manager: Callback manager for this run; a child manager from
                ``get_child()`` is passed to the base retriever call and
                another to the compressor call.
            **kwargs: Extra keyword arguments forwarded to
                ``base_retriever.invoke``.

        Returns:
            The compressor's output as a list, or an empty list when the base
            retriever returned no documents.
        """
        config = {"callbacks": run_manager.get_child()}
        retrieved = self.base_retriever.invoke(query, config=config, **kwargs)
        if not retrieved:
            # Nothing to compress; skip the compressor call entirely.
            return []

        compressed = self.base_compressor.compress_documents(
            retrieved,
            query,
            callbacks=run_manager.get_child(),
        )
        return list(compressed)

    async def _aget_relevant_documents(
        self,
        query: str,
        *,
        run_manager: AsyncCallbackManagerForRetrieverRun,
        **kwargs: Any,
    ) -> List[Document]:
        """Asynchronously retrieve documents for ``query`` and compress them.

        Args:
            query: String to find relevant documents for.
            run_manager: Async callback manager for this run; a child manager
                from ``get_child()`` is passed to the base retriever call and
                another to the compressor call.
            **kwargs: Extra keyword arguments forwarded to
                ``base_retriever.ainvoke``.

        Returns:
            The compressor's output as a list, or an empty list when the base
            retriever returned no documents.
        """
        config = {"callbacks": run_manager.get_child()}
        retrieved = await self.base_retriever.ainvoke(
            query, config=config, **kwargs
        )
        if not retrieved:
            # Nothing to compress; skip the compressor call entirely.
            return []

        compressed = await self.base_compressor.acompress_documents(
            retrieved,
            query,
            callbacks=run_manager.get_child(),
        )
        return list(compressed)