Source code for langchain.chains.elasticsearch_database.base
"""Chain for interacting with Elasticsearch Database."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Optional

from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.language_models import BaseLanguageModel
from langchain_core.output_parsers import BaseOutputParser, StrOutputParser
from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_core.prompts import BasePromptTemplate
from langchain_core.runnables import Runnable
from pydantic import ConfigDict, model_validator
from typing_extensions import Self

from langchain.chains.base import Chain
from langchain.chains.elasticsearch_database.prompts import ANSWER_PROMPT, DSL_PROMPT

if TYPE_CHECKING:
    # Needed for annotations only; guarded so that importing this module does
    # not import the elasticsearch package at runtime.
    from elasticsearch import Elasticsearch

# Name of the chain-output entry that holds the recorded intermediate steps.
INTERMEDIATE_STEPS_KEY = "intermediate_steps"
class ElasticsearchDatabaseChain(Chain):
    """Chain for interacting with Elasticsearch Database.

    The chain lists the indices of ``database``, describes their mappings
    (optionally with sample documents) to ``query_chain`` to obtain an
    Elasticsearch query, runs that query, and passes the stringified search
    response to ``answer_chain`` to produce the final answer.

    Example:
        .. code-block:: python

            from langchain.chains import ElasticsearchDatabaseChain
            from langchain_community.llms import OpenAI
            from elasticsearch import Elasticsearch

            database = Elasticsearch("http://localhost:9200")
            db_chain = ElasticsearchDatabaseChain.from_llm(OpenAI(), database)
    """

    query_chain: Runnable
    """Chain for creating the ES query."""
    answer_chain: Runnable
    """Chain for answering the user question."""
    database: Any = None
    """Elasticsearch database to connect to of type elasticsearch.Elasticsearch."""
    top_k: int = 10
    """Number of results to return from the query"""
    ignore_indices: Optional[List[str]] = None
    """Index names to drop from the listed indices (exact-name match)."""
    include_indices: Optional[List[str]] = None
    """If set, only listed indices with these exact names are used."""
    input_key: str = "question"  #: :meta private:
    output_key: str = "result"  #: :meta private:
    sample_documents_in_index_info: int = 3
    """Sample documents fetched per index and appended to its mapping
    description; values of 0 or less disable sampling."""
    return_intermediate_steps: bool = False
    """Whether or not to return the intermediate steps along with the final answer."""

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    @model_validator(mode="after")
    def validate_indices(self) -> Self:
        """Reject configurations that set both index filters.

        Raises:
            ValueError: If ``include_indices`` and ``ignore_indices`` are both
                non-empty.
        """
        if self.include_indices and self.ignore_indices:
            raise ValueError(
                "Cannot specify both 'include_indices' and 'ignore_indices'."
            )
        return self

    @property
    def input_keys(self) -> List[str]:
        """Return the singular input key.

        :meta private:
        """
        return [self.input_key]

    @property
    def output_keys(self) -> List[str]:
        """Return the singular output key.

        The intermediate-steps key is added when ``return_intermediate_steps``
        is enabled.

        :meta private:
        """
        if not self.return_intermediate_steps:
            return [self.output_key]
        return [self.output_key, INTERMEDIATE_STEPS_KEY]

    def _list_indices(self) -> List[str]:
        """Return the database's index names after applying the filters.

        Returns:
            Names reported by ``database.cat.indices``, restricted to
            ``include_indices`` and/or stripped of ``ignore_indices`` when
            those are set. The database's ordering is preserved.
        """
        all_indices = [
            index["index"] for index in self.database.cat.indices(format="json")
        ]

        if self.include_indices:
            # Build the set once so each membership test is O(1).
            included = set(self.include_indices)
            all_indices = [i for i in all_indices if i in included]
        if self.ignore_indices:
            ignored = set(self.ignore_indices)
            all_indices = [i for i in all_indices if i not in ignored]

        return all_indices

    def _get_indices_infos(self, indices: List[str]) -> str:
        """Describe the given indices for the query-generation prompt.

        Args:
            indices: Names of the indices to describe.

        Returns:
            One ``"Mapping for index <name>:"`` section per index, separated
            by blank lines. When ``sample_documents_in_index_info`` is
            positive, each section also contains that many sample documents
            inside a ``/* ... */`` block.
        """
        mappings = self.database.indices.get_mapping(index=",".join(indices))
        if self.sample_documents_in_index_info > 0:
            for k, v in mappings.items():
                hits = self.database.search(
                    index=k,
                    query={"match_all": {}},
                    size=self.sample_documents_in_index_info,
                )["hits"]["hits"]
                hits = [str(hit["_source"]) for hit in hits]
                # Replace the mapping with its text form followed by the
                # sample documents, so both appear in the prompt.
                mappings[k]["mappings"] = (
                    str(v) + "\n\n/*\n" + "\n".join(hits) + "\n*/"
                )
        return "\n\n".join(
            f"Mapping for index {index}:\n{mappings[index]['mappings']}"
            for index in mappings
        )

    def _search(self, indices: List[str], query: str) -> str:
        """Run ``query`` against ``indices`` and return the response as text.

        Args:
            indices: Names of the indices to search.
            query: Request body forwarded unchanged to ``database.search``.

        Returns:
            ``str()`` of the search response.
        """
        result = self.database.search(index=",".join(indices), body=query)
        return str(result)

    def _call(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Generate an ES query for the question, run it, and answer.

        Args:
            inputs: Chain inputs; the question is read from ``input_key``.
            run_manager: Callback manager for this run. A no-op manager is
                used when omitted.

        Returns:
            A dict with the final answer under ``output_key`` and, when
            ``return_intermediate_steps`` is set, the recorded steps under
            ``INTERMEDIATE_STEPS_KEY``.

        Raises:
            ValueError: If ``include_indices``/``ignore_indices`` is set and no
                index remains after filtering.
            Exception: Any error raised while generating the query, searching,
                or answering is re-raised with an ``intermediate_steps``
                attribute holding the steps recorded so far.
        """
        _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
        input_text = f"{inputs[self.input_key]}\nESQuery:"
        _run_manager.on_text(input_text, verbose=self.verbose)
        indices = self._list_indices()
        if not indices and (self.include_indices or self.ignore_indices):
            # Joining an empty list gives "", which the Elasticsearch client
            # treats as "every index". Continuing would read mappings and
            # sample documents from, and search, the very indices the filter
            # was meant to exclude.
            raise ValueError(
                "No Elasticsearch indices remain after applying "
                "'include_indices'/'ignore_indices'; refusing to query all "
                "indices."
            )
        indices_info = self._get_indices_infos(indices)
        query_inputs: dict = {
            "input": input_text,
            "top_k": str(self.top_k),
            "indices_info": indices_info,
            "stop": ["\nESResult:"],
        }
        intermediate_steps: List = []
        try:
            intermediate_steps.append(query_inputs)  # input: es generation
            es_cmd = self.query_chain.invoke(
                query_inputs,
                config={"callbacks": _run_manager.get_child()},
            )

            _run_manager.on_text(es_cmd, color="green", verbose=self.verbose)
            # output: elasticsearch dsl generation (no checker)
            intermediate_steps.append(es_cmd)
            intermediate_steps.append({"es_cmd": es_cmd})  # input: ES search
            result = self._search(indices=indices, query=es_cmd)
            intermediate_steps.append(str(result))  # output: ES search

            _run_manager.on_text("\nESResult: ", verbose=self.verbose)
            _run_manager.on_text(result, color="yellow", verbose=self.verbose)

            _run_manager.on_text("\nAnswer:", verbose=self.verbose)
            answer_inputs: dict = {"data": result, "input": input_text}
            intermediate_steps.append(answer_inputs)  # input: final answer
            final_result = self.answer_chain.invoke(
                answer_inputs,
                config={"callbacks": _run_manager.get_child()},
            )

            intermediate_steps.append(final_result)  # output: final answer
            _run_manager.on_text(final_result, color="green", verbose=self.verbose)
            chain_result: Dict[str, Any] = {self.output_key: final_result}
            if self.return_intermediate_steps:
                chain_result[INTERMEDIATE_STEPS_KEY] = intermediate_steps
            return chain_result
        except Exception as exc:
            # Append intermediate steps to exception, to aid in logging and later
            # improvement of few shot prompt seeds
            exc.intermediate_steps = intermediate_steps  # type: ignore
            # Bare raise re-raises the active exception with its traceback.
            raise

    @property
    def _chain_type(self) -> str:
        """Return the identifier string for this chain type."""
        return "elasticsearch_database_chain"

    @classmethod
    def from_llm(
        cls,
        llm: BaseLanguageModel,
        database: Elasticsearch,
        *,
        query_prompt: Optional[BasePromptTemplate] = None,
        answer_prompt: Optional[BasePromptTemplate] = None,
        query_output_parser: Optional[BaseOutputParser] = None,
        **kwargs: Any,
    ) -> ElasticsearchDatabaseChain:
        """Convenience method to construct ElasticsearchDatabaseChain from an LLM.

        Args:
            llm: The language model to use.
            database: The Elasticsearch db.
            query_prompt: The prompt to use for query construction.
                Defaults to DSL_PROMPT.
            answer_prompt: The prompt to use for answering user question given
                data. Defaults to ANSWER_PROMPT.
            query_output_parser: The output parser to use for parsing
                model-generated ES query. Defaults to SimpleJsonOutputParser.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            A chain whose query and answer sub-chains both use ``llm``.
        """
        query_prompt = query_prompt or DSL_PROMPT
        query_output_parser = query_output_parser or SimpleJsonOutputParser()
        query_chain = query_prompt | llm | query_output_parser
        answer_prompt = answer_prompt or ANSWER_PROMPT
        answer_chain = answer_prompt | llm | StrOutputParser()
        return cls(
            query_chain=query_chain,
            answer_chain=answer_chain,
            database=database,
            **kwargs,
        )