@deprecated(
    since="0.3.3",
    removal="1.0",
    alternative_import="databricks_langchain.DatabricksVectorSearch",
)
class DatabricksVectorSearch(VectorStore):
    """`Databricks Vector Search` vector store.

    To use, you should have the ``databricks-vectorsearch`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import DatabricksVectorSearch
            from databricks.vector_search.client import VectorSearchClient

            vs_client = VectorSearchClient()
            vs_index = vs_client.get_index(
                endpoint_name="vs_endpoint",
                index_name="ml.llm.index"
            )
            vectorstore = DatabricksVectorSearch(vs_index)

    Args:
        index: A Databricks Vector Search index object.
        embedding: The embedding model.
            Required for direct-access index or delta-sync index
            with self-managed embeddings.
        text_column: The name of the text column to use for the embeddings.
            Required for direct-access index or delta-sync index
            with self-managed embeddings.
            Make sure the text column specified is in the index.
        columns: The list of column names to get when doing the search.
            Defaults to ``[primary_key, text_column]``.

    A delta-sync index with Databricks-managed embeddings manages the ingestion,
    deletion, and embedding for you. Manual ingestion/deletion of documents/texts
    is not supported for a delta-sync index.

    If you want to use a delta-sync index with self-managed embeddings, you need to
    provide the embedding model and text column name to use for the embeddings.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import DatabricksVectorSearch
            from databricks.vector_search.client import VectorSearchClient
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            vs_client = VectorSearchClient()
            vs_index = vs_client.get_index(
                endpoint_name="vs_endpoint",
                index_name="ml.llm.index"
            )
            vectorstore = DatabricksVectorSearch(
                index=vs_index,
                embedding=OpenAIEmbeddings(),
                text_column="document_content"
            )

    If you want to manage the documents ingestion/deletion yourself, you can use a
    direct-access index.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import DatabricksVectorSearch
            from databricks.vector_search.client import VectorSearchClient
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            vs_client = VectorSearchClient()
            vs_index = vs_client.get_index(
                endpoint_name="vs_endpoint",
                index_name="ml.llm.index"
            )
            vectorstore = DatabricksVectorSearch(
                index=vs_index,
                embedding=OpenAIEmbeddings(),
                text_column="document_content"
            )
            vectorstore.add_texts(
                texts=["text1", "text2"]
            )

    For more information on Databricks Vector Search, see the `Databricks Vector
    Search documentation
    <https://docs.databricks.com/en/generative-ai/vector-search.html>`_.
    """
    def __init__(
        self,
        index: VectorSearchIndex,
        *,
        embedding: Optional[Embeddings] = None,
        text_column: Optional[str] = None,
        columns: Optional[List[str]] = None,
    ):
        try:
            from databricks.vector_search.client import VectorSearchIndex
        except ImportError as e:
            raise ImportError(
                "Could not import databricks-vectorsearch python package. "
                "Please install it with `pip install databricks-vectorsearch`."
            ) from e

        # index
        self.index = index
        if not isinstance(index, VectorSearchIndex):
            raise TypeError("index must be of type VectorSearchIndex.")

        # index_details
        index_details = self.index.describe()

        self.primary_key = index_details["primary_key"]
        self.index_type = index_details.get("index_type")
        self._delta_sync_index_spec = index_details.get(
            "delta_sync_index_spec", dict()
        )
        self._direct_access_index_spec = index_details.get(
            "direct_access_index_spec", dict()
        )

        # text_column
        if self._is_databricks_managed_embeddings():
            index_source_column = self._embedding_source_column_name()
            # check if input text column matches the source column of the index
            if text_column is not None and text_column != index_source_column:
                raise ValueError(
                    f"text_column '{text_column}' does not match with the "
                    f"source column of the index: '{index_source_column}'."
                )
            self.text_column = index_source_column
        else:
            self._require_arg(text_column, "text_column")
            self.text_column = text_column

        # columns
        self.columns = columns or []
        # add primary key column and source column if not in columns
        if self.primary_key not in self.columns:
            self.columns.append(self.primary_key)
        if self.text_column and self.text_column not in self.columns:
            self.columns.append(self.text_column)

        # Validate specified columns are in the index
        if self._is_direct_access_index():
            index_schema = self._index_schema()
            if index_schema:
                for col in self.columns:
                    if col not in index_schema:
                        raise ValueError(
                            f"column '{col}' is not in the index's schema."
                        )

        # embedding model
        if not self._is_databricks_managed_embeddings():
            # embedding model is required for direct-access index
            # or delta-sync index with self-managed embedding
            self._require_arg(embedding, "embedding")
            self._embedding = embedding
            # validate dimension matches
            index_embedding_dimension = self._embedding_vector_column_dimension()
            if index_embedding_dimension is not None:
                inferred_embedding_dimension = self._infer_embedding_dimension()
                if inferred_embedding_dimension != index_embedding_dimension:
                    raise ValueError(
                        f"embedding model's dimension "
                        f"'{inferred_embedding_dimension}' does not match with "
                        f"the index's dimension '{index_embedding_dimension}'."
                    )
        else:
            if embedding is not None:
                logger.warning(
                    "embedding model is not used in delta-sync index with "
                    "Databricks-managed embeddings."
                )
            self._embedding = None
    @classmethod
    def from_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[Dict]] = None,
        **kwargs: Any,
    ) -> VST:
        raise NotImplementedError(
            "`from_texts` is not supported. "
            "Use `add_texts` to add to existing direct-access index."
        )
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict]] = None,
        ids: Optional[List[Any]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add texts to the index.

        Only supported for a direct-access index.

        Args:
            texts: List of texts to add.
            metadatas: List of metadata for each text. Defaults to None.
            ids: List of ids for each text. Defaults to None.
                If not provided, a random uuid will be generated for each text.

        Returns:
            List of ids from adding the texts into the index.
        """
        self._op_require_direct_access_index("add_texts")
        assert self.embeddings is not None, "embedding model is required."
        # Wrap to list if input texts is a single string
        if isinstance(texts, str):
            texts = [texts]
        texts = list(texts)
        vectors = self.embeddings.embed_documents(texts)
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        metadatas = metadatas or [{} for _ in texts]

        updates = [
            {
                self.primary_key: id_,
                self.text_column: text,
                self._embedding_vector_column_name(): vector,
                **metadata,
            }
            for text, vector, id_, metadata in zip(texts, vectors, ids, metadatas)
        ]

        upsert_resp = self.index.upsert(updates)
        if upsert_resp.get("status") in ("PARTIAL_SUCCESS", "FAILURE"):
            failed_ids = upsert_resp.get("result", dict()).get(
                "failed_primary_keys", []
            )
            if upsert_resp.get("status") == "FAILURE":
                logger.error("Failed to add texts to the index.")
            else:
                logger.warning("Some texts failed to be added to the index.")
            return [id_ for id_ in ids if id_ not in failed_ids]

        return ids
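    # Usage sketch (hedged, names and values are placeholders): adding texts to a
    # direct-access index with caller-provided ids and per-document metadata.
    # Assumes `vectorstore` was built with an embedding model and `text_column`,
    # as in the direct-access example in the class docstring.
    #
    #     ids = vectorstore.add_texts(
    #         texts=["text1", "text2"],
    #         metadatas=[{"source": "a"}, {"source": "b"}],
    #         ids=["id-1", "id-2"],
    #     )
    #     # `ids` contains the primary keys of the rows that were upserted
    #     # successfully; failed keys (if any) are dropped from the result.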
    @property
    def embeddings(self) -> Optional[Embeddings]:
        """Access the query embedding object if available."""
        return self._embedding
    def delete(
        self, ids: Optional[List[Any]] = None, **kwargs: Any
    ) -> Optional[bool]:
        """Delete documents from the index.

        Only supported for a direct-access index.

        Args:
            ids: List of ids of documents to delete.

        Returns:
            True if successful.
        """
        self._op_require_direct_access_index("delete")
        if ids is None:
            raise ValueError("ids must be provided.")
        self.index.delete(ids)
        return True
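    # Usage sketch (hedged, ids are placeholders): deleting rows from a
    # direct-access index by primary key, e.g. the ids returned by `add_texts`.
    #
    #     vectorstore.delete(ids=["id-1", "id-2"])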
    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        *,
        query_type: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Filters to apply to the query. Defaults to None.
            query_type: The type of this query. Supported values are
                "ANN" and "HYBRID".

        Returns:
            List of Documents most similar to the query.
        """
        docs_with_score = self.similarity_search_with_score(
            query=query,
            k=k,
            filter=filter,
            query_type=query_type,
            **kwargs,
        )
        return [doc for doc, _ in docs_with_score]
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        *,
        query_type: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query, along with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Filters to apply to the query. Defaults to None.
            query_type: The type of this query. Supported values are
                "ANN" and "HYBRID".

        Returns:
            List of Documents most similar to the query and score for each.
        """
        if self._is_databricks_managed_embeddings():
            query_text = query
            query_vector = None
        else:
            assert self.embeddings is not None, "embedding model is required."
            # The value for `query_text` needs to be specified only for hybrid search.
            if query_type is not None and query_type.upper() == "HYBRID":
                query_text = query
            else:
                query_text = None
            query_vector = self.embeddings.embed_query(query)

        search_resp = self.index.similarity_search(
            columns=self.columns,
            query_text=query_text,
            query_vector=query_vector,
            filters=filter or _alias_filters(kwargs),
            num_results=k,
            query_type=query_type,
        )
        return self._parse_search_response(search_resp)
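    # Usage sketch (hedged, query text and filter values are placeholders): a
    # plain ANN query with a metadata filter, and a hybrid (keyword + vector)
    # query that passes the query text through to the index alongside the vector.
    #
    #     docs_and_scores = vectorstore.similarity_search_with_score(
    #         query="What is Databricks Vector Search?",
    #         k=5,
    #         filter={"source": "docs"},
    #     )
    #     hybrid_docs_and_scores = vectorstore.similarity_search_with_score(
    #         query="What is Databricks Vector Search?",
    #         k=5,
    #         query_type="HYBRID",
    #     )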
    @staticmethod
    def _identity_fn(score: float) -> float:
        return score

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        Databricks Vector Search uses a normalized score 1/(1+d) where d is the L2
        distance. Hence, we simply return the identity function.
        """
        return self._identity_fn
    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, Any]] = None,
        *,
        query_type: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Filters to apply to the query. Defaults to None.
            query_type: The type of this query. Supported values are
                "ANN" and "HYBRID".

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        if not self._is_databricks_managed_embeddings():
            assert self.embeddings is not None, "embedding model is required."
            query_vector = self.embeddings.embed_query(query)
        else:
            raise ValueError(
                "`max_marginal_relevance_search` is not supported for index with "
                "Databricks-managed embeddings."
            )

        docs = self.max_marginal_relevance_search_by_vector(
            query_vector,
            k,
            fetch_k,
            lambda_mult=lambda_mult,
            filter=filter or _alias_filters(kwargs),
            query_type=query_type,
        )
        return docs
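    # Usage sketch (hedged, values are placeholders): MMR retrieval on an index
    # with self-managed embeddings; `fetch_k` candidates are fetched, then `k`
    # results are selected, trading relevance against diversity via `lambda_mult`.
    #
    #     docs = vectorstore.max_marginal_relevance_search(
    #         query="vector databases",
    #         k=4,
    #         fetch_k=20,
    #         lambda_mult=0.5,
    #     )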
    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Any] = None,
        *,
        query_type: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Filters to apply to the query. Defaults to None.
            query_type: The type of this query. Supported values are
                "ANN" and "HYBRID".

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        if not self._is_databricks_managed_embeddings():
            embedding_column = self._embedding_vector_column_name()
        else:
            raise ValueError(
                "`max_marginal_relevance_search_by_vector` is not supported for "
                "index with Databricks-managed embeddings."
            )

        search_resp = self.index.similarity_search(
            columns=list(set(self.columns + [embedding_column])),
            query_text=None,
            query_vector=embedding,
            filters=filter or _alias_filters(kwargs),
            num_results=fetch_k,
            query_type=query_type,
        )

        embeddings_result_index = (
            search_resp.get("manifest")
            .get("columns")
            .index({"name": embedding_column})
        )
        embeddings = [
            doc[embeddings_result_index]
            for doc in search_resp.get("result").get("data_array")
        ]

        mmr_selected = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            embeddings,
            k=k,
            lambda_mult=lambda_mult,
        )

        ignore_cols: List = (
            [embedding_column] if embedding_column not in self.columns else []
        )
        candidates = self._parse_search_response(search_resp, ignore_cols=ignore_cols)
        selected_results = [
            r[0] for i, r in enumerate(candidates) if i in mmr_selected
        ]
        return selected_results
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Any] = None,
        *,
        query_type: Optional[str] = None,
        query: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Filters to apply to the query. Defaults to None.
            query_type: The type of this query. Supported values are
                "ANN" and "HYBRID".

        Returns:
            List of Documents most similar to the embedding.
        """
        docs_with_score = self.similarity_search_by_vector_with_score(
            embedding=embedding,
            k=k,
            filter=filter,
            query_type=query_type,
            query=query,
            **kwargs,
        )
        return [doc for doc, _ in docs_with_score]
    def similarity_search_by_vector_with_score(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Any] = None,
        *,
        query_type: Optional[str] = None,
        query: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to embedding vector, along with scores.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Filters to apply to the query. Defaults to None.
            query_type: The type of this query. Supported values are
                "ANN" and "HYBRID".

        Returns:
            List of Documents most similar to the embedding and score for each.
        """
        if self._is_databricks_managed_embeddings():
            raise ValueError(
                "`similarity_search_by_vector` is not supported for index with "
                "Databricks-managed embeddings."
            )
        if query_type is not None and query_type.upper() == "HYBRID":
            if query is None:
                raise ValueError(
                    "A value for `query` must be specified for hybrid search."
                )
            query_text = query
        else:
            if query is not None:
                raise ValueError(
                    "Cannot specify both `embedding` and `query` unless "
                    '`query_type="HYBRID"`.'
                )
            query_text = None

        search_resp = self.index.similarity_search(
            columns=self.columns,
            query_vector=embedding,
            query_text=query_text,
            filters=filter or _alias_filters(kwargs),
            num_results=k,
            query_type=query_type,
        )
        return self._parse_search_response(search_resp)
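    # Usage sketch (hedged, the embedding model and query text are placeholders):
    # searching by a precomputed vector, and the hybrid variant where the original
    # query text must accompany the vector.
    #
    #     emb = OpenAIEmbeddings().embed_query("my question")
    #     docs = vectorstore.similarity_search_by_vector_with_score(
    #         embedding=emb, k=4
    #     )
    #     hybrid = vectorstore.similarity_search_by_vector_with_score(
    #         embedding=emb,
    #         k=4,
    #         query_type="HYBRID",
    #         query="my question",
    #     )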
    def _parse_search_response(
        self, search_resp: Dict, ignore_cols: Optional[List[str]] = None
    ) -> List[Tuple[Document, float]]:
        """Parse the search response into a list of Documents with score."""
        if ignore_cols is None:
            ignore_cols = []

        columns = [
            col["name"]
            for col in search_resp.get("manifest", dict()).get("columns", [])
        ]
        docs_with_score = []
        for result in search_resp.get("result", dict()).get("data_array", []):
            doc_id = result[columns.index(self.primary_key)]
            text_content = result[columns.index(self.text_column)]
            metadata = {
                col: value
                for col, value in zip(columns[:-1], result[:-1])
                if col not in ([self.primary_key, self.text_column] + ignore_cols)
            }
            metadata[self.primary_key] = doc_id
            score = result[-1]
            doc = Document(page_content=text_content, metadata=metadata)
            docs_with_score.append((doc, score))
        return docs_with_score

    def _index_schema(self) -> Optional[Dict]:
        """Return the index schema as a dictionary.

        Return None if no schema found.
        """
        if self._is_direct_access_index():
            schema_json = self._direct_access_index_spec.get("schema_json")
            if schema_json is not None:
                return json.loads(schema_json)
        return None

    def _embedding_vector_column_name(self) -> Optional[str]:
        """Return the name of the embedding vector column.

        None if the index is not a self-managed embedding index.
        """
        return self._embedding_vector_column().get("name")

    def _embedding_vector_column_dimension(self) -> Optional[int]:
        """Return the dimension of the embedding vector column.

        None if the index is not a self-managed embedding index.
        """
        return self._embedding_vector_column().get("embedding_dimension")

    def _embedding_vector_column(self) -> Dict:
        """Return the embedding vector column configs as a dictionary.

        Empty if the index is not a self-managed embedding index.
        """
        index_spec = (
            self._delta_sync_index_spec
            if self._is_delta_sync_index()
            else self._direct_access_index_spec
        )
        return next(iter(index_spec.get("embedding_vector_columns") or list()), dict())

    def _embedding_source_column_name(self) -> Optional[str]:
        """Return the name of the embedding source column.

        None if the index is not a Databricks-managed embedding index.
        """
        return self._embedding_source_column().get("name")

    def _embedding_source_column(self) -> Dict:
        """Return the embedding source column configs as a dictionary.

        Empty if the index is not a Databricks-managed embedding index.
        """
        index_spec = self._delta_sync_index_spec
        return next(iter(index_spec.get("embedding_source_columns") or list()), dict())

    def _is_delta_sync_index(self) -> bool:
        """Return True if the index is a delta-sync index."""
        return self.index_type == "DELTA_SYNC"

    def _is_direct_access_index(self) -> bool:
        """Return True if the index is a direct-access index."""
        return self.index_type == "DIRECT_ACCESS"

    def _is_databricks_managed_embeddings(self) -> bool:
        """Return True if the embeddings are managed by Databricks Vector Search."""
        return (
            self._is_delta_sync_index()
            and self._embedding_source_column_name() is not None
        )

    def _infer_embedding_dimension(self) -> int:
        """Infer the embedding dimension from the embedding function."""
        assert self.embeddings is not None, "embedding model is required."
        return len(self.embeddings.embed_query("test"))

    def _op_require_direct_access_index(self, op_name: str) -> None:
        """
        Raise ValueError if the operation is not supported for direct-access index.
        """
        if not self._is_direct_access_index():
            raise ValueError(
                f"`{op_name}` is only supported for direct-access index."
            )

    @staticmethod
    def _require_arg(arg: Any, arg_name: str) -> None:
        """Raise ValueError if the required arg with name `arg_name` is None."""
        if not arg:
            raise ValueError(f"`{arg_name}` is required for this index.")
def _alias_filters(kwargs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    The `filters` argument was used in previous versions. It is now replaced with
    `filter` for consistency with other vector stores, but we still support
    `filters` for backward compatibility.
    """
    if "filters" in kwargs:
        warn_deprecated(
            since="0.2.11",
            removal="1.0",
            message="DatabricksVectorSearch received a key `filters` in "
            "search_kwargs. `filters` was deprecated since langchain-community "
            "0.2.11 and will be removed in 1.0. Please use `filter` instead.",
        )
    return kwargs.pop("filters", None)
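# Migration sketch (hedged, values are placeholders) for the deprecated `filters`
# keyword handled by `_alias_filters`: callers, including retriever
# `search_kwargs`, should pass `filter` instead.
#
#     retriever = vectorstore.as_retriever(
#         search_kwargs={"k": 5, "filter": {"source": "docs"}}  # not "filters"
#     )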