Source code for langchain_google_community.bq_storage_vectorstores._base
from__future__importannotationsimportasyncioimportuuidfromabcimportABC,abstractmethodfromdatetimeimporttimedeltafromfunctoolsimportpartialfromimportlib.utilimportfind_specfromthreadingimportLockfromtypingimportAny,Dict,List,Optional,Tuple,Type,Union,castimportnumpyasnpfromlangchain_community.vectorstores.utilsimportmaximal_marginal_relevancefromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.pydantic_v1importBaseModel,ConfigDict,root_validatorfromlangchain_core.vectorstoresimportVectorStorefromlangchain_google_community._utilsimportget_client_infofromlangchain_google_community.bq_storage_vectorstores.utilsimport(check_bq_dataset_exists,validate_column_in_bq_schema,)_vector_table_lock=Lock()# process-wide BigQueryVectorSearch table lock# Constants for index creationMIN_INDEX_ROWS=5INDEX_CHECK_INTERVAL=timedelta(seconds=60)USER_AGENT_PREFIX="FeatureStore"classBaseBigQueryVectorStore(VectorStore,BaseModel,ABC):""" Abstract base class for BigQuery-based vector stores. This class provides a foundation for storing, retrieving, and searching documents and their corresponding embeddings in BigQuery. Attributes: embedding: Embedding model for generating and comparing embeddings. project_id: Google Cloud Project ID where BigQuery resources are located. dataset_name: BigQuery dataset name. table_name: BigQuery table name. location: BigQuery region/location. content_field: Name of the column storing document content (default: "content"). embedding_field: Name of the column storing text embeddings (default: "embedding"). doc_id_field: Name of the column storing document IDs (default: "doc_id"). credentials: Optional Google Cloud credentials object. embedding_dimension: Dimension of the embedding vectors (inferred if not provided). Abstract Methods: sync_data: Synchronizes data between the vector store and BigQuery. get_documents: Retrieves documents based on IDs or filters. _similarity_search_by_vectors_with_scores_and_embeddings: Performs similarity search with scores and embeddings. """model_config=ConfigDict(arbitrary_types_allowed=True)embedding:Embeddingsproject_id:strdataset_name:strtable_name:strlocation:strcontent_field:str="content"embedding_field:str="embedding"doc_id_field:str="doc_id"credentials:Optional[Any]=Noneembedding_dimension:Optional[int]=Noneextra_fields:Union[Dict[str,str],None]=Nonetable_schema:Any=None_bq_client:Any=None_logger:Any=None_full_table_id:Optional[str]=NoneclassConfig:arbitrary_types_allowed=True@abstractmethoddefsync_data(self)->None:...@abstractmethoddefget_documents(self,ids:Optional[List[str]],filter:Optional[Dict[str,Any]]=None,**kwargs:Any,)->List[Document]:"""Search documents by their ids or metadata values. Args: ids: List of ids of documents to retrieve from the vectorstore. filter: Filter on metadata properties, e.g. { "str_property": "foo", "int_property": 123 } Returns: List of ids from adding the texts into the vectorstore. """...@abstractmethoddef_similarity_search_by_vectors_with_scores_and_embeddings(self,embeddings:List[List[float]],filter:Optional[Dict[str,Any]]=None,k:int=5,batch_size:Union[int,None]=None,)->list[list[list[Any]]]:...@root_validator(pre=False,skip_on_failure=True)defvalidate_vals(cls,values:dict)->dict:try:importpandas# noqa: F401fromgoogle.cloudimportbigquery# type: ignore[attr-defined]fromgoogle.cloud.aiplatformimportbasefind_spec("pyarrow")find_spec("db_types")exceptModuleNotFoundError:raiseImportError("Please, install feature store dependency group: ""`pip install langchain-google-community[featurestore]`")values["_logger"]=base.Logger(__name__)values["_bq_client"]=bigquery.Client(project=values["project_id"],location=values["location"],credentials=values["credentials"],client_info=get_client_info(module="bigquery-vector-search"),)ifvalues["embedding_dimension"]isNone:values["embedding_dimension"]=len(values["embedding"].embed_query("test"))full_table_id=(f"{values['project_id']}.{values['dataset_name']}.{values['table_name']}")values["_full_table_id"]=full_table_idtemp_dataset_id=f"{values['dataset_name']}_temp"ifnotcheck_bq_dataset_exists(client=values["_bq_client"],dataset_id=values["dataset_name"]):values["_bq_client"].create_dataset(dataset=values["dataset_name"],exists_ok=True)ifnotcheck_bq_dataset_exists(client=values["_bq_client"],dataset_id=temp_dataset_id):values["_bq_client"].create_dataset(dataset=temp_dataset_id,exists_ok=True)table_ref=bigquery.TableReference.from_string(full_table_id)values["_bq_client"].create_table(table_ref,exists_ok=True)values["_logger"].info(f"BigQuery table {full_table_id} "f"initialized/validated as persistent storage. "f"Access via BigQuery console:\n "f"https://console.cloud.google.com/bigquery?project={values['project_id']}"f"&ws=!1m5!1m4!4m3!1s{values['project_id']}!2s{values['dataset_name']}!3s"f"{values['table_name']}")returnvalues@propertydefembeddings(self)->Optional[Embeddings]:returnself.embedding@propertydeffull_table_id(self)->str:returncast(str,self._full_table_id)def_validate_bq_table(self)->Any:fromgoogle.cloudimportbigquery# type: ignore[attr-defined]fromgoogle.cloud.exceptionsimportNotFoundtable_ref=bigquery.TableReference.from_string(self.full_table_id)try:# Attempt to retrieve the table informationself._bq_client.get_table(self.full_table_id)exceptNotFound:self._logger.debug(f"Couldn't find table {self.full_table_id}. "f"Table will be created once documents are added")returntable=self._bq_client.get_table(table_ref)schema=table.schema.copy()ifschema:## Check if table has a schemaself.table_schema={field.name:field.field_typeforfieldinschema}columns={c.name:cforcinschema}validate_column_in_bq_schema(column_name=self.doc_id_field,columns=columns,expected_types=["STRING"],expected_modes=["NULLABLE","REQUIRED"],)validate_column_in_bq_schema(column_name=self.content_field,columns=columns,expected_types=["STRING"],expected_modes=["NULLABLE","REQUIRED"],)validate_column_in_bq_schema(column_name=self.embedding_field,columns=columns,expected_types=["FLOAT","FLOAT64"],expected_modes=["REPEATED"],)ifself.extra_fieldsisNone:extra_fields={}forcolumninschema:ifcolumn.namenotin[self.doc_id_field,self.content_field,self.embedding_field,]:# Check for unsupported REPEATED modeifcolumn.mode=="REPEATED":raiseValueError(f"Column '{column.name}' is REPEATED. "f"REPEATED fields are not supported in this context.")extra_fields[column.name]=column.field_typeself.extra_fields=extra_fieldselse:forfield,typeinself.extra_fields.items():validate_column_in_bq_schema(column_name=field,columns=columns,expected_types=[type],expected_modes=["NULLABLE","REQUIRED"],)self._logger.debug(f"Table {self.full_table_id} validated")returntable_refdef_initialize_bq_table(self)->Any:"""Validates or creates the BigQuery table."""fromgoogle.cloudimportbigquery# type: ignore[attr-defined]self._bq_client.create_dataset(dataset=self.dataset_name,exists_ok=True)self._bq_client.create_dataset(dataset=f"{self.dataset_name}_temp",exists_ok=True)table_ref=bigquery.TableReference.from_string(self.full_table_id)self._bq_client.create_table(table_ref,exists_ok=True)returntable_refdefadd_texts(# type: ignore[override]self,texts:List[str],metadatas:Optional[List[dict]]=None,**kwargs:Any,)->List[str]:"""Run more texts through the embeddings and add to the vectorstore. Args: texts: List of strings to add to the vectorstore. metadatas: Optional list of metadata records associated with the texts. (ie [{"url": "www.myurl1.com", "title": "title1"}, {"url": "www.myurl2.com", "title": "title2"}]) Returns: List of ids from adding the texts into the vectorstore. """embs=self.embedding.embed_documents(texts)returnself.add_texts_with_embeddings(texts=texts,embs=embs,metadatas=metadatas,**kwargs)defadd_texts_with_embeddings(self,texts:List[str],embs:List[List[float]],metadatas:Optional[List[dict]]=None,)->List[str]:"""Add precomputed embeddings and relative texts / metadatas to the vectorstore. Args: ids: List of unique ids in string format texts: List of strings to add to the vectorstore. embs: List of lists of floats with text embeddings for texts. metadatas: Optional list of metadata records associated with the texts. (ie [{"url": "www.myurl1.com", "title": "title1"}, {"url": "www.myurl2.com", "title": "title2"}]) Returns: List of ids from adding the texts into the vectorstore. """importpandasaspdids=[uuid.uuid4().hexfor_intexts]ifmetadatasisNone:metadatas=[{}for_intexts]values_dict:List[Dict[str,List[Any]]]=[]foridx,text,emb,metadata_dictinzip(ids,texts,embs,metadatas):record={self.doc_id_field:idx,self.content_field:text,self.embedding_field:emb,}record.update(metadata_dict)values_dict.append(record)# type: ignore[arg-type]table=self._bq_client.get_table(self.full_table_id)# Attempt to retrieve the table informationdf=pd.DataFrame(values_dict)job=self._bq_client.load_table_from_dataframe(df,table)job.result()self._validate_bq_table()self._logger.debug(f"stored {len(ids)} records in BQ")self.sync_data()returnidsdefdelete(self,ids:Optional[List[str]]=None,**kwargs:Any)->Optional[bool]:"""Delete documents by record IDs Args: ids: List of ids to delete. **kwargs: Other keyword arguments that subclasses might use. Returns: Optional[bool]: True if deletion is successful, False otherwise, None if not implemented. """fromgoogle.cloudimportbigquery# type: ignore[attr-defined]ifnotidsorlen(ids)==0:returnTruejob_config=bigquery.QueryJobConfig(query_parameters=[bigquery.ArrayQueryParameter("ids","STRING",ids)],)self._bq_client.query(f""" DELETE FROM `{self.full_table_id}` WHERE {self.doc_id_field} IN UNNEST(@ids) """,job_config=job_config,).result()self.sync_data()returnTrueasyncdefadelete(self,ids:Optional[List[str]]=None,**kwargs:Any)->Optional[bool]:"""Delete by vector ID or other criteria. Args: ids: List of ids to delete. **kwargs: Other keyword arguments that subclasses might use. Returns: Optional[bool]: True if deletion is successful, False otherwise, None if not implemented. """returnawaitasyncio.get_running_loop().run_in_executor(None,partial(self.delete,**kwargs),ids)defsimilarity_search_by_vectors(self,embeddings:List[List[float]],filter:Optional[Dict[str,Any]]=None,k:int=5,with_scores:bool=False,with_embeddings:bool=False,**kwargs:Any,)->Any:"""Core similarity search function. Handles a list of embedding vectors, optionally returning scores and embeddings. Args: embeddings: A list of embedding vectors, where each vector is a list of floats. filter: (Optional) A dictionary specifying filtering criteria for the documents. Ie. {"title": "mytitle"} k: (Optional) The number of top-ranking similar documents to return per embedding. Defaults to 5. with_scores: (Optional) If True, include similarity scores in the result for each matched document. Defaults to False. with_embeddings: (Optional) If True, include the matched document's embedding vector in the result. Defaults to False. Returns: A list of `k` documents for each embedding in `embeddings` """results=self._similarity_search_by_vectors_with_scores_and_embeddings(embeddings=embeddings,k=k,filter=filter,**kwargs)# Process results based on optionsfori,query_resultsinenumerate(results):ifnotwith_scoresandnotwith_embeddings:# return only docsresults[i]=[x[0]forxinquery_results]elifnotwith_embeddings:# return only docs and scoreresults[i]=[[x[0],x[1]]forxinquery_results]elifnotwith_scores:# return only docs and embeddingsresults[i]=[[x[0],x[2]]forxinquery_results]returnresults# type: ignore[return-value]defsimilarity_search_by_vector(self,embedding:List[float],k:int=5,**kwargs:Any,)->List[Document]:"""Return docs most similar to embedding vector. Args: embedding: Embedding to look up documents similar to. filter: (Optional) A dictionary specifying filtering criteria for the documents. Ie. {"title": "mytitle"} k: (Optional) The number of top-ranking similar documents to return per embedding. Defaults to 5. Returns: Return docs most similar to embedding vector. """returnself.similarity_search_by_vectors(embeddings=[embedding],k=k,**kwargs)[0]defsimilarity_search_by_vector_with_score(self,embedding:List[float],filter:Optional[Dict[str,Any]]=None,k:int=5,)->List[Tuple[Document,float]]:"""Return docs most similar to embedding vector with scores. Args: embedding: Embedding to look up documents similar to. filter: (Optional) A dictionary specifying filtering criteria for the documents. Ie. {"title": "mytitle"} k: (Optional) The number of top-ranking similar documents to return per embedding. Defaults to 5. Returns: Return docs most similar to embedding vector. """returnself.similarity_search_by_vectors(embeddings=[embedding],filter=filter,k=k,with_scores=True)[0]defsimilarity_search(self,query:str,k:int=5,**kwargs:Any)->List[Document]:"""Search for top `k` docs most similar to input query. Args: query: search query to search documents with. filter: (Optional) A dictionary specifying filtering criteria for the documents. Ie. {"title": "mytitle"} k: (Optional) The number of top-ranking similar documents to return per embedding. Defaults to 5. Returns: Return docs most similar to input query. """embedding=self.embedding.embed_query(query)returnself.similarity_search_by_vectors(embeddings=[embedding],k=k,**kwargs)[0]defsimilarity_search_with_score(self,query:str,filter:Optional[Dict[str,Any]]=None,k:int=5,**kwargs:Any,)->List[Tuple[Document,float]]:"""Search for top `k` docs most similar to input query, returns both docs and scores. Args: query: search query to search documents with. filter: (Optional) A dictionary specifying filtering criteria for the documents. Ie. {"title": "mytitle"} k: (Optional) The number of top-ranking similar documents to return per embedding. Defaults to 5. Returns: Return docs most similar to input query along with scores. """embedding=self.embedding.embed_query(query)returnself.similarity_search_by_vector_with_score(embedding=embedding,filter=filter,k=k)@classmethoddeffrom_texts(cls:Type["BaseBigQueryVectorStore"],texts:List[str],embedding:Embeddings,metadatas:Optional[List[dict]]=None,**kwargs:Any,)->"BaseBigQueryVectorStore":raiseNotImplementedError()defmax_marginal_relevance_search(self,query:str,k:int=5,fetch_k:int=25,lambda_mult:float=0.5,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: **kwargs: query: search query text. filter: Filter on metadata properties, e.g. { "str_property": "foo", "int_property": 123 } k: Number of Documents to return. Defaults to 5. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. Returns: List of Documents selected by maximal marginal relevance. """embedding=self.embedding.embed_query(query)returnself.max_marginal_relevance_search_by_vector(embedding=embedding,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,**kwargs)defmax_marginal_relevance_search_by_vector(self,embedding:List[float],k:int=5,fetch_k:int=25,lambda_mult:float=0.5,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. filter: Filter on metadata properties, e.g. { "str_property": "foo", "int_property": 123 } k: Number of Documents to return. Defaults to 5. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. Returns: List of Documents selected by maximal marginal relevance. """doc_tuples=self.similarity_search_by_vectors(embeddings=[embedding],k=fetch_k,with_embeddings=True,with_scores=True,**kwargs,)[0]doc_embeddings=[d[2]fordindoc_tuples]# type: ignore[index]mmr_doc_indexes=maximal_marginal_relevance(np.array(embedding),doc_embeddings,lambda_mult=lambda_mult,k=k)return[doc_tuples[i][0]foriinmmr_doc_indexes]# type: ignore[index]