Source code for langchain_google_vertexai.vectorstores.vectorstores
import uuid
import warnings
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union

from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import (
    Namespace,
    NumericNamespace,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_google_vertexai.vectorstores._sdk_manager import VectorSearchSDKManager
from langchain_google_vertexai.vectorstores._searcher import (
    Searcher,
    VectorSearchSearcher,
)
from langchain_google_vertexai.vectorstores.document_storage import (
    DataStoreDocumentStorage,
    DocumentStorage,
    GCSDocumentStorage,
)


class _BaseVertexAIVectorStore(VectorStore):
    """Represents a base vector store based on VertexAI."""

    def __init__(
        self,
        searcher: Searcher,
        document_storage: DocumentStorage,
        embbedings: Optional[Embeddings] = None,
    ) -> None:
        """Constructor.

        Args:
            searcher: Object in charge of searching and storing the index.
            document_storage: Object in charge of storing and retrieving documents.
            embbedings: Object in charge of transforming text to embeddings.
        """
        super().__init__()
        self._searcher = searcher
        self._document_storage = document_storage
        self._embeddings = embbedings or self._get_default_embeddings()

    @property
    def embbedings(self) -> Embeddings:
        """Returns the embeddings object."""
        return self._embeddings

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        numeric_filter: Optional[List[NumericNamespace]] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query and their cosine distance from the query.

        Args:
            query: String query to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            numeric_filter: Optional. A list of NumericNamespaces for filtering the
                matching results. Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to
            the query text and cosine distance in float for each.
            Lower score represents more similarity.
        """
        embbedings = self._embeddings.embed_query(query)
        return self.similarity_search_by_vector_with_score(
            embedding=embbedings, k=k, filter=filter, numeric_filter=numeric_filter
        )

    def similarity_search_by_vector_with_score(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        numeric_filter: Optional[List[NumericNamespace]] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the embedding and their cosine distance.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            numeric_filter: Optional. A list of NumericNamespaces for filtering the
                matching results. Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to
            the query text and cosine distance in float for each.
            Lower score represents more similarity.
        """
        neighbors_list = self._searcher.find_neighbors(
            embeddings=[embedding], k=k, filter_=filter, numeric_filter=numeric_filter
        )

        keys = [key for key, _ in neighbors_list[0]]
        distances = [distance for _, distance in neighbors_list[0]]
        documents = self._document_storage.mget(keys)

        if all(document is not None for document in documents):
            # Ignore typing because mypy doesn't seem to be able to identify that
            # in documents there is no possibility to have None values with the
            # check above.
            return list(zip(documents, distances))  # type: ignore
        else:
            missing_docs = [key for key, doc in zip(keys, documents) if doc is None]
            message = f"Documents with ids: {missing_docs} not found in the storage"
            raise ValueError(message)

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        numeric_filter: Optional[List[NumericNamespace]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: The string that will be used to search for similar documents.
            k: The amount of neighbors that will be retrieved.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            numeric_filter: Optional. A list of NumericNamespaces for filtering the
                matching results. Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.

        Returns:
            A list of k matching documents.
        """
        return [
            document
            for document, _ in self.similarity_search_with_score(
                query, k, filter, numeric_filter
            )
        ]

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Union[List[dict], None] = None,
        *,
        ids: Optional[List[str]] = None,
        is_complete_overwrite: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids to be assigned to the texts in the index.
                If None, unique ids will be generated.
            is_complete_overwrite: Optional, determines whether this is an append or
                overwrite operation. Only relevant for BATCH UPDATE indexes.
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.
"""# Makes sure is a list and can get the length, should we support iterables?# metadata is a list so probably not?texts=list(texts)ifidsisnotNoneandlen(set(ids))!=len(ids):raiseValueError("All provided ids should be unique."f"There are {len(ids)-len(set(ids))} duplicates.")ifidsisnotNoneandlen(ids)!=len(texts):raiseValueError("The number of `ids` should match the number of `texts` "f"{len(ids)} != {len(texts)}")ifidsisNone:ids=self._generate_unique_ids(len(texts))ifmetadatasisNone:metadatas=[{}]*len(texts)iflen(metadatas)!=len(texts):raiseValueError("`metadatas` should be the same length as `texts` "f"{len(metadatas)} != {len(texts)}")documents=[Document(page_content=text,metadata=metadata)fortext,metadatainzip(texts,metadatas)]self._document_storage.mset(list(zip(ids,documents)))embeddings=self._embeddings.embed_documents(texts)self._searcher.add_to_index(ids,embeddings,metadatas,is_complete_overwrite,**kwargs)returnids@classmethoddeffrom_texts(cls:Type["_BaseVertexAIVectorStore"],texts:List[str],embedding:Embeddings,metadatas:Union[List[dict],None]=None,**kwargs:Any,)->"_BaseVertexAIVectorStore":"""Use from components instead."""raiseNotImplementedError("This method is not implemented. Instead, you should initialize the class"" with `VertexAIVectorSearch.from_components(...)` and then call ""`add_texts`")@classmethoddef_get_default_embeddings(cls)->Embeddings:"""This function returns the default embedding. Returns: Default TensorflowHubEmbeddings to use. """warnings.warn(message=("`TensorflowHubEmbeddings` as a default embbedings is deprecated."" Will change to `VertexAIEmbbedings`. Please specify the embedding ""type in the constructor."),category=DeprecationWarning,)# TODO: Change to vertexai embbedingssfromlangchain_community.embeddingsimport(# type: ignore[import-not-found, unused-ignore]TensorflowHubEmbeddings,)returnTensorflowHubEmbeddings()def_generate_unique_ids(self,number:int)->List[str]:"""Generates a list of unique ids of length `number` Args: number: Number of ids to generate. Returns: List of unique ids. """return[str(uuid.uuid4())for_inrange(number)]

class VectorSearchVectorStore(_BaseVertexAIVectorStore):
    """VertexAI VectorStore that handles the search and indexing using Vector Search
    and stores the documents in Google Cloud Storage.
    """
    @classmethod
    def from_components(  # Implemented in order to keep the current API
        cls: Type["VectorSearchVectorStore"],
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        index_id: str,
        endpoint_id: str,
        private_service_connect_ip_address: Optional[str] = None,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        stream_update: bool = False,
        **kwargs: Any,
    ) -> "VectorSearchVectorStore":
        """Takes the object creation out of the constructor.

        Args:
            project_id: The GCP project id.
            region: The default location making the API calls. It must have
                the same location as the GCS bucket and must be regional.
            gcs_bucket_name: The location where the vectors will be stored in
                order for the index to be created.
            index_id: The id of the created index.
            endpoint_id: The id of the created endpoint.
            private_service_connect_ip_address: The IP address of the private
                service connect instance.
            credentials_path: (Optional) The path of the Google credentials on
                the local file system.
            embedding: The :class:`Embeddings` that will be used for
                embedding the texts.
            stream_update: Whether to update with streaming or batching. VectorSearch
                index must be compatible with stream/batch updates.
            kwargs: Additional keyword arguments to pass to
                VertexAIVectorSearch.__init__().

        Returns:
            A configured VertexAIVectorSearch.
        """
        sdk_manager = VectorSearchSDKManager(
            project_id=project_id, region=region, credentials_path=credentials_path
        )
        bucket = sdk_manager.get_gcs_bucket(bucket_name=gcs_bucket_name)
        index = sdk_manager.get_index(index_id=index_id)
        endpoint = sdk_manager.get_endpoint(endpoint_id=endpoint_id)
        if private_service_connect_ip_address:
            endpoint.private_service_connect_ip_address = (
                private_service_connect_ip_address
            )
        return cls(
            document_storage=GCSDocumentStorage(bucket=bucket),
            searcher=VectorSearchSearcher(
                endpoint=endpoint,
                index=index,
                staging_bucket=bucket,
                stream_update=stream_update,
            ),
            embbedings=embedding,
        )
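
# Hedged usage sketch for VectorSearchVectorStore. The project, region, bucket,
# index and endpoint ids are placeholders, and VertexAIEmbeddings with the model
# name below is only an assumed choice; any langchain_core Embeddings
# implementation can be passed. This helper is illustrative only.
def _example_gcs_backed_store() -> None:
    from langchain_google_vertexai import VertexAIEmbeddings  # assumed embedding class

    store = VectorSearchVectorStore.from_components(
        project_id="my-project",  # placeholder
        region="us-central1",  # placeholder
        gcs_bucket_name="my-staging-bucket",  # placeholder
        index_id="1234567890",  # placeholder
        endpoint_id="9876543210",  # placeholder
        embedding=VertexAIEmbeddings(model_name="text-embedding-004"),  # assumed model
        stream_update=True,  # the index must support streaming updates
    )

    # Documents are persisted to GCS; embeddings are upserted into the index.
    store.add_texts(
        ["red apples", "blue squares"],
        metadatas=[{"color": "red"}, {"color": "blue"}],
    )

    # Returns (Document, cosine distance) pairs; lower distance means more similar.
    docs_and_scores = store.similarity_search_with_score("fruit", k=2)
    print(docs_and_scores)
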

class VectorSearchVectorStoreGCS(VectorSearchVectorStore):
    """Alias of `VectorSearchVectorStore` for consistency with the rest of vector
    stores with different document storage backends.
    """

class VectorSearchVectorStoreDatastore(_BaseVertexAIVectorStore):
    """VectorSearch with Datastore document storage."""
    @classmethod
    def from_components(
        cls: Type["VectorSearchVectorStoreDatastore"],
        project_id: str,
        region: str,
        index_id: str,
        endpoint_id: str,
        index_staging_bucket_name: Optional[str] = None,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        stream_update: bool = False,
        datastore_client_kwargs: Optional[Dict[str, Any]] = None,
        datastore_kind: str = "document_id",
        datastore_text_property_name: str = "text",
        datastore_metadata_property_name: str = "metadata",
        **kwargs: Dict[str, Any],
    ) -> "VectorSearchVectorStoreDatastore":
        """Takes the object creation out of the constructor.

        Args:
            project_id: The GCP project id.
            region: The default location making the API calls. It must have
                the same location as the GCS bucket and must be regional.
            index_id: The id of the created index.
            endpoint_id: The id of the created endpoint.
            index_staging_bucket_name: (Optional) If the index is updated by batch,
                the bucket where the data will be staged before updating the index.
                Only required when updating the index.
            credentials_path: (Optional) The path of the Google credentials on
                the local file system.
            embedding: The :class:`Embeddings` that will be used for
                embedding the texts.
            stream_update: Whether to update with streaming or batching. VectorSearch
                index must be compatible with stream/batch updates.
            kwargs: Additional keyword arguments to pass to
                VertexAIVectorSearch.__init__().

        Returns:
            A configured VectorSearchVectorStoreDatastore.
        """
        sdk_manager = VectorSearchSDKManager(
            project_id=project_id, region=region, credentials_path=credentials_path
        )

        if index_staging_bucket_name is not None:
            bucket = sdk_manager.get_gcs_bucket(bucket_name=index_staging_bucket_name)
        else:
            bucket = None

        index = sdk_manager.get_index(index_id=index_id)
        endpoint = sdk_manager.get_endpoint(endpoint_id=endpoint_id)

        if datastore_client_kwargs is None:
            datastore_client_kwargs = {}
        datastore_client = sdk_manager.get_datastore_client(**datastore_client_kwargs)

        document_storage = DataStoreDocumentStorage(
            datastore_client=datastore_client,
            kind=datastore_kind,
            text_property_name=datastore_text_property_name,
            metadata_property_name=datastore_metadata_property_name,
        )

        return cls(
            document_storage=document_storage,
            searcher=VectorSearchSearcher(
                endpoint=endpoint,
                index=index,
                staging_bucket=bucket,
                stream_update=stream_update,
            ),
            embbedings=embedding,
        )
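
# Hedged usage sketch for the Datastore-backed store. All ids, the Datastore kind,
# and the embedding model name are placeholders; the Namespace filter mirrors the
# example documented in similarity_search and only works if the corresponding
# restricts are configured on the Vector Search index. Illustrative only.
def _example_datastore_backed_store() -> None:
    from langchain_google_vertexai import VertexAIEmbeddings  # assumed embedding class

    store = VectorSearchVectorStoreDatastore.from_components(
        project_id="my-project",  # placeholder
        region="us-central1",  # placeholder
        index_id="1234567890",  # placeholder
        endpoint_id="9876543210",  # placeholder
        embedding=VertexAIEmbeddings(model_name="text-embedding-004"),  # assumed model
        stream_update=True,
        datastore_kind="documents",  # placeholder Datastore kind
    )

    # Documents go to Datastore instead of GCS; the index usage is unchanged.
    store.add_texts(["red apples"], metadatas=[{"color": "red"}])

    # Restrict the search to datapoints whose "color" restrict allows "red".
    docs = store.similarity_search(
        "fruit",
        k=4,
        filter=[Namespace("color", ["red"], [])],
    )
    print(docs)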