"""Module contains code for a cache backed embedder.The cache backed embedder is a wrapper around an embedder that cachesembeddings in a key-value store. The cache is used to avoid recomputingembeddings for the same text.The text is hashed and the hash is used as the key in the cache."""from__future__importannotationsimporthashlibimportjsonimportuuidimportwarningsfromcollections.abcimportSequencefromtypingimportCallable,Literal,Optional,Union,castfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.storesimportBaseStore,ByteStorefromlangchain_core.utils.iterimportbatch_iteratefromlangchain.storage.encoder_backedimportEncoderBackedStoreNAMESPACE_UUID=uuid.UUID(int=1985)def_sha1_hash_to_uuid(text:str)->uuid.UUID:"""Return a UUID derived from *text* using SHA-1 (deterministic). Deterministic and fast, **but not collision-resistant**. A malicious attacker could try to create two different texts that hash to the same UUID. This may not necessarily be an issue in the context of caching embeddings, but new applications should swap this out for a stronger hash function like xxHash, BLAKE2 or SHA-256, which are collision-resistant. """sha1_hex=hashlib.sha1(text.encode("utf-8"),usedforsecurity=False).hexdigest()# Embed the hex string in `uuid5` to obtain a valid UUID.returnuuid.uuid5(NAMESPACE_UUID,sha1_hex)def_make_default_key_encoder(namespace:str,algorithm:str)->Callable[[str],str]:"""Create a default key encoder function. Args: namespace: Prefix that segregates keys from different embedding models. algorithm: * ``'sha1'`` - fast but not collision-resistant * ``'blake2b'`` - cryptographically strong, faster than SHA-1 * ``'sha256'`` - cryptographically strong, slower than SHA-1 * ``'sha512'`` - cryptographically strong, slower than SHA-1 Returns: A function that encodes a key using the specified algorithm. 
"""ifalgorithm=="sha1":_warn_about_sha1_encoder()def_key_encoder(key:str)->str:"""Encode a key using the specified algorithm."""ifalgorithm=="sha1":returnf"{namespace}{_sha1_hash_to_uuid(key)}"ifalgorithm=="blake2b":returnf"{namespace}{hashlib.blake2b(key.encode('utf-8')).hexdigest()}"ifalgorithm=="sha256":returnf"{namespace}{hashlib.sha256(key.encode('utf-8')).hexdigest()}"ifalgorithm=="sha512":returnf"{namespace}{hashlib.sha512(key.encode('utf-8')).hexdigest()}"raiseValueError(f"Unsupported algorithm: {algorithm}")return_key_encoderdef_value_serializer(value:Sequence[float])->bytes:"""Serialize a value."""returnjson.dumps(value).encode()def_value_deserializer(serialized_value:bytes)->list[float]:"""Deserialize a value."""returncast(list[float],json.loads(serialized_value.decode()))# The warning is global; track emission, so it appears only once._warned_about_sha1:bool=Falsedef_warn_about_sha1_encoder()->None:"""Emit a one-time warning about SHA-1 collision weaknesses."""global_warned_about_sha1ifnot_warned_about_sha1:warnings.warn("Using default key encoder: SHA-1 is *not* collision-resistant. ""While acceptable for most cache scenarios, a motivated attacker ""can craft two different payloads that map to the same cache key. ""If that risk matters in your environment, supply a stronger ""encoder (e.g. SHA-256 or BLAKE2) via the `key_encoder` argument. ""If you change the key encoder, consider also creating a new cache, ""to avoid (the potential for) collisions with existing keys.",category=UserWarning,stacklevel=2,)_warned_about_sha1=True
class CacheBackedEmbeddings(Embeddings):
    """Interface for caching results from embedding models.

    The interface works with any store that implements the abstract store
    interface accepting keys of type str and values of list of floats.

    If need be, the interface can be extended to accept other implementations
    of the value serializer and deserializer, as well as the key encoder.

    Note that by default only document embeddings are cached. To cache query
    embeddings too, pass in a query_embedding_store to constructor.

    Examples:

        .. code-block:: python

            from langchain.embeddings import CacheBackedEmbeddings
            from langchain.storage import LocalFileStore
            from langchain_community.embeddings import OpenAIEmbeddings

            store = LocalFileStore('./my_cache')

            underlying_embedder = OpenAIEmbeddings()
            embedder = CacheBackedEmbeddings.from_bytes_store(
                underlying_embedder, store, namespace=underlying_embedder.model
            )

            # Embedding is computed and cached
            embeddings = embedder.embed_documents(["hello", "goodbye"])

            # Embeddings are retrieved from the cache, no computation is done
            embeddings = embedder.embed_documents(["hello", "goodbye"])
    """

    def __init__(
        self,
        underlying_embeddings: Embeddings,
        document_embedding_store: BaseStore[str, list[float]],
        *,
        batch_size: Optional[int] = None,
        query_embedding_store: Optional[BaseStore[str, list[float]]] = None,
    ) -> None:
        """Initialize the embedder.

        Args:
            underlying_embeddings: the embedder to use for computing embeddings.
            document_embedding_store: The store to use for caching document
                embeddings.
            batch_size: The number of documents to embed between store updates.
            query_embedding_store: The store to use for caching query embeddings.
                If ``None``, query embeddings are not cached.
        """
        super().__init__()
        self.document_embedding_store = document_embedding_store
        self.query_embedding_store = query_embedding_store
        self.underlying_embeddings = underlying_embeddings
        self.batch_size = batch_size

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed a list of texts.

        The method first checks the cache for the embeddings.
        If the embeddings are not found, the method uses the underlying embedder
        to embed the documents and stores the results in the cache.

        Args:
            texts: A list of texts to embed.

        Returns:
            A list of embeddings for the given texts.
        """
        vectors: list[Union[list[float], None]] = self.document_embedding_store.mget(
            texts
        )
        all_missing_indices: list[int] = [
            i for i, vector in enumerate(vectors) if vector is None
        ]

        # batch_iterate supports None batch_size which returns all elements at
        # once as a single batch.
        for missing_indices in batch_iterate(self.batch_size, all_missing_indices):
            missing_texts = [texts[i] for i in missing_indices]
            missing_vectors = self.underlying_embeddings.embed_documents(missing_texts)
            # Persist each batch as soon as it is computed.
            self.document_embedding_store.mset(
                list(zip(missing_texts, missing_vectors))
            )
            for index, updated_vector in zip(missing_indices, missing_vectors):
                vectors[index] = updated_vector

        return cast(
            list[list[float]], vectors
        )  # Nones should have been resolved by now

    async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
        """Asynchronously embed a list of texts.

        The method first checks the cache for the embeddings.
        If the embeddings are not found, the method uses the underlying embedder
        to embed the documents and stores the results in the cache.

        Args:
            texts: A list of texts to embed.

        Returns:
            A list of embeddings for the given texts.
        """
        vectors: list[
            Union[list[float], None]
        ] = await self.document_embedding_store.amget(texts)
        all_missing_indices: list[int] = [
            i for i, vector in enumerate(vectors) if vector is None
        ]

        # batch_iterate supports None batch_size which returns all elements at
        # once as a single batch.
        for missing_indices in batch_iterate(self.batch_size, all_missing_indices):
            missing_texts = [texts[i] for i in missing_indices]
            missing_vectors = await self.underlying_embeddings.aembed_documents(
                missing_texts
            )
            # Persist each batch as soon as it is computed.
            await self.document_embedding_store.amset(
                list(zip(missing_texts, missing_vectors))
            )
            for index, updated_vector in zip(missing_indices, missing_vectors):
                vectors[index] = updated_vector

        return cast(
            list[list[float]], vectors
        )  # Nones should have been resolved by now

    def embed_query(self, text: str) -> list[float]:
        """Embed query text.

        By default, this method does not cache queries. To enable caching, pass
        a ``query_embedding_store`` when initializing the embedder (or
        ``query_embedding_cache`` when using ``from_bytes_store``).

        Args:
            text: The text to embed.

        Returns:
            The embedding for the given text.
        """
        # Compare against None explicitly: a configured store may be falsy
        # (e.g. it defines ``__len__`` and is empty) and must still be used.
        if self.query_embedding_store is None:
            return self.underlying_embeddings.embed_query(text)

        (cached,) = self.query_embedding_store.mget([text])
        if cached is not None:
            return cached

        vector = self.underlying_embeddings.embed_query(text)
        self.query_embedding_store.mset([(text, vector)])
        return vector

    async def aembed_query(self, text: str) -> list[float]:
        """Asynchronously embed query text.

        By default, this method does not cache queries. To enable caching, pass
        a ``query_embedding_store`` when initializing the embedder (or
        ``query_embedding_cache`` when using ``from_bytes_store``).

        Args:
            text: The text to embed.

        Returns:
            The embedding for the given text.
        """
        # Compare against None explicitly: a configured store may be falsy
        # (e.g. it defines ``__len__`` and is empty) and must still be used.
        if self.query_embedding_store is None:
            return await self.underlying_embeddings.aembed_query(text)

        (cached,) = await self.query_embedding_store.amget([text])
        if cached is not None:
            return cached

        vector = await self.underlying_embeddings.aembed_query(text)
        await self.query_embedding_store.amset([(text, vector)])
        return vector

    @classmethod
    def from_bytes_store(
        cls,
        underlying_embeddings: Embeddings,
        document_embedding_cache: ByteStore,
        *,
        namespace: str = "",
        batch_size: Optional[int] = None,
        query_embedding_cache: Union[bool, ByteStore] = False,
        key_encoder: Union[
            Callable[[str], str], Literal["sha1", "blake2b", "sha256", "sha512"]
        ] = "sha1",
    ) -> CacheBackedEmbeddings:
        """On-ramp that adds the necessary serialization and encoding to the store.

        Args:
            underlying_embeddings: The embedder to use for embedding.
            document_embedding_cache: The cache to use for storing document
                embeddings.
            namespace: The namespace to use for document cache.
                This namespace is used to avoid collisions with other caches.
                For example, set it to the name of the embedding model used.
            batch_size: The number of documents to embed between store updates.
            query_embedding_cache: The cache to use for storing query embeddings.
                True to use the same cache as document embeddings.
                False to not cache query embeddings.
            key_encoder: Either the name of a built-in hashing algorithm
                (``'sha1'``, ``'blake2b'``, ``'sha256'`` or ``'sha512'``) or a
                callable that encodes keys. If not provided, a default encoder
                using SHA-1 will be used. SHA-1 is not collision-resistant, and
                a motivated attacker could craft two different texts that hash
                to the same cache key.

                New applications should use one of the alternative encoders
                or provide a custom and strong key encoder function to avoid
                this risk.

                If you change a key encoder in an existing cache, consider
                just creating a new cache, to avoid (the potential for)
                collisions with existing keys or having duplicate keys
                for the same text in the cache.

        Returns:
            An instance of CacheBackedEmbeddings that uses the provided cache.

        Raises:
            ValueError: If ``namespace`` is supplied together with a callable
                ``key_encoder``, or if ``key_encoder`` is neither a string nor
                a callable.
        """
        if isinstance(key_encoder, str):
            resolved_encoder = _make_default_key_encoder(namespace, key_encoder)
        elif callable(key_encoder):
            # If a custom key encoder is provided, it should not be used with a
            # namespace.
            # A user can handle namespacing directly in their custom key encoder.
            if namespace:
                raise ValueError(
                    "Do not supply `namespace` when using a custom key_encoder; "
                    "add any prefixing inside the encoder itself."
                )
            resolved_encoder = key_encoder
        else:
            raise ValueError(
                "key_encoder must be either 'blake2b', 'sha1', 'sha256', 'sha512' "
                "or a callable that encodes keys."
            )

        document_embedding_store = EncoderBackedStore[str, list[float]](
            document_embedding_cache,
            resolved_encoder,
            _value_serializer,
            _value_deserializer,
        )
        if query_embedding_cache is True:
            # Share the document store so queries and documents hit one cache.
            query_embedding_store = document_embedding_store
        elif query_embedding_cache is False:
            query_embedding_store = None
        else:
            query_embedding_store = EncoderBackedStore[str, list[float]](
                query_embedding_cache,
                resolved_encoder,
                _value_serializer,
                _value_deserializer,
            )

        return cls(
            underlying_embeddings,
            document_embedding_store,
            batch_size=batch_size,
            query_embedding_store=query_embedding_store,
        )