Source code for langchain_community.chains.pebblo_retrieval.base
"""Pebblo Retrieval Chain with Identity & Semantic Enforcement for question-answeringagainst a vector database."""importdatetimeimportinspectimportloggingfromimportlib.metadataimportversionfromtypingimportAny,Dict,List,Optionalfromlangchain.chains.baseimportChainfromlangchain.chains.combine_documents.baseimportBaseCombineDocumentsChainfromlangchain_core.callbacksimport(AsyncCallbackManagerForChainRun,CallbackManagerForChainRun,)fromlangchain_core.documentsimportDocumentfromlangchain_core.language_modelsimportBaseLanguageModelfromlangchain_core.vectorstoresimportVectorStoreRetrieverfrompydanticimportConfigDict,Field,validatorfromlangchain_community.chains.pebblo_retrieval.enforcement_filtersimport(SUPPORTED_VECTORSTORES,set_enforcement_filters,)fromlangchain_community.chains.pebblo_retrieval.modelsimport(App,AuthContext,ChainInfo,Framework,Model,SemanticContext,VectorDB,)fromlangchain_community.chains.pebblo_retrieval.utilitiesimport(PLUGIN_VERSION,PebbloRetrievalAPIWrapper,get_runtime,)logger=logging.getLogger(__name__)
class PebbloRetrievalQA(Chain):
    """
    Retrieval Chain with Identity & Semantic Enforcement for question-answering
    against a vector database.

    For every query the chain checks the prompt through the Pebblo client,
    retrieves documents with the optional auth and semantic contexts, combines
    them into an answer and finally passes the exchange to the Pebblo client.
    """

    combine_documents_chain: BaseCombineDocumentsChain
    """Chain to use to combine the documents."""
    input_key: str = "query"  #: :meta private:
    output_key: str = "result"  #: :meta private:
    return_source_documents: bool = False
    """Return the source documents or not."""
    retriever: VectorStoreRetriever = Field(exclude=True)
    """Vector store retriever to use for retrieval."""
    auth_context_key: str = "auth_context"  #: :meta private:
    """Input key holding the authentication context for identity enforcement."""
    semantic_context_key: str = "semantic_context"  #: :meta private:
    """Input key holding the semantic context for semantic enforcement."""
    app_name: str  #: :meta private:
    """App name."""
    owner: str  #: :meta private:
    """Owner of app."""
    description: str  #: :meta private:
    """Description of app."""
    api_key: Optional[str] = None  #: :meta private:
    """Pebblo cloud API key for app."""
    classifier_url: Optional[str] = None  #: :meta private:
    """Classifier endpoint."""
    classifier_location: str = "local"  #: :meta private:
    """Classifier location. It could be either of 'local' or 'pebblo-cloud'."""
    _discover_sent: bool = False  #: :meta private:
    """Flag to check if discover payload has been sent."""
    enable_prompt_gov: bool = True  #: :meta private:
    """Flag to check if prompt governance is enabled or not"""
    pb_client: PebbloRetrievalAPIWrapper = Field(
        default_factory=PebbloRetrievalAPIWrapper
    )
    """Pebblo Retrieval API client"""

    def _call(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Retrieve documents for the query and run the combine chain on them.

        If chain has 'return_source_documents' as 'True', returns
        the retrieved documents as well under the key 'source_documents'.

        Args:
            inputs: Chain inputs. Must contain ``input_key``; the entries under
                ``auth_context_key`` and ``semantic_context_key`` are optional
                and default to ``None`` when absent.
            run_manager: Callback manager for this run. A no-op manager is used
                when none is given.

        Returns:
            A dict with the answer under ``output_key`` and, when
            ``return_source_documents`` is set, the retrieved documents under
            ``"source_documents"``.

        Example:
        .. code-block:: python

            res = indexqa({'query': 'This is my query'})
            answer, docs = res['result'], res['source_documents']
        """
        # Timezone-naive local timestamp, captured before any work is done.
        prompt_time = datetime.datetime.now().isoformat()
        _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
        question = inputs[self.input_key]
        auth_context = inputs.get(self.auth_context_key)
        semantic_context = inputs.get(self.semantic_context_key)
        # Only the second element of the validity check result is used here.
        _, prompt_entities = self.pb_client.check_prompt_validity(question)

        # `_get_docs` may or may not accept `run_manager`; inspect its
        # signature so both variants are supported.
        accepts_run_manager = (
            "run_manager" in inspect.signature(self._get_docs).parameters
        )
        if accepts_run_manager:
            docs = self._get_docs(
                question, auth_context, semantic_context, run_manager=_run_manager
            )
        else:
            docs = self._get_docs(question, auth_context, semantic_context)  # type: ignore[call-arg]
        answer = self.combine_documents_chain.run(
            input_documents=docs, question=question, callbacks=_run_manager.get_child()
        )

        # Hand the complete exchange to the Pebblo client.
        self.pb_client.send_prompt(
            self.app_name,
            self.retriever,
            question,
            answer,
            auth_context,
            docs,
            prompt_entities,
            prompt_time,
            self.enable_prompt_gov,
        )

        if self.return_source_documents:
            return {self.output_key: answer, "source_documents": docs}
        else:
            return {self.output_key: answer}

    async def _acall(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Asynchronously retrieve documents and run the combine chain on them.

        If chain has 'return_source_documents' as 'True', returns
        the retrieved documents as well under the key 'source_documents'.

        Args:
            inputs: Chain inputs. Must contain ``input_key``; the entries under
                ``auth_context_key`` and ``semantic_context_key`` are optional
                and default to ``None`` when absent.
            run_manager: Async callback manager for this run. A no-op manager
                is used when none is given.

        Returns:
            A dict with the answer under ``output_key`` and, when
            ``return_source_documents`` is set, the retrieved documents under
            ``"source_documents"``.

        Example:
        .. code-block:: python

            res = indexqa({'query': 'This is my query'})
            answer, docs = res['result'], res['source_documents']
        """
        # Timezone-naive local timestamp, captured before any work is done.
        prompt_time = datetime.datetime.now().isoformat()
        _run_manager = run_manager or AsyncCallbackManagerForChainRun.get_noop_manager()
        question = inputs[self.input_key]
        auth_context = inputs.get(self.auth_context_key)
        semantic_context = inputs.get(self.semantic_context_key)
        # `_aget_docs` may or may not accept `run_manager`; inspect its
        # signature so both variants are supported.
        accepts_run_manager = (
            "run_manager" in inspect.signature(self._aget_docs).parameters
        )

        # Only the second element of the validity check result is used here.
        _, prompt_entities = await self.pb_client.acheck_prompt_validity(question)

        if accepts_run_manager:
            docs = await self._aget_docs(
                question, auth_context, semantic_context, run_manager=_run_manager
            )
        else:
            docs = await self._aget_docs(question, auth_context, semantic_context)  # type: ignore[call-arg]
        answer = await self.combine_documents_chain.arun(
            input_documents=docs, question=question, callbacks=_run_manager.get_child()
        )

        # Hand the complete exchange to the Pebblo client.
        await self.pb_client.asend_prompt(
            self.app_name,
            self.retriever,
            question,
            answer,
            auth_context,
            docs,
            prompt_entities,
            prompt_time,
            self.enable_prompt_gov,
        )

        if self.return_source_documents:
            return {self.output_key: answer, "source_documents": docs}
        else:
            return {self.output_key: answer}

    model_config = ConfigDict(
        populate_by_name=True,
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    @property
    def input_keys(self) -> List[str]:
        """Input keys: the query key plus the auth and semantic context keys.

        :meta private:
        """
        return [self.input_key, self.auth_context_key, self.semantic_context_key]

    @property
    def output_keys(self) -> List[str]:
        """Output keys; includes 'source_documents' when those are returned.

        :meta private:
        """
        _output_keys = [self.output_key]
        if self.return_source_documents:
            _output_keys += ["source_documents"]
        return _output_keys

    @property
    def _chain_type(self) -> str:
        """Return the chain type."""
        return "pebblo_retrieval_qa"

    @classmethod
    def from_chain_type(
        cls,
        llm: BaseLanguageModel,
        app_name: str,
        description: str,
        owner: str,
        chain_type: str = "stuff",
        chain_type_kwargs: Optional[dict] = None,
        api_key: Optional[str] = None,
        classifier_url: Optional[str] = None,
        classifier_location: str = "local",
        **kwargs: Any,
    ) -> "PebbloRetrievalQA":
        """Load chain from chain type.

        Builds the combine-documents chain with ``load_qa_chain``, creates the
        Pebblo API client, sends the app discovery request and then
        instantiates the chain.

        Args:
            llm: Language model handed to ``load_qa_chain`` and to the app
                details helper.
            app_name: App name.
            description: Description of app.
            owner: Owner of app.
            chain_type: Chain type forwarded to ``load_qa_chain``.
            chain_type_kwargs: Extra keyword arguments for ``load_qa_chain``;
                ``None`` is treated as an empty dict.
            api_key: API key passed to the Pebblo API client.
            classifier_url: Classifier endpoint passed to the Pebblo API client.
            classifier_location: Classifier location passed to the Pebblo API
                client.
            **kwargs: Forwarded both to ``_get_app_details`` and to the class
                constructor (e.g. ``retriever``).

        Returns:
            The configured chain instance.
        """
        from langchain.chains.question_answering import load_qa_chain

        _chain_type_kwargs = chain_type_kwargs or {}
        combine_documents_chain = load_qa_chain(
            llm, chain_type=chain_type, **_chain_type_kwargs
        )

        # generate app
        app: App = PebbloRetrievalQA._get_app_details(
            app_name=app_name,
            description=description,
            owner=owner,
            llm=llm,
            **kwargs,
        )
        # initialize Pebblo API client
        pb_client = PebbloRetrievalAPIWrapper(
            api_key=api_key,
            classifier_location=classifier_location,
            classifier_url=classifier_url,
        )
        # send app discovery request
        pb_client.send_app_discover(app)
        return cls(
            combine_documents_chain=combine_documents_chain,
            app_name=app_name,
            owner=owner,
            description=description,
            api_key=api_key,
            classifier_url=classifier_url,
            classifier_location=classifier_location,
            pb_client=pb_client,
            **kwargs,
        )

    @validator("retriever", pre=True, always=True)
    def validate_vectorstore(
        cls, retriever: VectorStoreRetriever
    ) -> VectorStoreRetriever:
        """
        Validate that the vectorstore of the retriever is supported vectorstores.

        Args:
            retriever: The raw value supplied for the ``retriever`` field.

        Returns:
            The retriever, unchanged, when its vectorstore is supported.

        Raises:
            ValueError: If the value has no ``vectorstore`` attribute, or if
                the vectorstore's class name is not in
                ``SUPPORTED_VECTORSTORES``.
        """
        # This is a `pre` validator, so the value has not been type-checked
        # yet. Guard the attribute access so that a wrong input type is
        # reported as a ValueError instead of a bare AttributeError.
        if not hasattr(retriever, "vectorstore"):
            raise ValueError(
                "Retriever must expose a 'vectorstore' attribute. "
                f"Got '{retriever.__class__.__name__}' instead."
            )
        if retriever.vectorstore.__class__.__name__ not in SUPPORTED_VECTORSTORES:
            raise ValueError(
                f"Vectorstore must be an instance of one of the supported "
                f"vectorstores: {SUPPORTED_VECTORSTORES}. "
                f"Got '{retriever.vectorstore.__class__.__name__}' instead."
            )
        return retriever