Source code for langchain_google_vertexai.vectorstores.vectorstores

import uuid
import warnings
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union

from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import (
    Namespace,
    NumericNamespace,
)
from google.oauth2.service_account import Credentials
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_google_vertexai.vectorstores._sdk_manager import VectorSearchSDKManager
from langchain_google_vertexai.vectorstores._searcher import (
    Searcher,
    VectorSearchSearcher,
)
from langchain_google_vertexai.vectorstores.document_storage import (
    DataStoreDocumentStorage,
    DocumentStorage,
    GCSDocumentStorage,
)


class _BaseVertexAIVectorStore(VectorStore):
    """Represents a base vector store based on VertexAI."""

    def __init__(
        self,
        searcher: Searcher,
        document_storage: DocumentStorage,
        embbedings: Optional[Embeddings] = None,
    ) -> None:
        """Constructor.

        Args:
            searcher: Object in charge of searching and storing the index.
            document_storage: Object in charge of storing and retrieving documents.
            embbedings: Object in charge of transforming text to embeddings.
        """
        super().__init__()
        self._searcher = searcher
        self._document_storage = document_storage
        self._embeddings = embbedings or self._get_default_embeddings()

    @property
    def embbedings(self) -> Embeddings:
        """Returns the embeddings object."""
        return self._embeddings

    def similarity_search_with_score(  # type: ignore[override]
        self,
        query: str,
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        numeric_filter: Optional[List[NumericNamespace]] = None,
    ) -> List[Tuple[Document, Union[float, Dict[str, float]]]]:
        """Return docs most similar to query and their cosine distance from the query.

        Args:
            query: String query to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            numeric_filter: Optional. A list of NumericNamespaces for filtering the
                matching results. Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to the
            query text, with the cosine distance as a float for each.
            Higher score represents more similarity.
        """
        embedding = self._embeddings.embed_query(query)
        return self.similarity_search_by_vector_with_score(
            embedding=embedding, k=k, filter=filter, numeric_filter=numeric_filter
        )

    def similarity_search_by_vector_with_score(
        self,
        embedding: List[float],
        sparse_embedding: Optional[Dict[str, Union[List[int], List[float]]]] = None,
        k: int = 4,
        rrf_ranking_alpha: float = 1,
        filter: Optional[List[Namespace]] = None,
        numeric_filter: Optional[List[NumericNamespace]] = None,
    ) -> List[Tuple[Document, Union[float, Dict[str, float]]]]:
        """Return docs most similar to the embedding and their cosine distance.

        Args:
            embedding: Embedding to look up documents similar to.
            sparse_embedding: Sparse embedding dictionary which represents an
                embedding as a list of dimensions and a list of sparse values,
                e.g. {"values": [0.7, 0.5], "dimensions": [10, 20]}
            k: Number of Documents to return. Defaults to 4.
            rrf_ranking_alpha: Reciprocal Rank Fusion weight, a float between 0
                and 1.0 that weights dense search vs. sparse search. For example:
                - rrf_ranking_alpha=1: only dense search
                - rrf_ranking_alpha=0: only sparse search
                - rrf_ranking_alpha=0.7: 0.7 weighting for dense, 0.3 for sparse
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            numeric_filter: Optional. A list of NumericNamespaces for filtering the
                matching results. Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.

        Returns:
            List[Tuple[Document, Union[float, Dict[str, float]]]]: List of documents
            most similar to the query text, with either the cosine distance as a
            float for each, or a dictionary with both dense and sparse scores when
            running hybrid search. Higher score represents more similarity.
        """
        if sparse_embedding is not None and not isinstance(sparse_embedding, dict):
            raise ValueError(
                "`sparse_embedding` should be a dictionary with the following format: "
                "{'values': [0.7, 0.5, ...], 'dimensions': [10, 20, ...]}\n"
                f"{type(sparse_embedding)} != {type({})}"
            )

        sparse_embeddings = [sparse_embedding] if sparse_embedding is not None else None

        neighbors_list = self._searcher.find_neighbors(
            embeddings=[embedding],
            sparse_embeddings=sparse_embeddings,
            k=k,
            rrf_ranking_alpha=rrf_ranking_alpha,
            filter_=filter,
            numeric_filter=numeric_filter,
        )

        if not neighbors_list:
            return []

        keys = [elem["doc_id"] for elem in neighbors_list[0]]

        if sparse_embedding is None:
            distances = [elem["dense_score"] for elem in neighbors_list[0]]
        else:
            distances = [
                {
                    "dense_score": elem["dense_score"],
                    "sparse_score": elem["sparse_score"],
                }
                for elem in neighbors_list[0]
            ]

        documents = self._document_storage.mget(keys)

        if all(document is not None for document in documents):
            # Ignore typing because mypy doesn't seem to be able to identify that
            # in documents there is no possibility to have None values with the
            # check above.
            return list(zip(documents, distances))  # type: ignore
        else:
            missing_docs = [key for key, doc in zip(keys, documents) if doc is None]
            message = f"Documents with ids: {missing_docs} not found in the storage"
            raise ValueError(message)

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID.

        Args:
            ids (Optional[List[str]]): List of ids to delete.
            **kwargs (Any): If metadata={...} is passed, the documents matching the
                metadata filter are deleted and the `ids` parameter is not needed.

        Returns:
            Optional[bool]: True if deletion is successful.

        Raises:
            ValueError: If neither `ids` nor a metadata filter is provided, or if
                both are provided.
            RuntimeError: If an error occurs during the deletion process.
        """
        metadata = kwargs.get("metadata")
        if (not ids and not metadata) or (ids and metadata):
            raise ValueError(
                "You should provide either ids (as a list of ids) or a metadata "
                "filter for deleting documents."
            )
        if metadata:
            ids = self._searcher.get_datapoints_by_filter(metadata=metadata)
            if not ids:
                return False
        try:
            self._searcher.remove_datapoints(datapoint_ids=ids)  # type: ignore[arg-type]
            self._document_storage.mdelete(ids)  # type: ignore[arg-type]
            return True
        except Exception as e:
            raise RuntimeError(f"Error during deletion: {str(e)}") from e

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        numeric_filter: Optional[List[NumericNamespace]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.
        Args:
            query: The string that will be used to search for similar documents.
            k: The number of neighbors that will be retrieved.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            numeric_filter: Optional. A list of NumericNamespaces for filtering the
                matching results. Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.

        Returns:
            A list of k matching documents.
        """
        return [
            document
            for document, _ in self.similarity_search_with_score(
                query, k, filter, numeric_filter
            )
        ]

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Union[List[dict], None] = None,
        *,
        ids: Optional[List[str]] = None,
        is_complete_overwrite: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids to be assigned to the texts in the index.
                If None, unique ids will be generated.
            is_complete_overwrite: Optional, determines whether this is an append
                or overwrite operation. Only relevant for BATCH UPDATE indexes.
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # Makes sure it is a list so we can get the length. Should we support
        # arbitrary iterables? `metadatas` is a list, so probably not.
        texts = list(texts)
        embeddings = self._embeddings.embed_documents(texts)
        return self.add_texts_with_embeddings(
            texts=texts,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids,
            is_complete_overwrite=is_complete_overwrite,
            **kwargs,
        )

    def add_texts_with_embeddings(
        self,
        texts: List[str],
        embeddings: List[List[float]],
        metadatas: Union[List[dict], None] = None,
        *,
        sparse_embeddings: Optional[
            List[Dict[str, Union[List[int], List[float]]]]
        ] = None,
        ids: Optional[List[str]] = None,
        is_complete_overwrite: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Add texts with precomputed (dense and optionally sparse) embeddings."""
        if ids is not None and len(set(ids)) != len(ids):
            raise ValueError(
                "All provided ids should be unique. "
                f"There are {len(ids) - len(set(ids))} duplicates."
            )

        if ids is not None and len(ids) != len(texts):
            raise ValueError(
                "The number of `ids` should match the number of `texts` "
                f"{len(ids)} != {len(texts)}"
            )

        if isinstance(embeddings, list) and len(embeddings) != len(texts):
            raise ValueError(
                "The number of `embeddings` should match the number of `texts` "
                f"{len(embeddings)} != {len(texts)}"
            )

        if ids is None:
            ids = self._generate_unique_ids(len(texts))

        if metadatas is None:
            metadatas = [{}] * len(texts)

        if len(metadatas) != len(texts):
            raise ValueError(
                "`metadatas` should be the same length as `texts` "
                f"{len(metadatas)} != {len(texts)}"
            )

        documents = [
            Document(page_content=text, metadata=metadata)
            for text, metadata in zip(texts, metadatas)
        ]

        self._document_storage.mset(list(zip(ids, documents)))

        self._searcher.add_to_index(
            ids=ids,
            embeddings=embeddings,
            sparse_embeddings=sparse_embeddings,
            metadatas=metadatas,
            is_complete_overwrite=is_complete_overwrite,
            **kwargs,
        )

        return ids

    @classmethod
    def from_texts(
        cls: Type["_BaseVertexAIVectorStore"],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Union[List[dict], None] = None,
        **kwargs: Any,
    ) -> "_BaseVertexAIVectorStore":
        """Use `from_components` instead."""
        raise NotImplementedError(
            "This method is not implemented. Instead, you should initialize the class"
            " with `VertexAIVectorSearch.from_components(...)` and then call "
            "`add_texts`"
        )

    @classmethod
    def _get_default_embeddings(cls) -> Embeddings:
        """This function returns the default embedding.

        Returns:
            Default TensorflowHubEmbeddings to use.
        """
        warnings.warn(
            message=(
                "`TensorflowHubEmbeddings` as the default embeddings is deprecated."
                " Will change to `VertexAIEmbeddings`. Please specify the embedding "
                "type in the constructor."
            ),
            category=DeprecationWarning,
        )
        # TODO: Change to vertexai embeddings
        from langchain_community.embeddings import (  # type: ignore[import-not-found, unused-ignore]
            TensorflowHubEmbeddings,
        )

        return TensorflowHubEmbeddings()

    def _generate_unique_ids(self, number: int) -> List[str]:
        """Generates a list of unique ids of length `number`.

        Args:
            number: Number of ids to generate.

        Returns:
            List of unique ids.
        """
        return [str(uuid.uuid4()) for _ in range(number)]
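
For illustration, a minimal hybrid-search sketch against the base-class API above. `store` is assumed to be an instance built with one of the `from_components` constructors below, and both query vectors are placeholders; this is a sketch, not part of the module.

# Hypothetical hybrid-search sketch: `store` and both query vectors are
# placeholders, assuming an index deployed for hybrid (dense + sparse) search.
dense_query = [0.1] * 768  # dimensionality must match the deployed index
sparse_query = {"values": [0.7, 0.5], "dimensions": [10, 20]}

results = store.similarity_search_by_vector_with_score(
    embedding=dense_query,
    sparse_embedding=sparse_query,
    k=4,
    rrf_ranking_alpha=0.7,  # weight dense scores 0.7, sparse scores 0.3
)
for document, scores in results:
    # With a sparse embedding supplied, each score is a dict holding
    # "dense_score" and "sparse_score".
    print(document.page_content, scores)
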

class VectorSearchVectorStore(_BaseVertexAIVectorStore):
    """VertexAI VectorStore that handles the search and indexing using Vector
    Search and stores the documents in Google Cloud Storage.
    """

    @classmethod
    def from_components(  # Implemented in order to keep the current API
        cls: Type["VectorSearchVectorStore"],
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        index_id: str,
        endpoint_id: str,
        private_service_connect_ip_address: Optional[str] = None,
        credentials: Optional[Credentials] = None,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        stream_update: bool = False,
        **kwargs: Any,
    ) -> "VectorSearchVectorStore":
        """Takes the object creation out of the constructor.

        Args:
            project_id: The GCP project id.
            region: The default location where the API calls are made. It must be
                the same location as the GCS bucket and must be regional.
            gcs_bucket_name: The location where the vectors will be stored in
                order for the index to be created.
            index_id: The id of the created index.
            endpoint_id: The id of the created endpoint.
            private_service_connect_ip_address: The IP address of the private
                service connect instance.
            credentials: Google Cloud Credentials object.
            credentials_path: (Optional) The path of the Google credentials on
                the local file system.
            embedding: The :class:`Embeddings` that will be used for embedding
                the texts.
            stream_update: Whether to update with streaming or batching. The
                Vector Search index must be compatible with stream/batch updates.
            kwargs: Additional keyword arguments to pass to
                VertexAIVectorSearch.__init__().

        Returns:
            A configured VertexAIVectorSearch.
        """
        sdk_manager = VectorSearchSDKManager(
            project_id=project_id,
            region=region,
            credentials=credentials,
            credentials_path=credentials_path,
        )

        bucket = sdk_manager.get_gcs_bucket(bucket_name=gcs_bucket_name)
        index = sdk_manager.get_index(index_id=index_id)
        endpoint = sdk_manager.get_endpoint(endpoint_id=endpoint_id)
        if private_service_connect_ip_address:
            endpoint.private_service_connect_ip_address = (
                private_service_connect_ip_address
            )

        return cls(
            document_storage=GCSDocumentStorage(bucket=bucket),
            searcher=VectorSearchSearcher(
                endpoint=endpoint,
                index=index,
                staging_bucket=bucket,
                stream_update=stream_update,
            ),
            embbedings=embedding,
        )
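
A minimal usage sketch for the class above. All project, bucket, index, and endpoint identifiers are placeholders, and `VertexAIEmbeddings` with this model name is an assumption rather than something defined in this module:

# Hypothetical usage sketch; every id, name, and the region are placeholders.
from langchain_google_vertexai import VertexAIEmbeddings

store = VectorSearchVectorStore.from_components(
    project_id="my-project",       # placeholder
    region="us-central1",          # placeholder
    gcs_bucket_name="my-bucket",   # placeholder
    index_id="my-index-id",        # placeholder
    endpoint_id="my-endpoint-id",  # placeholder
    embedding=VertexAIEmbeddings(model_name="text-embedding-005"),  # assumed model
    stream_update=True,            # the index must support streaming updates
)

store.add_texts(
    texts=["a red apple", "a squared box"],
    metadatas=[{"color": "red"}, {"shape": "squared"}],
)

# Restrict matches to datapoints tagged with color == "red".
docs = store.similarity_search(
    "fruit", k=2, filter=[Namespace("color", ["red"], [])]
)
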

class VectorSearchVectorStoreGCS(VectorSearchVectorStore):
    """Alias of `VectorSearchVectorStore` for consistency with the rest of the
    vector stores with different document storage backends.
    """


class VectorSearchVectorStoreDatastore(_BaseVertexAIVectorStore):
    """VectorSearch with Datastore document storage."""

    @classmethod
    def from_components(
        cls: Type["VectorSearchVectorStoreDatastore"],
        project_id: str,
        region: str,
        index_id: str,
        endpoint_id: str,
        index_staging_bucket_name: Optional[str] = None,
        credentials: Optional[Credentials] = None,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        stream_update: bool = False,
        datastore_client_kwargs: Optional[Dict[str, Any]] = None,
        exclude_from_indexes: Optional[List[str]] = None,
        datastore_kind: str = "document_id",
        datastore_text_property_name: str = "text",
        datastore_metadata_property_name: str = "metadata",
        **kwargs: Dict[str, Any],
    ) -> "VectorSearchVectorStoreDatastore":
        """Takes the object creation out of the constructor.

        Args:
            project_id: The GCP project id.
            region: The default location where the API calls are made. It must be
                the same location as the GCS bucket and must be regional.
            index_id: The id of the created index.
            endpoint_id: The id of the created endpoint.
            index_staging_bucket_name: (Optional) If the index is updated by
                batch, the bucket where the data will be staged before updating
                the index. Only required when updating the index.
            credentials: Google Cloud Credentials object.
            credentials_path: (Optional) The path of the Google credentials on
                the local file system.
            embedding: The :class:`Embeddings` that will be used for embedding
                the texts.
            stream_update: Whether to update with streaming or batching. The
                Vector Search index must be compatible with stream/batch updates.
            datastore_client_kwargs: (Optional) Keyword arguments to pass to the
                Datastore client constructor.
            exclude_from_indexes: Fields to exclude from Datastore indexing.
            datastore_kind: The Datastore kind used to store the documents.
            datastore_text_property_name: Name of the Datastore property that
                holds the document text.
            datastore_metadata_property_name: Name of the Datastore property that
                holds the document metadata.
            kwargs: Additional keyword arguments to pass to
                VertexAIVectorSearch.__init__().

        Returns:
            A configured VectorSearchVectorStoreDatastore.
        """
        sdk_manager = VectorSearchSDKManager(
            project_id=project_id,
            region=region,
            credentials=credentials,
            credentials_path=credentials_path,
        )

        if index_staging_bucket_name is not None:
            bucket = sdk_manager.get_gcs_bucket(bucket_name=index_staging_bucket_name)
        else:
            bucket = None

        index = sdk_manager.get_index(index_id=index_id)
        endpoint = sdk_manager.get_endpoint(endpoint_id=endpoint_id)

        if datastore_client_kwargs is None:
            datastore_client_kwargs = {}
        datastore_client = sdk_manager.get_datastore_client(**datastore_client_kwargs)

        if exclude_from_indexes is None:
            exclude_from_indexes = []

        document_storage = DataStoreDocumentStorage(
            datastore_client=datastore_client,
            kind=datastore_kind,
            text_property_name=datastore_text_property_name,
            metadata_property_name=datastore_metadata_property_name,
            exclude_from_indexes=exclude_from_indexes,
        )

        return cls(
            document_storage=document_storage,
            searcher=VectorSearchSearcher(
                endpoint=endpoint,
                index=index,
                staging_bucket=bucket,
                stream_update=stream_update,
            ),
            embbedings=embedding,
        )
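
Finally, a comparable sketch for the Datastore-backed variant, again with placeholder identifiers. Documents land in a Datastore kind instead of GCS, and the staging bucket is only needed for batch index updates:

# Hypothetical usage sketch; every id and name is a placeholder, and
# VertexAIEmbeddings is assumed to be imported as in the previous sketch.
store = VectorSearchVectorStoreDatastore.from_components(
    project_id="my-project",                 # placeholder
    region="us-central1",                    # placeholder
    index_id="my-index-id",                  # placeholder
    endpoint_id="my-endpoint-id",            # placeholder
    index_staging_bucket_name="my-staging",  # placeholder; only for batch updates
    embedding=VertexAIEmbeddings(model_name="text-embedding-005"),  # assumed model
    stream_update=True,
    datastore_kind="document_id",
    exclude_from_indexes=["metadata"],       # assumed field to skip indexing
)

ids = store.add_texts(texts=["hello world"])
store.delete(ids=ids)  # or delete by filter: store.delete(metadata={...})
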