"""Chain for interacting with Elasticsearch Database."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.language_models import BaseLanguageModel
from langchain_core.output_parsers import BaseOutputParser, StrOutputParser
from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_core.prompts import BasePromptTemplate
from langchain_core.runnables import Runnable
from pydantic import ConfigDict, model_validator
from typing_extensions import Self
from langchain.chains.base import Chain
from langchain.chains.elasticsearch_database.prompts import ANSWER_PROMPT, DSL_PROMPT
if TYPE_CHECKING:
from elasticsearch import Elasticsearch
INTERMEDIATE_STEPS_KEY = "intermediate_steps"
[docs]
class ElasticsearchDatabaseChain(Chain):
"""Chain for interacting with Elasticsearch Database.
Example:
.. code-block:: python
from langchain.chains import ElasticsearchDatabaseChain
from langchain_community.llms import OpenAI
from elasticsearch import Elasticsearch
database = Elasticsearch("http://localhost:9200")
db_chain = ElasticsearchDatabaseChain.from_llm(OpenAI(), database)
"""
query_chain: Runnable
"""Chain for creating the ES query."""
answer_chain: Runnable
"""Chain for answering the user question."""
database: Any = None
"""Elasticsearch database to connect to of type elasticsearch.Elasticsearch."""
top_k: int = 10
"""Number of results to return from the query"""
ignore_indices: Optional[List[str]] = None
include_indices: Optional[List[str]] = None
input_key: str = "question" #: :meta private:
output_key: str = "result" #: :meta private:
sample_documents_in_index_info: int = 3
return_intermediate_steps: bool = False
"""Whether or not to return the intermediate steps along with the final answer."""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra="forbid",
)
@model_validator(mode="after")
def validate_indices(self) -> Self:
if self.include_indices and self.ignore_indices:
raise ValueError(
"Cannot specify both 'include_indices' and 'ignore_indices'."
)
return self
@property
def input_keys(self) -> List[str]:
"""Return the singular input key.
:meta private:
"""
return [self.input_key]
@property
def output_keys(self) -> List[str]:
"""Return the singular output key.
:meta private:
"""
if not self.return_intermediate_steps:
return [self.output_key]
else:
return [self.output_key, INTERMEDIATE_STEPS_KEY]
def _list_indices(self) -> List[str]:
all_indices = [
index["index"] for index in self.database.cat.indices(format="json")
]
if self.include_indices:
all_indices = [i for i in all_indices if i in self.include_indices]
if self.ignore_indices:
all_indices = [i for i in all_indices if i not in self.ignore_indices]
return all_indices
def _get_indices_infos(self, indices: List[str]) -> str:
mappings = self.database.indices.get_mapping(index=",".join(indices))
if self.sample_documents_in_index_info > 0:
for k, v in mappings.items():
hits = self.database.search(
index=k,
query={"match_all": {}},
size=self.sample_documents_in_index_info,
)["hits"]["hits"]
hits = [str(hit["_source"]) for hit in hits]
mappings[k]["mappings"] = str(v) + "\n\n/*\n" + "\n".join(hits) + "\n*/"
return "\n\n".join(
[
"Mapping for index {}:\n{}".format(index, mappings[index]["mappings"])
for index in mappings
]
)
def _search(self, indices: List[str], query: str) -> str:
result = self.database.search(index=",".join(indices), body=query)
return str(result)
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
_run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
input_text = f"{inputs[self.input_key]}\nESQuery:"
_run_manager.on_text(input_text, verbose=self.verbose)
indices = self._list_indices()
indices_info = self._get_indices_infos(indices)
query_inputs: dict = {
"input": input_text,
"top_k": str(self.top_k),
"indices_info": indices_info,
"stop": ["\nESResult:"],
}
intermediate_steps: List = []
try:
intermediate_steps.append(query_inputs) # input: es generation
es_cmd = self.query_chain.invoke(
query_inputs,
config={"callbacks": _run_manager.get_child()},
)
_run_manager.on_text(es_cmd, color="green", verbose=self.verbose)
intermediate_steps.append(
es_cmd
) # output: elasticsearch dsl generation (no checker)
intermediate_steps.append({"es_cmd": es_cmd}) # input: ES search
result = self._search(indices=indices, query=es_cmd)
intermediate_steps.append(str(result)) # output: ES search
_run_manager.on_text("\nESResult: ", verbose=self.verbose)
_run_manager.on_text(result, color="yellow", verbose=self.verbose)
_run_manager.on_text("\nAnswer:", verbose=self.verbose)
answer_inputs: dict = {"data": result, "input": input_text}
intermediate_steps.append(answer_inputs) # input: final answer
final_result = self.answer_chain.invoke(
answer_inputs,
config={"callbacks": _run_manager.get_child()},
)
intermediate_steps.append(final_result) # output: final answer
_run_manager.on_text(final_result, color="green", verbose=self.verbose)
chain_result: Dict[str, Any] = {self.output_key: final_result}
if self.return_intermediate_steps:
chain_result[INTERMEDIATE_STEPS_KEY] = intermediate_steps
return chain_result
except Exception as exc:
# Append intermediate steps to exception, to aid in logging and later
# improvement of few shot prompt seeds
exc.intermediate_steps = intermediate_steps # type: ignore
raise exc
@property
def _chain_type(self) -> str:
return "elasticsearch_database_chain"
[docs]
@classmethod
def from_llm(
cls,
llm: BaseLanguageModel,
database: Elasticsearch,
*,
query_prompt: Optional[BasePromptTemplate] = None,
answer_prompt: Optional[BasePromptTemplate] = None,
query_output_parser: Optional[BaseOutputParser] = None,
**kwargs: Any,
) -> ElasticsearchDatabaseChain:
"""Convenience method to construct ElasticsearchDatabaseChain from an LLM.
Args:
llm: The language model to use.
database: The Elasticsearch db.
query_prompt: The prompt to use for query construction.
answer_prompt: The prompt to use for answering user question given data.
query_output_parser: The output parser to use for parsing model-generated
ES query. Defaults to SimpleJsonOutputParser.
kwargs: Additional arguments to pass to the constructor.
"""
query_prompt = query_prompt or DSL_PROMPT
query_output_parser = query_output_parser or SimpleJsonOutputParser()
query_chain = query_prompt | llm | query_output_parser
answer_prompt = answer_prompt or ANSWER_PROMPT
answer_chain = answer_prompt | llm | StrOutputParser()
return cls(
query_chain=query_chain,
answer_chain=answer_chain,
database=database,
**kwargs,
)