Source code for langchain_elasticsearch.cache

from typing import Any, Iterator, List, Optional, Sequence, Tuple

from elasticsearch import Elasticsearch, exceptions, helpers  # noqa: F401
from elasticsearch.helpers import BulkIndexError  # noqa: F401
from langchain_core.caches import RETURN_VAL_TYPE, BaseCache  # noqa: F401
from langchain_core.load import dumps, loads  # noqa: F401
from langchain_core.stores import ByteStore  # noqa: F401

from langchain_elasticsearch._async.cache import (
    AsyncElasticsearchCache as _AsyncElasticsearchCache,
)
from langchain_elasticsearch._async.cache import (
    AsyncElasticsearchEmbeddingsCache as _AsyncElasticsearchEmbeddingsCache,
)
from langchain_elasticsearch._sync.cache import (  # noqa: F401
    ElasticsearchCache,
    ElasticsearchEmbeddingsCache,
)
from langchain_elasticsearch.client import (  # noqa: F401
    create_async_elasticsearch_client,
    create_elasticsearch_client,
)


# langchain defines some sync methods as abstract in its base class
# so we have to add dummy methods for them, even though we only use the async versions
[docs] class AsyncElasticsearchCache(_AsyncElasticsearchCache):
[docs] def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: raise NotImplementedError("This class is asynchronous, use alookup()")
[docs] def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None: raise NotImplementedError("This class is asynchronous, use aupdate()")
[docs] def clear(self, **kwargs: Any) -> None: raise NotImplementedError("This class is asynchronous, use aclear()")
# langchain defines some sync methods as abstract in its base class # so we have to add dummy methods for them, even though we only use the async versions
[docs] class AsyncElasticsearchEmbeddingsCache(_AsyncElasticsearchEmbeddingsCache):
[docs] def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]: raise NotImplementedError("This class is asynchronous, use amget()")
[docs] def mset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None: raise NotImplementedError("This class is asynchronous, use amset()")
[docs] def mdelete(self, keys: Sequence[str]) -> None: raise NotImplementedError("This class is asynchronous, use amdelete()")
[docs] def yield_keys(self, *, prefix: Optional[str] = None) -> Iterator[str]: raise NotImplementedError("This class is asynchronous, use ayield_keys()")