Source code for langchain_google_community.bq_storage_vectorstores.featurestore
from __future__ import annotations

import time
from datetime import timedelta
from subprocess import TimeoutExpired
from typing import Any, Dict, List, MutableSequence, Optional, Type, Union

import proto  # type: ignore[import-untyped]
from google.api_core.exceptions import (
    MethodNotImplemented,
    NotFound,
    ServiceUnavailable,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import root_validator

from langchain_google_community._utils import get_client_info, get_user_agent
from langchain_google_community.bq_storage_vectorstores._base import (
    BaseBigQueryVectorStore,
)
from langchain_google_community.bq_storage_vectorstores.utils import (
    cast_proto_type,
    doc_match_filter,
)

# Constants for index creation
MIN_INDEX_ROWS = 5
INDEX_CHECK_INTERVAL = timedelta(seconds=60)
USER_AGENT_PREFIX = "FeatureStore"
class VertexFSVectorStore(BaseBigQueryVectorStore):
    """
    A vector store implementation that utilizes BigQuery Storage and Vertex AI
    Feature Store.

    This class provides efficient storage and retrieval of documents with vector
    embeddings within Vertex AI Feature Store, using BigQuery as the underlying
    source of truth. It is particularly well suited for low-latency serving.
    It supports similarity search, filtering, and getting nearest neighbors by id.
    Optionally, this class can leverage BigQuery Vector Search for batch serving
    through the `to_bq_vector_store` method.

    Attributes:
        embedding: Embedding model for generating and comparing embeddings.
        project_id: Google Cloud Project ID where BigQuery resources are located.
        dataset_name: BigQuery dataset name.
        table_name: BigQuery table name.
        location: BigQuery region/location.
        content_field: Name of the column storing document content
            (default: "content").
        embedding_field: Name of the column storing text embeddings
            (default: "embedding").
        doc_id_field: Name of the column storing document IDs (default: "doc_id").
        credentials: Optional Google Cloud credentials object.
        embedding_dimension: Dimension of the embedding vectors
            (inferred if not provided).
        online_store_name (str, optional): Name of the Vertex AI Feature Store
            online store. Defaults to the dataset name.
        online_store_location (str, optional): Location of the online store.
            Defaults to the "location" parameter.
        view_name (str, optional): Name of the Feature View. Defaults to the
            table name.
        cron_schedule (str, optional): Cron schedule for data syncing.
        algorithm_config (Any, optional): Algorithm configuration for indexing.
        filter_columns (List[str], optional): Columns to use for filtering.
        crowding_column (str, optional): Column to use for crowding.
        distance_measure_type (str, optional): Distance measure type
            (default: DOT_PRODUCT_DISTANCE).
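
    Example (illustrative sketch, not a default configuration: the project, dataset,
    table, and embedding model names are placeholders, and the embedding model assumes
    the optional ``langchain_google_vertexai`` package is installed):

        from langchain_google_community.bq_storage_vectorstores.featurestore import (
            VertexFSVectorStore,
        )
        from langchain_google_vertexai import VertexAIEmbeddings

        store = VertexFSVectorStore(
            embedding=VertexAIEmbeddings(model_name="text-embedding-005"),
            project_id="my-project",
            dataset_name="my_dataset",
            table_name="my_table",
            location="us-central1",
        )
        store.add_texts(["hello world"], metadatas=[{"source": "demo"}])
        store.sync_data()  # sync the BigQuery rows into the online store
        docs = store.similarity_search("hello", k=4)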
"""online_store_name:Union[str,None]=Noneonline_store_location:Union[str,None]=Noneview_name:Union[str,None]=Nonecron_schedule:Union[str,None]=Nonealgorithm_config:Optional[Any]=Nonefilter_columns:Optional[List[str]]=Nonecrowding_column:Optional[str]=Nonedistance_measure_type:Optional[str]=None_user_agent:str=""feature_view:Any=None_admin_client:Any=None@root_validator(pre=False,skip_on_failure=True)def_initialize_bq_vector_index(cls,values:dict)->dict:importvertexaifromgoogle.cloud.aiplatform_v1beta1import(FeatureOnlineStoreAdminServiceClient,FeatureOnlineStoreServiceClient,)fromvertexai.resources.preview.feature_storeimport(utils,# type: ignore[import-untyped])vertexai.init(project=values["project_id"],location=values["location"])values["_user_agent"]=get_user_agent(f"{USER_AGENT_PREFIX}-VertexFSVectorStore")[1]ifvalues["algorithm_config"]isNone:values["algorithm_config"]=utils.TreeAhConfig()ifvalues["distance_measure_type"]isNone:values["distance_measure_type"]=utils.DistanceMeasureType.DOT_PRODUCT_DISTANCEifvalues.get("online_store_name")isNone:values["online_store_name"]=values["dataset_name"]ifvalues.get("view_name")isNone:values["view_name"]=values["table_name"]api_endpoint=f"{values['location']}-aiplatform.googleapis.com"values["_admin_client"]=FeatureOnlineStoreAdminServiceClient(client_options={"api_endpoint":api_endpoint},client_info=get_client_info(module=values["_user_agent"]),)values["online_store"]=_create_online_store(project_id=values["project_id"],location=values["location"],online_store_name=values["online_store_name"],_admin_client=values["_admin_client"],_logger=values["_logger"],)gca_resource=values["online_store"].gca_resourceendpoint=gca_resource.dedicated_serving_endpoint.public_endpoint_domain_namevalues["_search_client"]=FeatureOnlineStoreServiceClient(client_options={"api_endpoint":endpoint},client_info=get_client_info(module=values["_user_agent"]),)values["feature_view"]=_get_feature_view(values["online_store"],values["view_name"])values["_logger"].info("VertexFSVectorStore initialized with Feature Store Vector Search. \n""Optional batch serving available via .to_bq_vector_store() method.")returnvaluesdef_init_store(self)->None:fromgoogle.cloud.aiplatform_v1beta1importFeatureOnlineStoreServiceClientself.online_store=self._create_online_store()gca_resource=self.online_store.gca_resourceendpoint=gca_resource.dedicated_serving_endpoint.public_endpoint_domain_nameself._search_client=FeatureOnlineStoreServiceClient(client_options={"api_endpoint":endpoint},client_info=get_client_info(module=self._user_agent),)self.feature_view=self._get_feature_view()def_validate_bq_existing_source(self,)->None:bq_uri=self.feature_view.gca_resource.big_query_source.uri# type: ignore[union-attr]bq_uri_split=bq_uri.split(".")project_id=bq_uri_split[0].replace("bq://","")dataset=bq_uri_split[1]table=bq_uri_split[2]try:assertself.project_id==project_idassertself.dataset_name==datasetassertself.table_name==tableexceptAssertionError:error_message=("The BQ table passed in input is"f"bq://{self.project_id}.{self.dataset_name}.{self.table_name} "f"while the BQ table linked to the feature view is "f"{bq_uri}.""Make sure you are using the same table for the feature ""view.")raiseAssertionError(error_message)def_wait_until_dummy_query_success(self,timeout_seconds:int=6000)->None:""" Waits until a dummy query succeeds, indicating the system is ready. 
"""start_time=time.time()whileTrue:elapsed_time=time.time()-start_timeifelapsed_time>timeout_seconds:raiseTimeoutExpired("Timeout of {} seconds exceeded".format(timeout_seconds),timeout=timeout_seconds,)try:_=self._search_embedding(embedding=[1]*self.embedding_dimension,# type: ignore[operator]k=1,)returnNoneexcept(ServiceUnavailable,MethodNotImplemented):self._logger.info("DNS certificates are being propagated, please wait")time.sleep(30)self._init_store()
    def sync_data(self) -> None:
        """Sync the data from the BigQuery source into the Feature Store online store."""
        self.feature_view = self._create_feature_view()
        self._validate_bq_existing_source()
        sync_response = self._admin_client.sync_feature_view(
            feature_view=(
                f"projects/{self.project_id}/"
                f"locations/{self.location}"
                f"/featureOnlineStores/{self.online_store_name}"
                f"/featureViews/{self.view_name}"
            )
        )
        while True:
            feature_view_sync = self._admin_client.get_feature_view_sync(
                name=sync_response.feature_view_sync
            )
            if feature_view_sync.run_time.end_time.seconds > 0:
                status = (
                    "Succeed"
                    if feature_view_sync.final_status.code == 0
                    else "Failed"
                )
                self._logger.info(f"Sync {status} for {feature_view_sync.name}.")
                break
            else:
                self._logger.info("Sync ongoing, waiting for 30 seconds.")
            time.sleep(30)

        self._wait_until_dummy_query_success()
    def _similarity_search_by_vectors_with_scores_and_embeddings(
        self,
        embeddings: List[List[float]],
        filter: Optional[Dict[str, Any]] = None,
        k: int = 5,
        batch_size: Union[int, None] = None,
        **kwargs: Any,
    ) -> List[List[List[Any]]]:
        """Performs a similarity search using vector embeddings.

        This function takes a set of query embeddings and searches for similar
        documents. It returns the top-k matching documents, along with their
        similarity scores and their corresponding embeddings.

        Args:
            embeddings: A list of lists, where each inner list represents a
                query embedding.
            filter: (Optional) A dictionary specifying filter criteria for documents
                on metadata properties, e.g.
                    {
                        "str_property": "foo",
                        "int_property": 123
                    }
            k: The number of top results to return for each query.
            batch_size: The size of batches to process embeddings.

        Returns:
            A list of lists of lists. Each inner list represents the results for a
            single query, and contains elements of the form
            [Document, score, embedding], where:
            - Document: The matching document object.
            - score: The similarity score between the query and document.
            - embedding: The document's embedding.
        """
        output = []
        for query_embedding in embeddings:
            results = self._search_embedding(embedding=query_embedding, k=k, **kwargs)
            output.append(self._parse_proto_output(results, filter=filter))
        return output
    def get_documents(
        self,
        ids: Optional[List[str]],
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Search documents by their ids or metadata values.

        Args:
            ids: List of ids of documents to retrieve from the vectorstore.
            filter: Filter on metadata properties, e.g.
                    {
                        "str_property": "foo",
                        "int_property": 123
                    }

        Returns:
            List of documents retrieved from the vectorstore.
        """
        from google.cloud import aiplatform

        output = []
        if ids is None:
            raise ValueError(
                "Feature Store executor doesn't support search by filter only"
            )
        for id in ids:
            with aiplatform.telemetry.tool_context_manager(self._user_agent):
                result = self.feature_view.read(key=[id])  # type: ignore[union-attr]
            metadata, content = {}, None
            for feature in result.to_dict()["features"]:
                if feature["name"] not in [
                    self.embedding_field,
                    self.content_field,
                ]:
                    metadata[feature["name"]] = list(feature["value"].values())[0]
                if feature["name"] == self.content_field:
                    content = list(feature["value"].values())[0]
            if filter is not None and not doc_match_filter(
                document=metadata, filter=filter
            ):
                continue
            output.append(
                Document(
                    page_content=str(content),
                    metadata=metadata,
                )
            )
        return output
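    # Illustrative sketch (not part of the module): looking up documents by id with
    # an optional metadata filter. `store` is a hypothetical initialized
    # VertexFSVectorStore; the ids and the "source" metadata key are placeholders.
    #
    #     docs = store.get_documents(
    #         ids=["doc_1", "doc_2"],
    #         filter={"source": "demo"},  # keep only entities whose metadata matches
    #     )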
    def search_neighbors_by_ids(
        self,
        ids: List[str],
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[List[List[Any]]]:
        """Searches for neighboring entities in a Vertex Feature Store based on
        their IDs and an optional filter on metadata.

        Args:
            ids: A list of string identifiers representing the entities to
                search for.
            filter: (Optional) A dictionary specifying filter criteria for documents
                on metadata properties, e.g.
                    {
                        "str_property": "foo",
                        "int_property": 123
                    }
        """
        output = []
        if ids is None:
            raise ValueError(
                "Feature Store executor doesn't support search by filter only"
            )
        for entity_id in ids:
            try:
                results = self._search_embedding(entity_id=entity_id, **kwargs)
                output.append(self._parse_proto_output(results, filter=filter))
            except NotFound:
                output.append([])
        return output
    def _parse_proto_output(
        self,
        search_results: MutableSequence[Any],
        filter: Optional[Dict[str, Any]] = None,
    ) -> List[List[Any]]:
        documents = []
        for result in search_results:
            metadata, embedding = {}, None

            for feature in result.entity_key_values.key_values.features:
                if feature.name not in [
                    self.embedding_field,
                    self.content_field,
                ]:
                    dict_values = proto.Message.to_dict(feature.value)
                    if dict_values:
                        col_type, value = next(iter(dict_values.items()))
                        value = cast_proto_type(column=col_type, value=value)
                        metadata[feature.name] = value
                    else:
                        metadata[feature.name] = None
                if feature.name == self.embedding_field:
                    embedding = feature.value.double_array_value.values
                if feature.name == self.content_field:
                    dict_values = proto.Message.to_dict(feature.value)
                    content = list(dict_values.values())[0]
            if filter is not None and not doc_match_filter(
                document=metadata, filter=filter
            ):
                continue
            documents.append(
                [
                    Document(
                        page_content=content,
                        metadata=metadata,
                    ),
                    result.distance,
                    embedding,
                ]
            )
        return documents

    def _search_embedding(
        self,
        embedding: Any = None,
        entity_id: Optional[str] = None,
        k: int = 5,
        string_filters: Optional[List[dict]] = None,
        per_crowding_attribute_neighbor_count: Optional[int] = None,
        approximate_neighbor_candidates: Optional[int] = None,
        leaf_nodes_search_fraction: Optional[float] = None,
    ) -> MutableSequence[Any]:
        from google.cloud import aiplatform
        from google.cloud.aiplatform_v1beta1.types import (
            NearestNeighborQuery,
            feature_online_store_service,
        )

        if embedding:
            embedding = NearestNeighborQuery.Embedding(value=embedding)
        query = NearestNeighborQuery(
            entity_id=entity_id,
            embedding=embedding,
            neighbor_count=k,
            string_filters=string_filters,
            per_crowding_attribute_neighbor_count=per_crowding_attribute_neighbor_count,
            parameters={
                "approximate_neighbor_candidates": approximate_neighbor_candidates,
                "leaf_nodes_search_fraction": leaf_nodes_search_fraction,
            },
        )
        with aiplatform.telemetry.tool_context_manager(self._user_agent):
            result = self._search_client.search_nearest_entities(
                request=feature_online_store_service.SearchNearestEntitiesRequest(
                    feature_view=self.feature_view.gca_resource.name,  # type: ignore[union-attr]
                    query=query,
                    return_full_entity=True,  # returning entities with metadata
                )
            )
        return result.nearest_neighbors.neighbors

    def _create_online_store(self) -> Any:
        # Search for existing Online store
        if self.online_store_name:
            return _create_online_store(
                project_id=self.project_id,
                location=self.location,
                online_store_name=self.online_store_name,
                _admin_client=self._admin_client,
                _logger=self._logger,
            )

    def _create_feature_view(self) -> Any:
        import vertexai
        from vertexai.resources.preview.feature_store import (
            utils,  # type: ignore[import-untyped]
        )

        fv = self._get_feature_view()
        if fv:
            return fv
        else:
            FeatureViewBigQuerySource = (
                vertexai.resources.preview.FeatureViewBigQuerySource
            )
            big_query_source = FeatureViewBigQuerySource(
                uri=f"bq://{self.full_table_id}",
                entity_id_columns=[self.doc_id_field],
            )
            index_config = utils.IndexConfig(
                embedding_column=self.embedding_field,
                crowding_column=self.crowding_column,
                filter_columns=self.filter_columns,
                dimensions=self.embedding_dimension,
                distance_measure_type=self.distance_measure_type,
                algorithm_config=self.algorithm_config,
            )
            return self.online_store.create_feature_view(
                name=self.view_name,
                source=big_query_source,
                sync_config=self.cron_schedule,
                index_config=index_config,
                project=self.project_id,
                location=self.location,
            )

    def _get_feature_view(self) -> Any | None:
        # Search for existing Feature view
        return _get_feature_view(self.online_store, self.view_name)
    @classmethod
    def from_texts(
        cls: Type["VertexFSVectorStore"],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "VertexFSVectorStore":
        """Return a VectorStore initialized from input texts.

        Args:
            texts: List of strings to add to the vectorstore.
            embedding: An embedding model instance for text to vector transformations.
            metadatas: Optional list of metadata records associated with the texts.
                (ie [{"url": "www.myurl1.com", "title": "title1"},
                {"url": "www.myurl2.com", "title": "title2"}])

        Returns:
            A VertexFSVectorStore initialized with the provided texts.
        """
        vs_obj = VertexFSVectorStore(embedding=embedding, **kwargs)
        vs_obj.add_texts(texts, metadatas)
        return vs_obj
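    # Illustrative sketch (not part of the module): building a store directly from
    # texts. `my_embeddings` is a placeholder for any LangChain Embeddings instance;
    # the remaining keyword arguments are forwarded to the constructor.
    #
    #     store = VertexFSVectorStore.from_texts(
    #         texts=["hello", "world"],
    #         embedding=my_embeddings,
    #         project_id="my-project",
    #         dataset_name="my_dataset",
    #         table_name="my_table",
    #         location="us-central1",
    #     )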
    def to_bq_vector_store(self, **kwargs: Any) -> Any:
        """
        Converts the current object's parameters into a `BigQueryVectorStore`
        instance.

        This method combines the base parameters of the current object to create a
        `BigQueryVectorStore` object.

        Args:
            **kwargs: Additional keyword arguments to be passed to the
                `BigQueryVectorStore` constructor. These override any matching
                parameters in the base object.

        Returns:
            BigQueryVectorStore: An initialized `BigQueryVectorStore` object ready
                for vector search operations.

        Raises:
            ValueError: If any of the combined parameters are invalid for
                initializing a `BigQueryVectorStore`.
        """
        from langchain_google_community.bq_storage_vectorstores.bigquery import (
            BigQueryVectorStore,
        )

        base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
        base_params["embedding"] = self.embedding
        all_params = {**base_params, **kwargs}
        bq_obj = BigQueryVectorStore(**all_params)
        return bq_obj
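
# Illustrative sketch (not part of the module): switching an existing
# VertexFSVectorStore over to BigQuery-based batch serving. `store` is a hypothetical
# initialized VertexFSVectorStore; both objects read from the same BigQuery table.
#
#     bq_store = store.to_bq_vector_store()
#     results = bq_store.similarity_search("my query", k=4)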
def _create_online_store(
    project_id: str,
    location: str,
    online_store_name: str,
    _logger: Any,
    _admin_client: Any,
) -> Any:
    # Search for existing Online store
    import vertexai
    from google.cloud.aiplatform_v1beta1.types import (
        feature_online_store as feature_online_store_pb2,
    )

    stores_list = vertexai.resources.preview.FeatureOnlineStore.list(
        project=project_id, location=location
    )
    for store in stores_list:
        if store.name == online_store_name:
            return store

    _logger.info("Creating feature store online store")
    # Create it otherwise
    online_store_config = feature_online_store_pb2.FeatureOnlineStore(
        optimized=feature_online_store_pb2.FeatureOnlineStore.Optimized()
    )
    create_store_lro = _admin_client.create_feature_online_store(
        parent=f"projects/{project_id}/locations/{location}",
        feature_online_store_id=online_store_name,
        feature_online_store=online_store_config,
    )
    _logger.info(create_store_lro.result())
    _logger.info(create_store_lro.result())

    stores_list = vertexai.resources.preview.FeatureOnlineStore.list(
        project=project_id, location=location
    )
    for store in stores_list:
        if store.name == online_store_name:
            return store


def _get_feature_view(online_store: Any, view_name: Optional[str]) -> Any:
    # Search for existing Feature view
    import vertexai

    fv_list = vertexai.resources.preview.FeatureView.list(
        feature_online_store_id=online_store.gca_resource.name
    )
    for fv in fv_list:
        if fv.name == view_name:
            return fv
    return None