Source code for langchain_community.vectorstores.vdms
from __future__ import annotations

import base64
import logging
import os
import uuid
from copy import deepcopy
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Literal,
    Optional,
    Sized,
    Tuple,
    Type,
    Union,
    get_args,
)

import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

if TYPE_CHECKING:
    import vdms

DISTANCE_METRICS = Literal[
    "L2",  # Euclidean Distance
    "IP",  # Inner Product
]
AVAILABLE_DISTANCE_METRICS: List[DISTANCE_METRICS] = list(get_args(DISTANCE_METRICS))
ENGINES = Literal[
    "TileDBDense",  # TileDB Dense
    "TileDBSparse",  # TileDB Sparse
    "FaissFlat",  # FAISS IndexFlat
    "FaissIVFFlat",  # FAISS IndexIVFFlat
    "Flinng",  # FLINNG
]
AVAILABLE_ENGINES: List[ENGINES] = list(get_args(ENGINES))
DEFAULT_COLLECTION_NAME = "langchain"
DEFAULT_INSERT_BATCH_SIZE = 32
# Number of Documents to return.
DEFAULT_K = 3
# Number of Documents to fetch to pass to knn when filters applied.
DEFAULT_FETCH_K = DEFAULT_K * 5
DEFAULT_PROPERTIES = ["_distance", "id", "content"]
# Entity keys that are never copied into ``Document.metadata``.
INVALID_DOC_METADATA_KEYS = ["_distance", "content", "blob"]
# Property values that are dropped when building ``Document.metadata``.
INVALID_METADATA_VALUE = ["Missing property", None, {}]  # type: List

logger = logging.getLogger(__name__)


def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
    """Check that two sized values have the same length.

    The check is skipped when either value does not implement ``__len__``.

    Args:
        x: Variable to compare.
        y: Variable to compare.
        x_name: Name for variable ``x`` (used in the error message).
        y_name: Name for variable ``y`` (used in the error message).

    Raises:
        ValueError: If both values are sized and their lengths differ.
    """
    if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y):
        raise ValueError(
            f"{x_name} and {y_name} expected to be equal length but "
            f"len({x_name})={len(x)} and len({y_name})={len(y)}"
        )


def _results_to_docs(results: Any) -> List[Document]:
    """Convert a query result into documents, discarding the distances."""
    return [doc for doc, _ in _results_to_docs_and_scores(results)]


def _results_to_docs_and_scores(results: Any) -> List[Tuple[Document, float]]:
    """Convert a query result into ``(Document, distance)`` pairs.

    Only ``results[0]`` is read; it is unpacked as ``(responses, blobs)`` and
    the entities of ``responses[0]["FindDescriptor"]`` become documents. The
    entity's ``content`` is the page content, ``_distance`` (rounded to 10
    decimals) is the score, and the remaining properties form the metadata.

    Args:
        results: Result object as consumed above.

    Returns:
        The pairs parsed so far. Parsing is best-effort: any error is logged
        as a warning and whatever was collected before it is returned.
    """
    final_res: List[Any] = []
    try:
        responses, _ = results[0]
        if (
            len(responses) > 0
            and "FindDescriptor" in responses[0]
            and "entities" in responses[0]["FindDescriptor"]
        ):
            for ent in responses[0]["FindDescriptor"]["entities"]:
                distance = round(ent["_distance"], 10)
                txt_contents = ent["content"]
                # Filter into a new dict rather than deleting keys in place, so
                # the caller's response entities are left untouched.
                props = {
                    mkey: mval
                    for mkey, mval in ent.items()
                    if mkey not in INVALID_DOC_METADATA_KEYS
                    and mval not in INVALID_METADATA_VALUE
                }
                final_res.append(
                    (Document(page_content=txt_contents, metadata=props), distance)
                )
    except Exception as e:
        # Deliberately broad: a malformed response must not break the search.
        logger.warning("No results returned. Error while parsing results: %s", e)
    return final_res
def VDMS_Client(host: str = "localhost", port: int = 55555) -> vdms.vdms:
    """Create a VDMS client and connect it to a VDMS server.

    Args:
        host: IP or hostname of VDMS server.
        port: Port to connect to VDMS server.

    Returns:
        The ``vdms.vdms`` client after ``connect(host, port)`` was called.

    Raises:
        ImportError: If the ``vdms`` python package cannot be imported.
    """
    try:
        import vdms
    except ImportError as e:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "Could not import vdms python package. "
            "Please install it with `pip install vdms`."
        ) from e

    client = vdms.vdms()
    client.connect(host, port)
    return client
@deprecated(
    since="0.3.18", removal="1.0.0", alternative_import="langchain_vdms.VDMS"
)
class VDMS(VectorStore):
    """Intel Lab's VDMS for vector-store workloads.

    To use, you should have both:
    - the ``vdms`` python package installed
    - a host (str) and port (int) associated with a deployed VDMS Server

    Visit https://github.com/IntelLabs/vdms/wiki for more information.

    IT IS HIGHLY SUGGESTED TO NORMALIZE YOUR DATA.

    Args:
        client: VDMS Client used to connect to VDMS server
        collection_name: Name of data collection [Default: langchain]
        distance_strategy: Method used to calculate distances. VDMS supports
            "L2" (euclidean distance) or "IP" (inner product) [Default: L2]
        engine: Underlying implementation for indexing and computing distances.
            VDMS supports TileDBDense, TileDBSparse, FaissFlat, FaissIVFFlat,
            and Flinng [Default: FaissFlat]
        embedding: Any embedding function implementing
            `langchain_core.embeddings.Embeddings` interface.
        relevance_score_fn: Function for obtaining relevance score

    Example:
        .. code-block:: python

            from langchain_huggingface import HuggingFaceEmbeddings
            from langchain_community.vectorstores.vdms import VDMS, VDMS_Client

            model_name = "sentence-transformers/all-mpnet-base-v2"
            vectorstore = VDMS(
                client=VDMS_Client("localhost", 55555),
                embedding=HuggingFaceEmbeddings(model_name=model_name),
                collection_name="langchain-demo",
                distance_strategy="L2",
                engine="FaissFlat",
            )
    """
@property
def embeddings(self) -> Optional[Embeddings]:
    """Return the embedding object configured on this store."""
    return self.embedding


def _embed_documents(self, texts: List[str]) -> List[List[float]]:
    """Embed a list of texts with ``self.embedding.embed_documents``.

    Raises:
        ValueError: If ``self.embedding`` is not an ``Embeddings`` instance.
    """
    if isinstance(self.embedding, Embeddings):
        return self.embedding.embed_documents(texts)
    else:
        p_str = "Must provide `embedding` which is expected"
        p_str += " to be an Embeddings object"
        raise ValueError(p_str)


def _embed_video(self, paths: List[str], **kwargs: Any) -> List[List[float]]:
    """Embed videos with ``self.embedding.embed_video``.

    Args:
        paths: Video paths forwarded as ``paths=``.
        **kwargs: Extra keyword arguments forwarded to ``embed_video``.

    Raises:
        ValueError: If the embedding is missing or has no ``embed_video``.
    """
    if self.embedding is not None and hasattr(self.embedding, "embed_video"):
        return self.embedding.embed_video(paths=paths, **kwargs)
    else:
        raise ValueError(
            "Must provide `embedding` which has attribute `embed_video`"
        )


def _embed_image(self, uris: List[str]) -> List[List[float]]:
    """Embed images with ``self.embedding.embed_image``.

    Raises:
        ValueError: If the embedding is missing or has no ``embed_image``.
    """
    if self.embedding is not None and hasattr(self.embedding, "embed_image"):
        return self.embedding.embed_image(uris=uris)
    else:
        raise ValueError(
            "Must provide `embedding` which has attribute `embed_image`"
        )


def _embed_query(self, text: str) -> List[float]:
    """Embed a single query text with ``self.embedding.embed_query``.

    Raises:
        ValueError: If ``self.embedding`` is not an ``Embeddings`` instance.
    """
    if isinstance(self.embedding, Embeddings):
        return self.embedding.embed_query(text)
    else:
        raise ValueError(
            "Must provide `embedding` which is expected to be an Embeddings object"
        )


def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """Select the function that maps a raw score to a relevance score.

    The 'correct' relevance function may differ depending on a few things,
    including:
    - the distance / similarity metric used by the VectorStore
    - the scale of your embeddings (OpenAI's are unit normed. Many others are
      not!)
    - embedding dimensionality
    - etc.

    Returns:
        ``self.override_relevance_score_fn`` when it is set; otherwise the
        identity function for the "IP" and "L2" strategies.

    Raises:
        ValueError: If no override is set and the strategy is neither
            "IP" nor "L2" (compared case-insensitively).
    """
    if self.override_relevance_score_fn is not None:
        return self.override_relevance_score_fn

    # Default strategy is to rely on distance strategy provided
    # in vectorstore constructor
    if self.distance_strategy.lower() in ["ip", "l2"]:
        return lambda x: x
    else:
        # The separating space after the period was missing, which glued the
        # two sentences of the message together.
        raise ValueError(
            "No supported normalization function"
            f" for distance_strategy of {self.distance_strategy}. "
            "Consider providing relevance_score_fn to VDMS constructor."
        )


def _similarity_search_with_relevance_scores(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and their relevance scores.

    Without an override function the search is asked for normalized
    distances (``normalize_distance=True``) and the scores are returned as
    they come back; with an override each score is passed through it.

    Args:
        query: Query forwarded to ``similarity_search_with_score``.
        k: Number of results to return.
        fetch_k: Number of candidates to fetch.
        filter: Optional metadata filter.
        **kwargs: Extra keyword arguments forwarded to the search.

    Returns:
        List of ``(Document, score)`` tuples.
    """
    if self.override_relevance_score_fn is None:
        kwargs["normalize_distance"] = True
    docs_and_scores = self.similarity_search_with_score(
        query=query,
        k=k,
        fetch_k=fetch_k,
        filter=filter,
        **kwargs,
    )

    docs_and_rel_scores: List[Any] = []
    for doc, score in docs_and_scores:
        if self.override_relevance_score_fn is None:
            docs_and_rel_scores.append((doc, score))
        else:
            docs_and_rel_scores.append(
                (doc, self.override_relevance_score_fn(score))
            )
    return docs_and_rel_scores
def add_set(
    self,
    collection_name: str,
    engine: ENGINES = "FaissFlat",
    metric: DISTANCE_METRICS = "L2",
) -> str:
    """Create a descriptor set (collection) on the server.

    Args:
        collection_name: Name of the set to add.
        engine: Indexing engine; an object exposing ``.value`` is reduced to
            that value, anything else is passed through unchanged.
        metric: Distance metric; handled like ``engine``.

    Returns:
        ``collection_name`` once the command has been accepted.

    Raises:
        ValueError: If the first response contains ``"FailedCommand"``.
    """
    descriptor_set_query = _add_descriptorset(
        "AddDescriptorSet",
        collection_name,
        self.embedding_dimension,
        engine=getattr(engine, "value", engine),
        metric=getattr(metric, "value", metric),
    )

    responses, _ = self.__run_vdms_query([descriptor_set_query])

    if "FailedCommand" not in responses[0]:
        return collection_name
    raise ValueError(f"Failed to add collection {collection_name}")
def __delete(
    self,
    collection_name: str,
    ids: Union[None, List[str]] = None,
    constraints: Union[None, Dict[str, Any]] = None,
) -> bool:
    """Delete descriptors from a collection.

    Deletes the entire collection (restricted by ``constraints``, if given)
    when ``ids`` is not provided; otherwise every id in ``ids`` is deleted.

    Args:
        collection_name: Collection to delete from.
        ids: Ids to delete. ``None`` means "no id restriction"; an empty list
            deletes nothing.
        constraints: Extra constraints applied to every delete query. The
            caller's dict is not modified.

    Returns:
        True if every delete query's first response contains
        ``"FindDescriptor"`` (vacuously True for an empty ``ids`` list).
    """
    if ids is not None and len(ids) == 0:
        # An empty id list must never fall through to "delete everything".
        return True

    all_blobs: List[Any] = []
    collection_properties = self.__get_properties(collection_name)

    # Copy so that adding ``_deletion`` / ``id`` never leaks into the
    # caller's constraints dict.
    base_constraints: Dict[str, Any] = (
        dict(constraints) if constraints is not None else {}
    )
    base_constraints["_deletion"] = ["==", 1]

    if ids is None:
        per_query_constraints = [base_constraints]
    else:
        # One query per id: previously only ``ids[0]`` was honoured and the
        # remaining ids were silently left in the collection.
        per_query_constraints = [
            {**base_constraints, "id": ["==", doc_id]} for doc_id in ids
        ]

    deleted = True
    for query_constraints in per_query_constraints:
        query = _add_descriptor(
            "FindDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=None,
            link=None,
            k_neighbors=None,
            constraints=query_constraints,
            results={"list": collection_properties},
        )
        response, _ = self.__run_vdms_query([query], all_blobs)
        deleted = deleted and "FindDescriptor" in response[0]

    # Update/store indices after deletion
    query = _add_descriptorset("FindDescriptorSet", collection_name, storeIndex=True)
    self.__run_vdms_query([query], all_blobs)
    return deleted


def __get_add_query(
    self,
    collection_name: str,
    metadata: Optional[Any] = None,
    embedding: Union[List[float], None] = None,
    document: Optional[Any] = None,
    id: Optional[str] = None,
) -> Tuple[Dict[str, Dict[str, Any]], Union[bytes, None]]:
    """Build the ``AddDescriptor`` query and blob for one entry.

    When ``id`` is given and a descriptor with that id already exists, the
    entry is skipped: a notice is printed and the existence-check query is
    returned together with ``None`` as the blob.

    Side effect: property names not yet known are appended to
    ``self.collection_properties``.

    Args:
        collection_name: Target collection.
        metadata: Optional properties merged into the descriptor.
        embedding: Embedding converted to bytes for the blob.
        document: Text stored under ``content`` unless ``None`` or ``""``.
        id: Optional id stored under ``id``.

    Returns:
        ``(query, blob)``; ``blob`` is ``None`` when the entry was skipped.
    """
    if id is None:
        props: Dict[str, Any] = {}
    else:
        props = {"id": id}
        id_exists, query = _check_descriptor_exists_by_id(
            self._client, collection_name, id
        )
        if id_exists:
            skipped_value = {
                prop_key: prop_val[-1]
                for prop_key, prop_val in query["FindDescriptor"][
                    "constraints"
                ].items()
            }
            pstr = f"[!] Embedding with id ({id}) exists in DB;"
            pstr += "Therefore, skipped and not inserted"
            print(pstr)  # noqa: T201
            print(f"\tSkipped values are: {skipped_value}")  # noqa: T201
            return query, None

    if metadata:
        props.update(metadata)
    if document not in [None, ""]:
        props["content"] = document

    # Track every property name seen so it can be persisted later.
    for k in props:
        if k not in self.collection_properties:
            self.collection_properties.append(k)

    query = _add_descriptor(
        "AddDescriptor",
        collection_name,
        label=None,
        ref=None,
        props=props,
        link=None,
        k_neighbors=None,
        constraints=None,
        results=None,
    )

    blob = embedding2bytes(embedding)

    return (
        query,
        blob,
    )


def __get_properties(
    self,
    collection_name: str,
    unique_entity: Optional[bool] = False,
    deletion: Optional[bool] = False,
) -> List[str]:
    """Fetch the property names recorded for a collection.

    Returns:
        The comma-separated names decoded from the first response blob, or a
        copy of ``DEFAULT_PROPERTIES`` when no blob is returned.
    """
    find_query = _find_property_entity(
        collection_name, unique_entity=unique_entity, deletion=deletion
    )
    response, response_blob = self.__run_vdms_query([find_query])
    if len(response_blob) > 0:
        collection_properties = _bytes2str(response_blob[0]).split(",")
    else:
        collection_properties = deepcopy(DEFAULT_PROPERTIES)
    return collection_properties


def __run_vdms_query(
    self,
    all_queries: List[Dict],
    all_blobs: Optional[List] = None,
    print_last_response: Optional[bool] = False,
) -> Tuple[Any, Any]:
    """Send queries to the server and validate the response.

    Args:
        all_queries: Commands to send.
        all_blobs: Blobs accompanying the commands; ``None`` means no blobs.
        print_last_response: If true, ask the client to print its last
            response.

    Returns:
        ``(response, response_array)`` as returned by ``self._client.query``.
    """
    # ``None`` sentinel instead of a shared mutable ``[]`` default.
    blobs = [] if all_blobs is None else all_blobs
    response, response_array = self._client.query(all_queries, blobs)

    _ = _check_valid_response(all_queries, response)
    if print_last_response:
        self._client.print_last_response()
    return response, response_array


def __update(
    self,
    collection_name: str,
    ids: List[str],
    documents: List[str],
    embeddings: List[List[float]],
    metadatas: Optional[Union[List[None], List[Dict[str, Any]]]] = None,
) -> None:
    """Update (find, delete, add) entries of a collection based on id.

    For every entry a ``FindDescriptor`` query with the ``_deletion``
    constraint is issued for its id, then the entry is added again through
    ``__get_add_query``. Finally the collection's stored property list is
    refreshed.

    Args:
        collection_name: Collection to update.
        ids: Ids of the entries to replace.
        documents: New texts, one per id.
        embeddings: New embeddings, one per id.
        metadatas: Optional new metadata, one per id.

    Raises:
        ValueError: If the sized inputs differ in length.
    """
    _len_check_if_sized(ids, documents, "ids", "documents")
    _len_check_if_sized(ids, embeddings, "ids", "embeddings")

    metadatas = metadatas if metadatas is not None else [None for _ in ids]
    _len_check_if_sized(ids, metadatas, "ids", "metadatas")

    orig_props = self.__get_properties(collection_name)

    for meta, emb, doc, doc_id in zip(metadatas, embeddings, documents, ids):
        results = {"list": self.collection_properties}

        constraints: Dict[str, Any] = {"_deletion": ["==", 1]}
        if doc_id is not None:
            constraints["id"] = ["==", doc_id]

        query = _add_descriptor(
            "FindDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=None,
            link=None,
            k_neighbors=None,
            constraints=constraints,
            results=results,
        )
        self.__run_vdms_query([query])

        query, blob = self.__get_add_query(
            collection_name,
            metadata=meta,
            embedding=emb,
            document=doc,
            id=doc_id,
        )
        # A ``None`` blob means the add was skipped by ``__get_add_query``.
        if blob is not None:
            self.__run_vdms_query([query], [blob])

    self.__update_properties(
        collection_name, orig_props, self.collection_properties
    )


def __update_properties(
    self,
    collection_name: str,
    current_collection_properties: List,
    new_collection_properties: Optional[List],
) -> None:
    """Persist newly seen property names for a collection.

    Names from ``new_collection_properties`` that are missing are appended to
    ``current_collection_properties`` in place; an update query is only sent
    when that list actually changed.

    Args:
        collection_name: Collection whose property list is updated.
        current_collection_properties: Known property names (mutated).
        new_collection_properties: Candidate names; ``None`` is a no-op.
    """
    if new_collection_properties is None:
        return

    old_collection_properties = deepcopy(current_collection_properties)
    for prop in new_collection_properties:
        if prop not in current_collection_properties:
            current_collection_properties.append(prop)

    if current_collection_properties != old_collection_properties:
        all_queries, blob_arr = _build_property_query(
            collection_name,
            command_type="update",
            all_properties=current_collection_properties,
        )
        self.__run_vdms_query(all_queries, [blob_arr])
def add_images(
    self,
    uris: List[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    add_path: Optional[bool] = True,
    **kwargs: Any,
) -> List[str]:
    """Run more images through the embeddings and add to the vectorstore.

    Images are added as embeddings (AddDescriptor) instead of separate
    entity (AddImage) within VDMS to leverage similarity search capability

    Args:
        uris: List of paths to the images to add to the vectorstore.
        metadatas: Optional list of metadatas associated with the images.
            The caller's dicts are not modified.
        ids: Optional list of unique IDs.
        batch_size (int): Number of concurrent requests to send to the server.
        add_path: Bool to add image path as metadata

    Returns:
        List of ids from adding images into the vectorstore.
    """
    # Map from uris to blobs to base64
    b64_texts = [self.encode_image(image_path=uri) for uri in uris]

    if add_path and metadatas:
        # Shallow-copy first: the path used to be written straight into the
        # dictionaries owned by the caller.
        metadatas = [dict(m) for m in metadatas]
        for midx, uri in enumerate(uris):
            metadatas[midx]["image_path"] = uri
    elif add_path:
        metadatas = [{"image_path": uri} for uri in uris]

    # Populate IDs
    ids = ids if ids is not None else [str(uuid.uuid4()) for _ in uris]

    # Set embeddings
    embeddings = self._embed_image(uris=uris)

    if metadatas is None:
        metadatas = [{} for _ in uris]
    else:
        metadatas = [_validate_vdms_properties(m) for m in metadatas]

    self.add_from(
        texts=b64_texts,
        embeddings=embeddings,
        ids=ids,
        metadatas=metadatas,
        batch_size=batch_size,
        **kwargs,
    )
    return ids
def add_videos(
    self,
    paths: List[str],
    texts: Optional[List[str]] = None,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = 1,
    add_path: Optional[bool] = True,
    **kwargs: Any,
) -> List[str]:
    """Run videos through the embeddings and add to the vectorstore.

    Videos are added as embeddings (AddDescriptor) instead of separate
    entity (AddVideo) within VDMS to leverage similarity search capability

    Args:
        paths: List of paths to the videos to add to the vectorstore.
        texts: Optional list of text associated with the videos; defaults to
            one empty string per path.
        metadatas: Optional list of metadatas associated with the videos.
            The caller's dicts are not modified.
        ids: Optional list of unique IDs.
        batch_size (int): Number of concurrent requests to send to the server.
        add_path: Bool to add video path as metadata
        **kwargs: Forwarded to both the video embedder and ``add_from``.

    Returns:
        List of ids from adding videos into the vectorstore.
    """
    if texts is None:
        texts = ["" for _ in paths]

    if add_path and metadatas:
        # Shallow-copy first: the path used to be written straight into the
        # dictionaries owned by the caller.
        metadatas = [dict(m) for m in metadatas]
        for midx, path in enumerate(paths):
            metadatas[midx]["video_path"] = path
    elif add_path:
        metadatas = [{"video_path": path} for path in paths]

    # Populate IDs
    ids = ids if ids is not None else [str(uuid.uuid4()) for _ in paths]

    # Set embeddings
    embeddings = self._embed_video(paths=paths, **kwargs)

    if metadatas is None:
        metadatas = [{} for _ in paths]

    self.add_from(
        texts=texts,
        embeddings=embeddings,
        ids=ids,
        metadatas=metadatas,
        batch_size=batch_size,
        **kwargs,
    )
    return ids
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    **kwargs: Any,
) -> List[str]:
    """Embed texts and insert them into the vectorstore.

    Args:
        texts: Strings to add; the iterable is materialised into a list.
        metadatas: Optional metadata dicts, one per text. Each dict is passed
            through ``_validate_vdms_properties`` before insertion.
        ids: Optional unique ids; random UUID4 strings are generated when
            omitted.
        batch_size (int): Batch size forwarded to ``add_from``.
        **kwargs: Forwarded to ``add_from``.

    Returns:
        The ids reported by ``add_from``.
    """
    text_list = list(texts)
    doc_ids = ids if ids is not None else [str(uuid.uuid4()) for _ in text_list]

    vectors = self._embed_documents(text_list)

    if metadatas is not None:
        clean_metadatas = [_validate_vdms_properties(m) for m in metadatas]
    else:
        clean_metadatas = [{} for _ in text_list]

    return self.add_from(
        texts=text_list,
        embeddings=vectors,
        ids=doc_ids,
        metadatas=clean_metadatas,
        batch_size=batch_size,
        **kwargs,
    )
def add_from(
    self,
    texts: List[str],
    embeddings: List[List[float]],
    ids: List[str],
    metadatas: Optional[List[dict]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    **kwargs: Any,
) -> List[str]:
    """Insert precomputed embeddings into the collection in batches.

    Args:
        texts: Texts to store, one per embedding.
        embeddings: Embedding vectors to store.
        ids: Ids for the entries.
        metadatas: Optional metadata dicts, one per text. When ``None`` or
            empty, an empty dict is used for every text.
        batch_size: Number of entries passed to each ``self.add`` call.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The ids returned by the ``self.add`` calls, concatenated.
    """
    # Get initial properties
    orig_props = self.__get_properties(self._collection_name)

    # ``batch_metadatas`` used to be assigned only when ``metadatas`` was
    # truthy, so the default ``metadatas=None`` hit an unbound local. Fall
    # back to the same per-item ``{}`` that ``add_texts`` uses.
    if not metadatas:
        metadatas = [{} for _ in texts]

    inserted_ids: List[str] = []
    for start_idx in range(0, len(texts), batch_size):
        end_idx = min(start_idx + batch_size, len(texts))

        result = self.add(
            self._collection_name,
            embeddings=embeddings[start_idx:end_idx],
            texts=texts[start_idx:end_idx],
            metadatas=metadatas[start_idx:end_idx],
            ids=ids[start_idx:end_idx],
        )
        inserted_ids.extend(result)

    # Update Properties
    self.__update_properties(
        self._collection_name, orig_props, self.collection_properties
    )
    return inserted_ids
def _check_required_inputs(
    self, collection_name: str, embedding_dimensions: Union[int, None]
) -> None:
    """Validate the store configuration and derive the embedding dimension.

    Checks, in order: client connectivity, distance strategy, engine and
    presence of an embedding. ``self.embedding_dimension`` is then taken from
    ``embedding_dimensions`` if given, else from the length of a sample
    ``embed_query`` result, else (image/video embedders) from
    ``self.embedding.model.token_embedding.embedding_dim``. Finally the
    collection's property names are loaded into ``self.collection_properties``.

    Args:
        collection_name: Collection whose properties are loaded.
        embedding_dimensions: Explicit embedding size, or ``None`` to infer.

    Raises:
        ValueError: If any check fails or the dimension cannot be determined
            for an image/video embedder.
    """
    # Check connection to client
    if not self._client.is_connected():
        raise ValueError(
            "VDMS client must be connected to a VDMS server. "
            "Please use VDMS_Client to establish a connection"
        )

    # Check Distance Metric
    if self.distance_strategy not in AVAILABLE_DISTANCE_METRICS:
        raise ValueError("distance_strategy must be either 'L2' or 'IP'")

    # Check Engines
    if self.similarity_search_engine not in AVAILABLE_ENGINES:
        raise ValueError(
            "engine must be either 'TileDBDense', 'TileDBSparse', "
            + "'FaissFlat', 'FaissIVFFlat', or 'Flinng'"
        )

    # Check Embedding Func is provided and store dimension size
    if self.embedding is None:
        raise ValueError("Must provide embedding function")

    if embedding_dimensions is not None:
        self.embedding_dimension = embedding_dimensions
    elif hasattr(self.embedding, "embed_query"):
        self.embedding_dimension = len(
            self._embed_query("This is a sample sentence.")
        )
    elif hasattr(self.embedding, "embed_image") or hasattr(
        self.embedding, "embed_video"
    ):
        if hasattr(self.embedding, "model"):
            try:
                self.embedding_dimension = (
                    self.embedding.model.token_embedding.embedding_dim
                )
            except (AttributeError, ValueError) as err:
                # A missing attribute in the chain raises AttributeError; the
                # previous ``except ValueError`` never caught it.
                raise ValueError(
                    "Embedding dimension needed. Please define embedding_dimensions"
                ) from err
        else:
            raise ValueError(
                "Embedding dimension needed. Please define embedding_dimensions"
            )

    # Check for properties
    current_props = self.__get_properties(collection_name)
    if hasattr(self, "collection_properties"):
        self.collection_properties.extend(current_props)
    else:
        self.collection_properties: List[str] = current_props
def delete(
    self,
    ids: Optional[List[str]] = None,
    collection_name: Optional[str] = None,
    constraints: Optional[Dict] = None,
    **kwargs: Any,
) -> bool:
    """Delete entries from a collection.

    Args:
        ids: Ids (as stored in the vectorstore) to delete.
        collection_name: Collection to delete from; defaults to this store's
            own collection.
        constraints: Optional constraints forwarded to the delete routine.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The boolean result of the internal delete routine.
    """
    target = self._collection_name if collection_name is None else collection_name
    return self.__delete(target, ids=ids, constraints=constraints)
@classmethod
def from_documents(
    cls: Type[VDMS],
    documents: List[Document],
    embedding: Optional[Embeddings] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    collection_name: str = DEFAULT_COLLECTION_NAME,
    **kwargs: Any,
) -> VDMS:
    """Build a VDMS vectorstore from ``Document`` objects.

    The documents are split into page contents and metadatas and handed to
    ``from_texts``.

    Args:
        documents (List[Document]): Documents to add to the vectorstore.
        embedding (Embeddings): Embedding function. Defaults to None.
        ids (Optional[List[str]]): Document IDs. Defaults to None.
        batch_size (int): Batch size forwarded to ``from_texts``.
        collection_name (str): Name of the collection to create.
        **kwargs: Must contain ``client``; other entries are not forwarded.

    Returns:
        VDMS: VDMS vectorstore.

    Raises:
        KeyError: If ``client`` is missing from ``kwargs``.
    """
    client: vdms.vdms = kwargs["client"]

    page_contents = [doc.page_content for doc in documents]
    doc_metadatas = [doc.metadata for doc in documents]

    return cls.from_texts(
        client=client,
        texts=page_contents,
        metadatas=doc_metadatas,
        embedding=embedding,
        ids=ids,
        batch_size=batch_size,
        collection_name=collection_name,
    )
@classmethod
def from_texts(
    cls: Type[VDMS],
    texts: List[str],
    embedding: Optional[Embeddings] = None,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    collection_name: str = DEFAULT_COLLECTION_NAME,
    **kwargs: Any,
) -> VDMS:
    """Build a VDMS vectorstore from raw texts.

    A store is constructed for ``collection_name`` and the texts are added
    to it with ``add_texts``.

    Args:
        texts (List[str]): Texts to add to the collection.
        embedding (Embeddings): Embedding function. Defaults to None.
        metadatas (Optional[List[dict]]): Metadatas. Defaults to None.
        ids (Optional[List[str]]): Document IDs; random UUID4 strings are
            generated when omitted.
        batch_size (int): Batch size forwarded to ``add_texts``.
        collection_name (str): Name of the collection to create.
        **kwargs: Must contain ``client``; other entries are not forwarded.

    Returns:
        VDMS: VDMS vectorstore.

    Raises:
        KeyError: If ``client`` is missing from ``kwargs``.
    """
    client: vdms.vdms = kwargs["client"]
    store = cls(
        collection_name=collection_name,
        embedding=embedding,
        client=client,
    )

    doc_ids = ids if ids is not None else [str(uuid.uuid4()) for _ in texts]

    store.add_texts(
        texts=texts,
        metadatas=metadatas,
        ids=doc_ids,
        batch_size=batch_size,
    )
    return store
def get(
    self,
    collection_name: str,
    constraints: Optional[Dict] = None,
    limit: Optional[int] = None,
    include: Optional[List[str]] = None,
) -> Tuple[Any, Any]:
    """Get embeddings and their associated data from a collection.

    If no constraints are provided, all embeddings up to ``limit`` are
    requested.

    Args:
        collection_name: Collection to query.
        constraints: A dict used to filter results by.
            E.g. `{"color" : ["==", "red"], "price": [">", 4.00]}`. Optional.
        limit: The number of documents to return. Optional.
        include: A list of what to include in the results. ``"metadata"``
            (or ``"metadatas"``) requests the collection's properties and
            ``"embeddings"`` requests the blobs. Defaults to
            ``["metadata"]``. Optional.

    Returns:
        ``(response, response_array)`` from the server.
    """
    # ``None`` sentinel instead of a shared mutable list default.
    wanted = ["metadata"] if include is None else include

    results: Dict[str, Any] = {"count": ""}
    if limit is not None:
        results["limit"] = limit

    # Include metadata. The plural spelling is what the documentation has
    # always advertised, so both are honoured.
    if "metadata" in wanted or "metadatas" in wanted:
        results["list"] = self.__get_properties(collection_name)

    # Include embedding
    if "embeddings" in wanted:
        results["blob"] = True

    query = _add_descriptor(
        "FindDescriptor",
        collection_name,
        k_neighbors=None,
        constraints=constraints,
        results=results,
    )

    response, response_array = self.__run_vdms_query([query], [])
    return response, response_array
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        query (str): Query to look up. Text or path for image or video.
        k: Number of Documents to return. Defaults to ``DEFAULT_K`` (3).
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5.
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of Documents selected by maximal marginal relevance.

    Raises:
        ValueError: If no embedding is configured, or the configured
            embedding cannot embed this kind of query.
    """
    if self.embedding is None:
        raise ValueError(
            "For MMR search, you must specify an embedding function on creation."
        )

    # A query naming an existing file is treated as an image/video path,
    # anything else as text.
    is_file = os.path.isfile(query)
    embedding_vector: List[float]
    if not is_file and hasattr(self.embedding, "embed_query"):
        embedding_vector = self._embed_query(query)
    elif is_file and hasattr(self.embedding, "embed_image"):
        embedding_vector = self._embed_image(uris=[query])[0]
    elif is_file and hasattr(self.embedding, "embed_video"):
        embedding_vector = self._embed_video(paths=[query])[0]
    else:
        error_msg = f"Could not generate embedding for query '{query}'. "
        error_msg += "If using path for image or video, verify embedding model "
        error_msg += "has callable functions 'embed_image' or 'embed_video'."
        raise ValueError(error_msg)

    docs = self.max_marginal_relevance_search_by_vector(
        embedding_vector,
        k,
        fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return docs
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Select documents for an embedding via maximal marginal relevance.

    ``fetch_k`` candidates are fetched together with their embeddings, MMR
    picks up to ``k`` of them, and the picked candidates are returned in
    candidate order.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5.
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of Documents selected by maximal marginal relevance; empty if
        the query returned no blobs.
    """
    results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=fetch_k,
        filter=filter,
        include=["metadatas", "documents", "distances", "embeddings"],
    )

    blobs = results[0][1]
    if len(blobs) == 0:
        # No results returned
        return []

    candidate_vectors = [list(_bytes2embedding(blob)) for blob in blobs]
    picked_indices = maximal_marginal_relevance(
        np.array(embedding, dtype=np.float32),
        candidate_vectors,
        k=k,
        lambda_mult=lambda_mult,
    )

    chosen = []
    for position, document in enumerate(_results_to_docs(results)):
        if position in picked_indices:
            chosen.append(document)
    return chosen
def max_marginal_relevance_search_with_score(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and scores selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        query (str): Query to look up. Text or path for image or video.
        k: Number of Documents to return. Defaults to ``DEFAULT_K`` (3).
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5.
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of ``(Document, score)`` tuples selected by maximal marginal
        relevance.

    Raises:
        ValueError: If no embedding is configured, or the configured
            embedding cannot embed this kind of query.
    """
    if self.embedding is None:
        raise ValueError(
            "For MMR search, you must specify an embedding function on creation."
        )

    # A query naming an existing file is treated as an image/video path,
    # anything else as text.
    is_file = os.path.isfile(query)
    if not is_file and hasattr(self.embedding, "embed_query"):
        embedding = self._embed_query(query)
    elif is_file and hasattr(self.embedding, "embed_image"):
        embedding = self._embed_image(uris=[query])[0]
    elif is_file and hasattr(self.embedding, "embed_video"):
        embedding = self._embed_video(paths=[query])[0]
    else:
        error_msg = f"Could not generate embedding for query '{query}'. "
        error_msg += "If using path for image or video, verify embedding model "
        error_msg += "has callable functions 'embed_image' or 'embed_video'."
        raise ValueError(error_msg)

    docs = self.max_marginal_relevance_search_with_score_by_vector(
        embedding,
        k,
        fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return docs
def max_marginal_relevance_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Select scored documents for an embedding via maximal marginal relevance.

    ``fetch_k`` candidates are fetched together with their embeddings, MMR
    picks up to ``k`` of them, and the picked ``(Document, score)`` pairs are
    returned in candidate order.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5.
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of ``(Document, score)`` tuples selected by maximal marginal
        relevance; empty if the query returned no blobs.
    """
    results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=fetch_k,
        filter=filter,
        include=["metadatas", "documents", "distances", "embeddings"],
    )

    blobs = results[0][1]
    if len(blobs) == 0:
        # No results returned
        return []

    candidate_vectors = [list(_bytes2embedding(blob)) for blob in blobs]
    picked_indices = maximal_marginal_relevance(
        np.array(embedding, dtype=np.float32),
        candidate_vectors,
        k=k,
        lambda_mult=lambda_mult,
    )

    chosen = []
    scored_candidates = _results_to_docs_and_scores(results)
    for position, (document, score) in enumerate(scored_candidates):
        if position in picked_indices:
            chosen.append((document, score))
    return chosen
def similarity_search(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Search for the documents most similar to a query.

    Thin wrapper over ``similarity_search_with_score`` that drops the scores.

    Args:
        query (str): Query to look up. Text or path for image or video.
        k (int): Number of results to return. Defaults to ``DEFAULT_K``.
        fetch_k (int): Number of candidates to fetch for knn (>= k).
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Forwarded to ``similarity_search_with_score``.

    Returns:
        List[Document]: Documents most similar to the query.
    """
    scored = self.similarity_search_with_score(
        query, k=k, fetch_k=fetch_k, filter=filter, **kwargs
    )
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
def similarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Search for the documents most similar to an embedding vector.

    Args:
        embedding (List[float]): Embedding to look up documents similar to.
        k (int): Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k (int): Number of candidates to fetch for knn (>= k).
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Forwarded to ``query_collection_embeddings``.

    Returns:
        List of Documents most similar to the query vector.
    """
    raw_results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=k,
        fetch_k=fetch_k,
        filter=filter,
        **kwargs,
    )
    documents = _results_to_docs(raw_results)
    return documents
def similarity_search_with_score(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Run similarity search with VDMS with distance.

    Args:
        query (str): Query to look up. Text or path for image or video.
        k (int): Number of results to return. Defaults to ``DEFAULT_K`` (3).
        fetch_k (int): Number of candidates to fetch for knn (>= k).
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Forwarded to ``query_collection_embeddings``.

    Returns:
        List[Tuple[Document, float]]: Documents most similar to the query,
        each paired with the distance reported for it.

    Raises:
        ValueError: If no embedding is configured, or the configured
            embedding cannot embed this kind of query.
    """
    if self.embedding is None:
        raise ValueError("Must provide embedding function")

    # A query naming an existing file is treated as an image/video path,
    # anything else as text.
    is_file = os.path.isfile(query)
    if not is_file and hasattr(self.embedding, "embed_query"):
        query_embedding: List[float] = self._embed_query(query)
    elif is_file and hasattr(self.embedding, "embed_image"):
        query_embedding = self._embed_image(uris=[query])[0]
    elif is_file and hasattr(self.embedding, "embed_video"):
        query_embedding = self._embed_video(paths=[query])[0]
    else:
        error_msg = f"Could not generate embedding for query '{query}'. "
        error_msg += "If using path for image or video, verify embedding model "
        error_msg += "has callable functions 'embed_image' or 'embed_video'."
        raise ValueError(error_msg)

    results = self.query_collection_embeddings(
        query_embeddings=[query_embedding],
        n_results=k,
        fetch_k=fetch_k,
        filter=filter,
        **kwargs,
    )

    return _results_to_docs_and_scores(results)
def similarity_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Search by embedding vector and return documents with their scores.

    Args:
        embedding (List[float]): Embedding to look up documents similar to.
        k (int): Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k (int): Number of candidates to fetch for knn (>= k).
        filter (Optional[Dict[str, List]]): Filter by metadata.
            Defaults to None.
        **kwargs: Forwarded to ``query_collection_embeddings``.

    Returns:
        List[Tuple[Document, float]]: Documents most similar to the query
        vector, each paired with the score parsed from the response.
    """
    raw_results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=k,
        fetch_k=fetch_k,
        filter=filter,
        **kwargs,
    )
    scored_documents = _results_to_docs_and_scores(raw_results)
    return scored_documents
def update_document(
    self, collection_name: str, document_id: str, document: Document
) -> None:
    """Update a single document in the collection.

    Delegates to ``update_documents`` with one-element lists.

    Args:
        collection_name (str): Collection holding the document.
        document_id (str): ID of the document to update.
        document (Document): Document to update.
    """
    single_id = [document_id]
    single_document = [document]
    return self.update_documents(collection_name, single_id, single_document)
def update_documents(
    self, collection_name: str, ids: List[str], documents: List[Document]
) -> None:
    """Update several documents in the collection.

    The page contents are re-embedded and each metadata dict is passed
    through ``_validate_vdms_properties`` before the internal update runs.

    Args:
        collection_name (str): Collection holding the documents.
        ids (List[str]): List of ids of the document to update.
        documents (List[Document]): List of documents to update.
    """
    page_texts = [doc.page_content for doc in documents]
    clean_metadatas = [_validate_vdms_properties(doc.metadata) for doc in documents]
    vectors = self._embed_documents(page_texts)

    self.__update(
        collection_name,
        ids,
        metadatas=clean_metadatas,
        embeddings=vectors,
        documents=page_texts,
    )
def embedding2bytes(embedding: Union[List[float], None]) -> Union[bytes, None]:
    """Serialize an embedding as the raw bytes of a float32 array.

    Args:
        embedding: Vector to serialize, or ``None``.

    Returns:
        ``numpy.ndarray.tobytes()`` of the float32 array, or ``None`` when
        ``embedding`` is ``None``.
    """
    if embedding is None:
        return None
    return np.array(embedding, dtype="float32").tobytes()