Source code for langchain_elasticsearch._async.cache

import base64
import hashlib
import logging
from datetime import datetime
from functools import cached_property
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
)

from elasticsearch import AsyncElasticsearch, exceptions, helpers
from elasticsearch.helpers import BulkIndexError
from langchain_core.caches import RETURN_VAL_TYPE, BaseCache
from langchain_core.load import dumps, loads
from langchain_core.stores import ByteStore

from langchain_elasticsearch.client import create_async_elasticsearch_client

if TYPE_CHECKING:
    from elasticsearch import AsyncElasticsearch

logger = logging.getLogger(__name__)


async def _manage_cache_index(
    es_client: AsyncElasticsearch, index_name: str, mapping: Dict[str, Any]
) -> bool:
    """Write or update an index or alias according to the default mapping"""
    if await es_client.indices.exists_alias(name=index_name):
        await es_client.indices.put_mapping(index=index_name, body=mapping["mappings"])
        return True

    elif not await es_client.indices.exists(index=index_name):
        logger.debug(f"Creating new Elasticsearch index: {index_name}")
        await es_client.indices.create(index=index_name, body=mapping)
        return False

    return False


class AsyncElasticsearchCache(BaseCache):
    """An Elasticsearch cache integration for LLMs.

    For synchronous applications, use the ``ElasticsearchCache`` class.
    For asyhchronous applications, use the ``AsyncElasticsearchCache`` class.
    """

[docs] def __init__( self, index_name: str, store_input: bool = True, store_input_params: bool = True, metadata: Optional[Dict[str, Any]] = None, *, es_url: Optional[str] = None, es_cloud_id: Optional[str] = None, es_user: Optional[str] = None, es_api_key: Optional[str] = None, es_password: Optional[str] = None, es_params: Optional[Dict[str, Any]] = None, ): """ Initialize the Elasticsearch cache store by specifying the index/alias to use and determining which additional information (like input, input parameters, and any other metadata) should be stored in the cache. Args: index_name (str): The name of the index or the alias to use for the cache. If they do not exist an index is created, according to the default mapping defined by the `mapping` property. store_input (bool): Whether to store the LLM input in the cache, i.e., the input prompt. Default to True. store_input_params (bool): Whether to store the input parameters in the cache, i.e., the LLM parameters used to generate the LLM response. Default to True. metadata (Optional[dict]): Additional metadata to store in the cache, for filtering purposes. This must be JSON serializable in an Elasticsearch document. Default to None. es_url: URL of the Elasticsearch instance to connect to. es_cloud_id: Cloud ID of the Elasticsearch instance to connect to. es_user: Username to use when connecting to Elasticsearch. es_password: Password to use when connecting to Elasticsearch. es_api_key: API key to use when connecting to Elasticsearch. es_params: Other parameters for the Elasticsearch client. """ self._index_name = index_name self._store_input = store_input self._store_input_params = store_input_params self._metadata = metadata self._es_client = create_async_elasticsearch_client( url=es_url, cloud_id=es_cloud_id, api_key=es_api_key, username=es_user, password=es_password, params=es_params, ) self._is_alias: Optional[bool] = None
[docs] async def is_alias(self) -> bool: if self._is_alias is None: self._is_alias = await _manage_cache_index( self._es_client, self._index_name, self.mapping, ) return self._is_alias # type: ignore[return-value]
@cached_property def mapping(self) -> Dict[str, Any]: """Get the default mapping for the index.""" return { "mappings": { "properties": { "llm_output": {"type": "text", "index": False}, "llm_params": {"type": "text", "index": False}, "llm_input": {"type": "text", "index": False}, "metadata": {"type": "object"}, "timestamp": {"type": "date"}, } } } @staticmethod def _key(prompt: str, llm_string: str) -> str: """Generate a key for the cache store.""" return hashlib.md5((prompt + llm_string).encode()).hexdigest()
[docs] async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: """Look up based on prompt and llm_string.""" cache_key = self._key(prompt, llm_string) if await self.is_alias(): # get the latest record according to its writing date, in order to # address cases where multiple indices have a doc with the same id result = await self._es_client.search( index=self._index_name, body={ "query": {"term": {"_id": cache_key}}, "sort": {"timestamp": {"order": "asc"}}, }, source_includes=["llm_output"], ) if result["hits"]["total"]["value"] > 0: record = result["hits"]["hits"][0] else: return None else: try: record = await self._es_client.get( index=self._index_name, id=cache_key, source=["llm_output"] ) except exceptions.NotFoundError: return None return [loads(item) for item in record["_source"]["llm_output"]]
[docs] def build_document( self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE ) -> Dict[str, Any]: """Build the Elasticsearch document for storing a single LLM interaction""" body: Dict[str, Any] = { "llm_output": [dumps(item) for item in return_val], "timestamp": datetime.now().isoformat(), } if self._store_input_params: body["llm_params"] = llm_string if self._metadata is not None: body["metadata"] = self._metadata if self._store_input: body["llm_input"] = prompt return body
[docs] async def aupdate( self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE ) -> None: """Update based on prompt and llm_string.""" body = self.build_document(prompt, llm_string, return_val) await self._es_client.index( index=self._index_name, id=self._key(prompt, llm_string), body=body, require_alias=await self.is_alias(), refresh=True, )
[docs] async def aclear(self, **kwargs: Any) -> None: """Clear cache.""" await self._es_client.delete_by_query( index=self._index_name, body={"query": {"match_all": {}}}, refresh=True, wait_for_completion=True, )
class AsyncElasticsearchEmbeddingsCache(ByteStore): """An Elasticsearch store for caching embeddings. For synchronous applications, use the `ElasticsearchEmbeddingsCache` class. For asyhchronous applications, use the `AsyncElasticsearchEmbeddingsCache` class. """
[docs] def __init__( self, index_name: str, store_input: bool = True, metadata: Optional[Dict[str, Any]] = None, namespace: Optional[str] = None, maximum_duplicates_allowed: int = 1, *, es_url: Optional[str] = None, es_cloud_id: Optional[str] = None, es_user: Optional[str] = None, es_api_key: Optional[str] = None, es_password: Optional[str] = None, es_params: Optional[Dict[str, Any]] = None, ): """ Initialize the Elasticsearch cache store by specifying the index/alias to use and determining which additional information (like input, input parameters, and any other metadata) should be stored in the cache. Provide a namespace to organize the cache. Args: index_name (str): The name of the index or the alias to use for the cache. If they do not exist an index is created, according to the default mapping defined by the `mapping` property. store_input (bool): Whether to store the input in the cache. Default to True. metadata (Optional[dict]): Additional metadata to store in the cache, for filtering purposes. This must be JSON serializable in an Elasticsearch document. Default to None. namespace (Optional[str]): A namespace to use for the cache. maximum_duplicates_allowed (int): Defines the maximum number of duplicate keys permitted. Must be used in scenarios where the same key appears across multiple indices that share the same alias. Default to 1. es_url: URL of the Elasticsearch instance to connect to. es_cloud_id: Cloud ID of the Elasticsearch instance to connect to. es_user: Username to use when connecting to Elasticsearch. es_password: Password to use when connecting to Elasticsearch. es_api_key: API key to use when connecting to Elasticsearch. es_params: Other parameters for the Elasticsearch client. """ self._namespace = namespace self._maximum_duplicates_allowed = maximum_duplicates_allowed self._index_name = index_name self._store_input = store_input self._metadata = metadata self._es_client = create_async_elasticsearch_client( url=es_url, cloud_id=es_cloud_id, api_key=es_api_key, username=es_user, password=es_password, params=es_params, ) self._is_alias: Optional[bool] = None
[docs] async def is_alias(self) -> bool: if self._is_alias is None: self._is_alias = await _manage_cache_index( self._es_client, self._index_name, self.mapping, ) return self._is_alias # type: ignore[return-value]
[docs] @staticmethod def encode_vector(data: bytes) -> str: """Encode the vector data as bytes to as a base64 string.""" return base64.b64encode(data).decode("utf-8")
[docs] @staticmethod def decode_vector(data: str) -> bytes: """Decode the base64 string to vector data as bytes.""" return base64.b64decode(data)
@cached_property def mapping(self) -> Dict[str, Any]: """Get the default mapping for the index.""" return { "mappings": { "properties": { "text_input": {"type": "text", "index": False}, "vector_dump": { "type": "binary", "doc_values": False, }, "metadata": {"type": "object"}, "timestamp": {"type": "date"}, "namespace": {"type": "keyword"}, } } } def _key(self, input_text: str) -> str: """Generate a key for the store.""" return hashlib.md5(((self._namespace or "") + input_text).encode()).hexdigest() @classmethod def _deduplicate_hits(cls, hits: List[dict]) -> Dict[str, bytes]: """ Collapse the results from a search query with multiple indices returning only the latest version of the documents """ map_ids = {} for hit in sorted( hits, key=lambda x: datetime.fromisoformat(x["_source"]["timestamp"]), reverse=True, ): vector_id: str = hit["_id"] if vector_id not in map_ids: map_ids[vector_id] = cls.decode_vector(hit["_source"]["vector_dump"]) return map_ids
[docs] async def amget(self, keys: Sequence[str]) -> List[Optional[bytes]]: """Get the values associated with the given keys.""" if not any(keys): return [] cache_keys = [self._key(k) for k in keys] if await self.is_alias(): try: results = await self._es_client.search( index=self._index_name, body={ "query": {"ids": {"values": cache_keys}}, "size": len(cache_keys) * self._maximum_duplicates_allowed, }, source_includes=["vector_dump", "timestamp"], ) except exceptions.BadRequestError as e: if "window too large" in ( e.body.get("error", {}).get("root_cause", [{}])[0].get("reason", "") ): logger.warning( "Exceeded the maximum window size, " "Reduce the duplicates manually or lower " "`maximum_duplicate_allowed.`" ) raise e total_hits = results["hits"]["total"]["value"] if self._maximum_duplicates_allowed > 1 and total_hits > len(cache_keys): logger.warning( f"Deduplicating, found {total_hits} hits for {len(cache_keys)} keys" ) map_ids = self._deduplicate_hits(results["hits"]["hits"]) else: map_ids = { r["_id"]: self.decode_vector(r["_source"]["vector_dump"]) for r in results["hits"]["hits"] } return [map_ids.get(k) for k in cache_keys] else: records = await self._es_client.mget( index=self._index_name, ids=cache_keys, source_includes=["vector_dump"] ) return [ self.decode_vector(r["_source"]["vector_dump"]) if r["found"] else None for r in records["docs"] ]
[docs] def build_document(self, text_input: str, vector: bytes) -> Dict[str, Any]: """Build the Elasticsearch document for storing a single embedding""" body: Dict[str, Any] = { "vector_dump": self.encode_vector(vector), "timestamp": datetime.now().isoformat(), } if self._metadata is not None: body["metadata"] = self._metadata if self._store_input: body["text_input"] = text_input if self._namespace: body["namespace"] = self._namespace return body
async def _bulk(self, actions: Iterable[Dict[str, Any]]) -> None: try: await helpers.async_bulk( client=self._es_client, actions=actions, index=self._index_name, require_alias=await self.is_alias(), refresh=True, ) except BulkIndexError as e: first_error = e.errors[0].get("index", {}).get("error", {}) logger.error(f"First bulk error reason: {first_error.get('reason')}") raise e
[docs] async def amset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None: """Set the values for the given keys.""" actions = ( { "_op_type": "index", "_id": self._key(key), "_source": self.build_document(key, vector), } for key, vector in key_value_pairs ) await self._bulk(actions)
[docs] async def amdelete(self, keys: Sequence[str]) -> None: """Delete the given keys and their associated values.""" actions = ({"_op_type": "delete", "_id": self._key(key)} for key in keys) await self._bulk(actions)
[docs] async def ayield_keys(self, *, prefix: Optional[str] = None) -> AsyncIterator[str]: # type: ignore[override] """Get an iterator over keys that match the given prefix.""" # TODO This method is not currently used by CacheBackedEmbeddings, # we can leave it blank. It could be implemented with ES "index_prefixes", # but they are limited and expensive. raise NotImplementedError()