@deprecated(
    since="0.0.12",
    removal="1.0",
    alternative_import="langchain_google_vertexai.VectorSearchVectorStore",
)
class MatchingEngine(VectorStore):
    """`Google Vertex AI Vector Search` (previously Matching Engine) vector store.

    While the embeddings are stored in the Matching Engine, the embedded
    documents will be stored in GCS.

    An existing Index and corresponding Endpoint are preconditions for
    using this module.

    See usage in docs/integrations/vectorstores/google_vertex_ai_vector_search.ipynb

    Note that this implementation is mostly meant for reading if you are
    planning to do a real time implementation. While reading is a real time
    operation, updating the index takes close to one hour.
    """

    def __init__(
        self,
        project_id: str,
        index: MatchingEngineIndex,
        endpoint: MatchingEngineIndexEndpoint,
        embedding: Embeddings,
        gcs_client: storage.Client,
        gcs_bucket_name: str,
        credentials: Optional[Credentials] = None,
        *,
        document_id_key: Optional[str] = None,
    ):
        """Google Vertex AI Vector Search (previously Matching Engine)
        implementation of the vector store.

        While the embeddings are stored in the Matching Engine, the embedded
        documents will be stored in GCS.

        An existing Index and corresponding Endpoint are preconditions for
        using this module.

        See usage in
        docs/integrations/vectorstores/google_vertex_ai_vector_search.ipynb.

        Note that this implementation is mostly meant for reading if you are
        planning to do a real time implementation. While reading is a real time
        operation, updating the index takes close to one hour.

        Args:
            project_id: The GCP project id.
            index: The created index class. See
                :func:`MatchingEngine.from_components`.
            endpoint: The created endpoint class. See
                :func:`MatchingEngine.from_components`.
            embedding: A :class:`Embeddings` that will be used for
                embedding the text sent. It is required here; a default is
                only supplied by :func:`MatchingEngine.from_components`.
            gcs_client: The GCS client.
            gcs_bucket_name: The GCS bucket name.
            credentials (Optional): Created GCP credentials.
            document_id_key (Optional): Key for storing document ID in document
                metadata. If None, document ID will not be returned in document
                metadata.
        """
        super().__init__()
        # Fail early, with an actionable message, if the Google SDKs are missing.
        self._validate_google_libraries_installation()

        self.project_id = project_id
        self.index = index
        self.endpoint = endpoint
        self.embedding = embedding
        self.gcs_client = gcs_client
        self.credentials = credentials
        self.gcs_bucket_name = gcs_bucket_name
        self.document_id_key = document_id_key

    @property
    def embeddings(self) -> Embeddings:
        """The embedding model used for documents and queries."""
        return self.embedding

    def _validate_google_libraries_installation(self) -> None:
        """Validates that Google libraries that are needed are installed.

        Raises:
            ImportError: If any of the required Google packages cannot be
                imported. The original import failure is chained as the cause.
        """
        try:
            from google.cloud import aiplatform, storage  # noqa: F401
            from google.oauth2 import service_account  # noqa: F401
        except ImportError as import_error:
            raise ImportError(
                "You must run `pip install --upgrade "
                "google-cloud-aiplatform google-cloud-storage` "
                "to use the MatchingEngine Vectorstore."
            ) from import_error

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Each text is uploaded to GCS under ``documents/<id>``; the embeddings
        are written as JSON lines under ``indexes/<uuid>/`` and the index is
        then updated from that folder.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.

        Raises:
            ValueError: If ``metadatas`` is given and its length differs from
                the number of texts.
        """
        texts = list(texts)
        if metadatas is not None and len(texts) != len(metadatas):
            raise ValueError(
                "texts and metadatas do not have the same length. Received "
                f"{len(texts)} texts and {len(metadatas)} metadatas."
            )

        # Nothing to index: avoid uploading an empty delta file and
        # triggering a (slow) index update for no data.
        if not texts:
            return []

        logger.debug("Embedding documents.")
        embeddings = self.embedding.embed_documents(texts)
        jsons = []
        ids = []
        # Could be improved with async.
        for idx, (embedding, text) in enumerate(zip(embeddings, texts)):
            doc_id = str(uuid.uuid4())
            ids.append(doc_id)
            json_: dict = {"id": doc_id, "embedding": embedding}
            if metadatas is not None:
                json_["metadata"] = metadatas[idx]
            jsons.append(json_)
            self._upload_to_gcs(text, f"documents/{doc_id}")

        logger.debug(f"Uploaded {len(ids)} documents to GCS.")

        # Creating json lines from the embedded documents.
        result_str = "\n".join(json.dumps(x) for x in jsons)

        # The index is updated from a folder, so each batch gets its own prefix.
        filename_prefix = f"indexes/{uuid.uuid4()}"
        filename = f"{filename_prefix}/{time.time()}.json"
        self._upload_to_gcs(result_str, filename)
        logger.debug(
            f"Uploaded updated json with embeddings to "
            f"{self.gcs_bucket_name}/{filename}."
        )

        self.index = self.index.update_embeddings(
            contents_delta_uri=f"gs://{self.gcs_bucket_name}/{filename_prefix}/"
        )

        logger.debug("Updated index with new configuration.")

        return ids

    def _upload_to_gcs(self, data: str, gcs_location: str) -> None:
        """Uploads data to gcs_location.

        Args:
            data: The data that will be stored.
            gcs_location: The location where the data will be stored.
        """
        bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
        blob = bucket.blob(gcs_location)
        blob.upload_from_string(data)

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query and their cosine distance from the query.

        Args:
            query: String query look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional. A list of Namespaces for filtering
                the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            kwargs: Extra keyword arguments. Accepted so that
                ``similarity_search`` can forward its extras; passed on to
                ``similarity_search_by_vector_with_score``, which ignores them.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to
            the query text and cosine distance in float for each.
            Lower score represents more similarity.
        """
        logger.debug(f"Embedding query {query}.")
        embedding_query = self.embedding.embed_query(query)
        return self.similarity_search_by_vector_with_score(
            embedding_query, k=k, filter=filter, **kwargs
        )

    def similarity_search_by_vector_with_score(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the embedding and their cosine distance.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional. A list of Namespaces for filtering
                the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            kwargs: Extra keyword arguments. Accepted so that
                ``similarity_search_by_vector`` can forward its extras;
                currently ignored.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to
            the query text and cosine distance in float for each.
            Lower score represents more similarity.
        """
        filter = filter or []
        deployed_index_id = self._get_index_id()

        # If the endpoint is public we use the find_neighbors function.
        if getattr(self.endpoint, "_public_match_client", None):
            response = self.endpoint.find_neighbors(
                deployed_index_id=deployed_index_id,
                queries=[embedding],
                num_neighbors=k,
                filter=filter,
            )
        else:
            response = self.endpoint.match(
                deployed_index_id=deployed_index_id,
                queries=[embedding],
                num_neighbors=k,
                filter=filter,
            )

        logger.debug(f"Found {len(response)} matches.")

        if not response:
            return []

        docs: List[Tuple[Document, float]] = []

        # I'm only getting the first one because queries receives an array
        # and the similarity_search method only receives one query. This
        # means that the match method will always return an array with only
        # one element.
        for neighbor in response[0]:
            page_content = self._download_from_gcs(f"documents/{neighbor.id}")
            # TODO: return all metadata.
            metadata = {}
            if self.document_id_key is not None:
                metadata[self.document_id_key] = neighbor.id
            document = Document(
                page_content=page_content,
                metadata=metadata,
            )
            docs.append((document, neighbor.distance))  # type: ignore[arg-type]

        logger.debug("Downloaded documents for query.")

        return docs

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: The string that will be used to search for similar documents.
            k: The amount of neighbors that will be retrieved.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            kwargs: Extra keyword arguments, forwarded to
                ``similarity_search_with_score``.

        Returns:
            A list of k matching documents.
        """
        docs_and_scores = self.similarity_search_with_score(
            query, k=k, filter=filter, **kwargs
        )

        return [doc for doc, _ in docs_and_scores]

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[List[Namespace]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding.

        Args:
            embedding: Embedding to look up documents similar to.
            k: The amount of neighbors that will be retrieved.
            filter: Optional. A list of Namespaces for filtering the matching results.
                For example:
                [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
                will match datapoints that satisfy "red color" but not include
                datapoints with "squared shape". Please refer to
                https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
                for more detail.
            kwargs: Extra keyword arguments, forwarded to
                ``similarity_search_by_vector_with_score``.

        Returns:
            A list of k matching documents.
        """
        docs_and_scores = self.similarity_search_by_vector_with_score(
            embedding, k=k, filter=filter, **kwargs
        )

        return [doc for doc, _ in docs_and_scores]

    def _get_index_id(self) -> str:
        """Gets the correct index id for the endpoint.

        Returns:
            The id of the deployed index whose ``index`` matches this store's
            index resource name.

        Raises:
            ValueError: If no deployed index on the endpoint matches.
        """
        for deployed_index in self.endpoint.deployed_indexes:
            if deployed_index.index == self.index.resource_name:
                return deployed_index.id

        raise ValueError(
            f"No index with id {self.index.resource_name} "
            f"deployed on endpoint "
            f"{self.endpoint.display_name}."
        )

    def _download_from_gcs(self, gcs_location: str) -> str:
        """Downloads from GCS in text format.

        Args:
            gcs_location: The location where the file is located.

        Returns:
            The string contents of the file.
        """
        bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
        blob = bucket.blob(gcs_location)
        # The blob API hands back raw bytes; decode so callers receive the
        # declared ``str`` (text uploaded via upload_from_string is UTF-8).
        return blob.download_as_bytes().decode("utf-8")

    @classmethod
    def from_texts(
        cls: Type["MatchingEngine"],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "MatchingEngine":
        """Use from components instead.

        Raises:
            NotImplementedError: Always; this constructor is not supported.
        """
        raise NotImplementedError(
            "This method is not implemented. Instead, you should initialize the class"
            " with `MatchingEngine.from_components(...)` and then call "
            "`add_texts`"
        )

    @classmethod
    def from_components(
        cls: Type["MatchingEngine"],
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        index_id: str,
        endpoint_id: str,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
        **kwargs: Any,
    ) -> "MatchingEngine":
        """Takes the object creation out of the constructor.

        Args:
            project_id: The GCP project id.
            region: The default location making the API calls. It must have
                the same location as the GCS bucket and must be regional.
            gcs_bucket_name: The location where the vectors will be stored in
                order for the index to be created.
            index_id: The id of the created index.
            endpoint_id: The id of the created endpoint.
            credentials_path: (Optional) The path of the Google credentials on
                the local file system.
            embedding: The :class:`Embeddings` that will be used for
                embedding the texts. If not provided, the embeddings returned
                by ``_get_default_embeddings`` are used.
            kwargs: Additional keyword arguments to pass to MatchingEngine.__init__().

        Returns:
            A configured MatchingEngine. No texts are added by this method;
            call ``add_texts`` afterwards to index documents.

        Raises:
            ValueError: If ``gcs_bucket_name`` contains a path and not only
                the bucket name.
        """
        gcs_bucket_name = cls._validate_gcs_bucket(gcs_bucket_name)
        credentials = cls._create_credentials_from_file(credentials_path)
        index = cls._create_index_by_id(index_id, project_id, region, credentials)  # type: ignore[arg-type]
        endpoint = cls._create_endpoint_by_id(
            endpoint_id,
            project_id,
            region,
            credentials,  # type: ignore[arg-type]
        )

        gcs_client = cls._get_gcs_client(credentials, project_id)  # type: ignore[arg-type]
        cls._init_aiplatform(project_id, region, gcs_bucket_name, credentials)  # type: ignore[arg-type]

        return cls(
            project_id=project_id,
            index=index,
            endpoint=endpoint,
            embedding=embedding or cls._get_default_embeddings(),
            gcs_client=gcs_client,
            credentials=credentials,
            gcs_bucket_name=gcs_bucket_name,
            **kwargs,
        )

    @classmethod
    def _validate_gcs_bucket(cls, gcs_bucket_name: str) -> str:
        """Validates the gcs_bucket_name as a bucket name.

        Args:
            gcs_bucket_name: The received bucket uri.

        Returns:
            The bucket name with any ``gs://`` removed.

        Raises:
            ValueError: If a full path (containing ``/``) is provided.
        """
        gcs_bucket_name = gcs_bucket_name.replace("gs://", "")
        if "/" in gcs_bucket_name:
            raise ValueError(
                f"The argument gcs_bucket_name should only be "
                f"the bucket name. Received {gcs_bucket_name}"
            )
        return gcs_bucket_name

    @classmethod
    def _create_credentials_from_file(
        cls, json_credentials_path: Optional[str]
    ) -> Optional[Credentials]:
        """Creates credentials for GCP.

        Args:
            json_credentials_path: The path on the file system where the
                credentials are stored.

        Returns:
            An optional of Credentials or None, in which case the default
            will be used.
        """
        from google.oauth2 import service_account

        credentials = None
        if json_credentials_path is not None:
            credentials = service_account.Credentials.from_service_account_file(
                json_credentials_path
            )

        return credentials

    @classmethod
    def _create_index_by_id(
        cls, index_id: str, project_id: str, region: str, credentials: "Credentials"
    ) -> MatchingEngineIndex:
        """Creates a MatchingEngineIndex object by id.

        Args:
            index_id: The created index id.
            project_id: The project to retrieve index from.
            region: Location to retrieve index from.
            credentials: GCS credentials.

        Returns:
            A configured MatchingEngineIndex.
        """
        from google.cloud import aiplatform

        logger.debug(f"Creating matching engine index with id {index_id}.")
        return aiplatform.MatchingEngineIndex(
            index_name=index_id,
            project=project_id,
            location=region,
            credentials=credentials,
        )

    @classmethod
    def _create_endpoint_by_id(
        cls, endpoint_id: str, project_id: str, region: str, credentials: "Credentials"
    ) -> MatchingEngineIndexEndpoint:
        """Creates a MatchingEngineIndexEndpoint object by id.

        Args:
            endpoint_id: The created endpoint id.
            project_id: The project to retrieve index from.
            region: Location to retrieve index from.
            credentials: GCS credentials.

        Returns:
            A configured MatchingEngineIndexEndpoint.
        """
        from google.cloud import aiplatform

        logger.debug(f"Creating endpoint with id {endpoint_id}.")
        return aiplatform.MatchingEngineIndexEndpoint(
            index_endpoint_name=endpoint_id,
            project=project_id,
            location=region,
            credentials=credentials,
        )

    @classmethod
    def _get_gcs_client(
        cls, credentials: "Credentials", project_id: str
    ) -> "storage.Client":
        """Lazily creates a GCS client.

        Args:
            credentials: The credentials to build the client with.
            project_id: The GCP project id the client is bound to.

        Returns:
            A configured GCS client.
        """
        from google.cloud import storage

        return storage.Client(
            credentials=credentials,
            project=project_id,
            client_info=get_client_info(module="vertex-ai-matching-engine"),
        )

    @classmethod
    def _init_aiplatform(
        cls,
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        credentials: "Credentials",
    ) -> None:
        """Configures the aiplatform library.

        Args:
            project_id: The GCP project id.
            region: The default location making the API calls. It must have
                the same location as the GCS bucket and must be regional.
            gcs_bucket_name: GCS staging location.
            credentials: The GCS Credentials object.
        """
        from google.cloud import aiplatform

        logger.debug(
            f"Initializing AI Platform for project {project_id} on "
            f"{region} and for {gcs_bucket_name}."
        )
        aiplatform.init(
            project=project_id,
            location=region,
            staging_bucket=gcs_bucket_name,
            credentials=credentials,
        )

    @classmethod
    def _get_default_embeddings(cls) -> "TensorflowHubEmbeddings":
        """This function returns the default embedding.

        Returns:
            Default TensorflowHubEmbeddings to use.
        """
        from langchain_community.embeddings import TensorflowHubEmbeddings

        return TensorflowHubEmbeddings()