"""LangChain Couchbase CachesFunctions "_hash", "_loads_generations" and "_dumps_generations"are duplicated in this utility from modules: - "libs/community/langchain_community/cache.py""""importhashlibimportjsonimportloggingfromtypingimportAny,Dict,Optional,Unionfromcouchbase.clusterimportClusterfromlangchain_core.cachesimportRETURN_VAL_TYPE,BaseCachefromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.load.dumpimportdumpsfromlangchain_core.load.loadimportloadsfromlangchain_core.outputsimportGenerationfromlangchain_couchbase.vectorstoresimportCouchbaseVectorStorelogger=logging.getLogger(__file__)def_hash(_input:str)->str:"""Use a deterministic hashing approach."""returnhashlib.md5(_input.encode()).hexdigest()def_dumps_generations(generations:RETURN_VAL_TYPE)->str:""" Serialization for generic RETURN_VAL_TYPE, i.e. sequence of `Generation` Args: generations (RETURN_VAL_TYPE): A list of language model generations. Returns: str: a single string representing a list of generations. This function (+ its counterpart `_loads_generations`) rely on the dumps/loads pair with Reviver, so are able to deal with all subclasses of Generation. Each item in the list can be `dumps`ed to a string, then we make the whole list of strings into a json-dumped. """returnjson.dumps([dumps(_item)for_itemingenerations])def_loads_generations(generations_str:str)->Union[RETURN_VAL_TYPE,None]:""" Deserialization of a string into a generic RETURN_VAL_TYPE (i.e. a sequence of `Generation`). See `_dumps_generations`, the inverse of this function. Args: generations_str (str): A string representing a list of generations. Compatible with the legacy cache-blob format Does not raise exceptions for malformed entries, just logs a warning and returns none: the caller should be prepared for such a cache miss. Returns: RETURN_VAL_TYPE: A list of generations. """try:generations=[loads(_item_str)for_item_strinjson.loads(generations_str)]returngenerationsexcept(json.JSONDecodeError,TypeError):# deferring the (soft) handling to after the legacy-format attemptpasstry:gen_dicts=json.loads(generations_str)# not relying on `_load_generations_from_json` (which could disappear):generations=[Generation(**generation_dict)forgeneration_dictingen_dicts]logger.warning(f"Legacy 'Generation' cached blob encountered: '{generations_str}'")returngenerationsexcept(json.JSONDecodeError,TypeError):logger.warning(f"Malformed/unparsable cached blob encountered: '{generations_str}'")returnNone


class CouchbaseCache(BaseCache):
    """Couchbase LLM Cache

    LLM Cache that uses Couchbase as the backend.
    """

    PROMPT = "prompt"
    LLM = "llm"
    RETURN_VAL = "return_val"

    def _check_bucket_exists(self) -> bool:
        """Check if the bucket exists in the linked Couchbase cluster."""
        bucket_manager = self._cluster.buckets()
        try:
            bucket_manager.get_bucket(self._bucket_name)
            return True
        except Exception:
            return False

    def _check_scope_and_collection_exists(self) -> bool:
        """Check if the scope and collection exist in the linked Couchbase bucket.

        Raises a ValueError if either is not found.
        """
        scope_collection_map: Dict[str, Any] = {}

        # Get a list of all scopes in the bucket
        for scope in self._bucket.collections().get_all_scopes():
            scope_collection_map[scope.name] = []

            # Get a list of all the collections in the scope
            for collection in scope.collections:
                scope_collection_map[scope.name].append(collection.name)

        # Check if the scope exists
        if self._scope_name not in scope_collection_map.keys():
            raise ValueError(
                f"Scope {self._scope_name} not found in Couchbase "
                f"bucket {self._bucket_name}"
            )

        # Check if the collection exists in the scope
        if self._collection_name not in scope_collection_map[self._scope_name]:
            raise ValueError(
                f"Collection {self._collection_name} not found in scope "
                f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
            )

        return True

    def __init__(
        self,
        cluster: Cluster,
        bucket_name: str,
        scope_name: str,
        collection_name: str,
        **kwargs: Dict[str, Any],
    ) -> None:
        """Initialize the Couchbase LLM Cache.

        Args:
            cluster (Cluster): Couchbase cluster object with an active connection.
            bucket_name (str): name of the bucket to store documents in.
            scope_name (str): name of the scope in the bucket to store documents in.
            collection_name (str): name of the collection in the scope to store
                documents in.
        """
        if not isinstance(cluster, Cluster):
            raise ValueError(
                f"cluster should be an instance of couchbase.cluster.Cluster, "
                f"got {type(cluster)}"
            )

        self._cluster = cluster
        self._bucket_name = bucket_name
        self._scope_name = scope_name
        self._collection_name = collection_name

        # Check if the bucket exists
        if not self._check_bucket_exists():
            raise ValueError(
                f"Bucket {self._bucket_name} does not exist. "
                "Please create the bucket before using the cache."
            )

        try:
            self._bucket = self._cluster.bucket(self._bucket_name)
            self._scope = self._bucket.scope(self._scope_name)
            self._collection = self._scope.collection(self._collection_name)
        except Exception as e:
            raise ValueError(
                "Error connecting to Couchbase. "
                "Please check the connection and credentials."
            ) from e

        # Check if the scope and collection exist; raises ValueError if they don't.
        self._check_scope_and_collection_exists()

    def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
        """Look up from cache based on prompt and llm_string."""
        try:
            doc = self._collection.get(
                self._generate_key(prompt, llm_string)
            ).content_as[dict]
            return _loads_generations(doc[self.RETURN_VAL])
        except Exception:
            return None

    def _generate_key(self, prompt: str, llm_string: str) -> str:
        """Generate the key based on prompt and llm_string."""
        return _hash(prompt + llm_string)
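
    # Worked example (hypothetical values): the key is deterministic, so repeated
    # updates for the same (prompt, llm_string) pair land on the same document:
    #
    #     _hash("2+2=" + "openai:gpt-4")  # same MD5 digest on every call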

    def update(
        self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
    ) -> None:
        """Update cache based on prompt and llm_string."""
        doc = {
            self.PROMPT: prompt,
            self.LLM: llm_string,
            self.RETURN_VAL: _dumps_generations(return_val),
        }
        try:
            self._collection.upsert(
                key=self._generate_key(prompt, llm_string), value=doc
            )
        except Exception:
            logger.error("Error updating cache")

    def clear(self, **kwargs: Any) -> None:
        """Clear the cache.

        This will delete all documents in the collection and requires an index
        on the collection.
        """
        try:
            query = f"DELETE FROM `{self._collection_name}`"
            self._scope.query(query).execute()
        except Exception:
            logger.error("Error clearing cache. Please check if you have an index.")
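
# Usage sketch (illustrative; the connection string, credentials, and the
# bucket/scope/collection names below are placeholders, and the bucket, scope,
# and collection must already exist):
#
#     from couchbase.auth import PasswordAuthenticator
#     from couchbase.options import ClusterOptions
#     from langchain_core.globals import set_llm_cache
#
#     auth = PasswordAuthenticator("Administrator", "password")
#     cluster = Cluster("couchbase://localhost", ClusterOptions(auth))
#     set_llm_cache(
#         CouchbaseCache(
#             cluster=cluster,
#             bucket_name="langchain-bucket",
#             scope_name="_default",
#             collection_name="_default",
#         )
#     )
#
# Since `clear` issues a N1QL DELETE, the collection needs a primary index, e.g.:
#
#     CREATE PRIMARY INDEX ON `langchain-bucket`.`_default`.`_default`;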


class CouchbaseSemanticCache(BaseCache, CouchbaseVectorStore):
    """Couchbase Semantic Cache

    Cache backed by a Couchbase Server with Vector Store support.
    """

    LLM = "llm_string"
    RETURN_VAL = "return_val"

    def __init__(
        self,
        cluster: Cluster,
        embedding: Embeddings,
        bucket_name: str,
        scope_name: str,
        collection_name: str,
        index_name: str,
        score_threshold: Optional[float] = None,
    ) -> None:
        """Initialize the Couchbase Semantic Cache.

        Args:
            cluster (Cluster): Couchbase cluster object with an active connection.
            embedding (Embeddings): embedding model to use.
            bucket_name (str): name of the bucket to store documents in.
            scope_name (str): name of the scope in the bucket to store documents in.
            collection_name (str): name of the collection in the scope to store
                documents in.
            index_name (str): name of the Search index to use.
            score_threshold (float): score threshold to use for filtering results.
        """
        if not isinstance(cluster, Cluster):
            raise ValueError(
                f"cluster should be an instance of couchbase.cluster.Cluster, "
                f"got {type(cluster)}"
            )

        self._cluster = cluster
        self._bucket_name = bucket_name
        self._scope_name = scope_name
        self._collection_name = collection_name

        # Check if the bucket exists
        if not self._check_bucket_exists():
            raise ValueError(
                f"Bucket {self._bucket_name} does not exist. "
                "Please create the bucket before using the cache."
            )

        try:
            self._bucket = self._cluster.bucket(self._bucket_name)
            self._scope = self._bucket.scope(self._scope_name)
            self._collection = self._scope.collection(self._collection_name)
        except Exception as e:
            raise ValueError(
                "Error connecting to Couchbase. "
                "Please check the connection and credentials."
            ) from e

        # Check if the scope and collection exist; raises ValueError if they don't.
        self._check_scope_and_collection_exists()

        self.score_threshold = score_threshold

        # Initialize the vector store
        super().__init__(
            cluster=cluster,
            bucket_name=bucket_name,
            scope_name=scope_name,
            collection_name=collection_name,
            embedding=embedding,
            index_name=index_name,
        )

    def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
        """Look up from cache based on the semantic similarity of the prompt."""
        search_results = self.similarity_search_with_score(
            prompt, k=1, search_options={f"metadata.{self.LLM}": llm_string}
        )
        if search_results:
            selected_doc, score = search_results[0]
        else:
            return None

        # If a threshold is provided, only accept results that score above it.
        if self.score_threshold and score < self.score_threshold:
            return None

        # The vector search result might still belong to a different llm_string;
        # in that case, do not return it.
        if selected_doc.metadata[self.LLM] != llm_string:
            return None

        return _loads_generations(selected_doc.metadata[self.RETURN_VAL])

    def update(
        self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
    ) -> None:
        """Update cache based on the prompt and llm_string."""
        try:
            self.add_texts(
                texts=[prompt],
                metadatas=[
                    {
                        self.LLM: llm_string,
                        self.RETURN_VAL: _dumps_generations(return_val),
                    }
                ],
            )
        except Exception:
            logger.error("Error updating cache")

    def clear(self, **kwargs: Any) -> None:
        """Clear the cache.

        This will delete all documents in the collection and requires an index
        on the collection.
        """
        try:
            query = f"DELETE FROM `{self._collection_name}`"
            self._scope.query(query).execute()
        except Exception:
            logger.error("Error clearing cache. Please check if you have an index.")
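
# Usage sketch (illustrative; all names, the embedding model, and the Search
# index "langchain-index" are placeholders -- the vector index must exist on
# the collection before lookups will work):
#
#     from langchain_core.globals import set_llm_cache
#     from langchain_openai import OpenAIEmbeddings
#
#     set_llm_cache(
#         CouchbaseSemanticCache(
#             cluster=cluster,  # an already-connected couchbase.cluster.Cluster
#             embedding=OpenAIEmbeddings(),
#             bucket_name="langchain-bucket",
#             scope_name="_default",
#             collection_name="_default",
#             index_name="langchain-index",
#             score_threshold=0.8,
#         )
#     )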