Source code for langchain_community.vectorstores.azure_cosmos_db_no_sql

from __future__ import annotations

import uuid
import warnings
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from pydantic import BaseModel, Field

from langchain_community.vectorstores.utils import maximal_marginal_relevance

if TYPE_CHECKING:
    from azure.cosmos import ContainerProxy, CosmosClient
    from azure.identity import DefaultAzureCredential

USER_AGENT = ("LangChain-CDBNoSql-VectorStore-Python",)


[docs] class Condition(BaseModel): property: str operator: str value: Any
[docs] class PreFilter(BaseModel): conditions: List[Condition] = Field(default_factory=list) logical_operator: Optional[str] = None
[docs] class CosmosDBQueryType(str, Enum): """CosmosDB Query Type""" VECTOR = "vector" FULL_TEXT_SEARCH = "full_text_search" FULL_TEXT_RANK = "full_text_rank" HYBRID = "hybrid"
[docs] class AzureCosmosDBNoSqlVectorSearch(VectorStore): """`Azure Cosmos DB for NoSQL` vector store. To use, you should have both: - the ``azure-cosmos`` python package installed You can read more about vector search, full text search and hybrid search using AzureCosmosDBNoSQL here: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/vector-search https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/hybrid-search """
[docs] def __init__( self, *, cosmos_client: CosmosClient, embedding: Embeddings, vector_embedding_policy: Dict[str, Any], indexing_policy: Dict[str, Any], cosmos_container_properties: Dict[str, Any], cosmos_database_properties: Dict[str, Any], full_text_policy: Optional[Dict[str, Any]] = None, database_name: str = "vectorSearchDB", container_name: str = "vectorSearchContainer", text_key: str = "text", embedding_key: str = "embedding", metadata_key: str = "metadata", create_container: bool = True, full_text_search_enabled: bool = False, ): """ Constructor for AzureCosmosDBNoSqlVectorSearch Args: cosmos_client: Client used to connect to azure cosmosdb no sql account. database_name: Name of the database to be created. container_name: Name of the container to be created. embedding: Text embedding model to use. vector_embedding_policy: Vector Embedding Policy for the container. full_text_policy: Full Text Policy for the container. indexing_policy: Indexing Policy for the container. cosmos_container_properties: Container Properties for the container. cosmos_database_properties: Database Properties for the container. text_key: Text key to use for text property which will be embedded in the data schema. embedding_key: Embedding key to use for vector embedding. metadata_key: Metadata key to use for data schema. create_container: Set to true if the container does not exist. full_text_search_enabled: Set to true if the full text search is enabled. """ self._cosmos_client = cosmos_client self._database_name = database_name self._container_name = container_name self._embedding = embedding self._vector_embedding_policy = vector_embedding_policy self._full_text_policy = full_text_policy self._indexing_policy = indexing_policy self._cosmos_container_properties = cosmos_container_properties self._cosmos_database_properties = cosmos_database_properties self._text_key = text_key self._embedding_key = embedding_key self._metadata_key = metadata_key self._create_container = create_container self._full_text_search_enabled = full_text_search_enabled if self._create_container: if ( self._indexing_policy["vectorIndexes"] is None or len(self._indexing_policy["vectorIndexes"]) == 0 ): raise ValueError( "vectorIndexes cannot be null or empty in the indexing_policy." ) if ( self._vector_embedding_policy is None or len(vector_embedding_policy["vectorEmbeddings"]) == 0 ): raise ValueError( "vectorEmbeddings cannot be null " "or empty in the vector_embedding_policy." ) if self._cosmos_container_properties["partition_key"] is None: raise ValueError( "partition_key cannot be null or empty for a container." ) if self._full_text_search_enabled: if ( self._indexing_policy["fullTextIndexes"] is None or len(self._indexing_policy["fullTextIndexes"]) == 0 ): raise ValueError( "fullTextIndexes cannot be null or empty in the " "indexing_policy if full text search is enabled." ) if ( self._full_text_policy is None or len(self._full_text_policy["fullTextPaths"]) == 0 ): raise ValueError( "fullTextPaths cannot be null or empty in the " "full_text_policy if full text search is enabled." ) # Create the database if it already doesn't exist self._database = self._cosmos_client.create_database_if_not_exists( id=self._database_name, offer_throughput=self._cosmos_database_properties.get("offer_throughput"), session_token=self._cosmos_database_properties.get("session_token"), initial_headers=self._cosmos_database_properties.get("initial_headers"), etag=self._cosmos_database_properties.get("etag"), match_condition=self._cosmos_database_properties.get("match_condition"), ) # Create the collection if it already doesn't exist self._container = self._database.create_container_if_not_exists( id=self._container_name, partition_key=self._cosmos_container_properties["partition_key"], indexing_policy=self._indexing_policy, default_ttl=self._cosmos_container_properties.get("default_ttl"), offer_throughput=self._cosmos_container_properties.get("offer_throughput"), unique_key_policy=self._cosmos_container_properties.get( "unique_key_policy" ), conflict_resolution_policy=self._cosmos_container_properties.get( "conflict_resolution_policy" ), analytical_storage_ttl=self._cosmos_container_properties.get( "analytical_storage_ttl" ), computed_properties=self._cosmos_container_properties.get( "computed_properties" ), etag=self._cosmos_container_properties.get("etag"), match_condition=self._cosmos_container_properties.get("match_condition"), session_token=self._cosmos_container_properties.get("session_token"), initial_headers=self._cosmos_container_properties.get("initial_headers"), vector_embedding_policy=self._vector_embedding_policy, full_text_policy=self._full_text_policy, )
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore. Args: texts: Iterable of strings to add to the vectorstore. metadatas: Optional list of metadatas associated with the texts. Returns: List of ids from adding the texts into the vectorstore. """ _metadatas = list(metadatas if metadatas is not None else ({} for _ in texts)) return self._insert_texts(list(texts), _metadatas)
def _insert_texts( self, texts: List[str], metadatas: List[Dict[str, Any]] ) -> List[str]: """Used to Load Documents into the collection Args: texts: The list of documents strings to load metadatas: The list of metadata objects associated with each document Returns: List of ids from adding the texts into the vectorstore. """ # If the texts is empty, throw an error if not texts: raise Exception("Texts can not be null or empty") # Embed and create the documents embeddings = self._embedding.embed_documents(texts) text_key = "text" to_insert = [ { "id": str(uuid.uuid4()), text_key: t, self._embedding_key: embedding, "metadata": m, } for t, m, embedding in zip(texts, metadatas, embeddings) ] # insert the documents in CosmosDB No Sql doc_ids: List[str] = [] for item in to_insert: created_doc = self._container.create_item(item) doc_ids.append(created_doc["id"]) return doc_ids @classmethod def _from_kwargs( cls, embedding: Embeddings, *, cosmos_client: CosmosClient, vector_embedding_policy: Dict[str, Any], indexing_policy: Dict[str, Any], cosmos_container_properties: Dict[str, Any], cosmos_database_properties: Dict[str, Any], full_text_policy: Optional[Dict[str, Any]] = None, database_name: str = "vectorSearchDB", container_name: str = "vectorSearchContainer", text_key: str = "text", embedding_key: str = "embedding", metadata_key: str = "metadata", create_container: bool = True, full_text_search_enabled: bool = False, **kwargs: Any, ) -> AzureCosmosDBNoSqlVectorSearch: if kwargs: warnings.warn( "Method 'from_texts' of AzureCosmosDBNoSql vector " "store invoked with " f"unsupported arguments " f"({', '.join(sorted(kwargs))}), " "which will be ignored." ) return cls( embedding=embedding, cosmos_client=cosmos_client, vector_embedding_policy=vector_embedding_policy, full_text_policy=full_text_policy, indexing_policy=indexing_policy, cosmos_container_properties=cosmos_container_properties, cosmos_database_properties=cosmos_database_properties, database_name=database_name, container_name=container_name, text_key=text_key, embedding_key=embedding_key, metadata_key=metadata_key, create_container=create_container, full_text_search_enabled=full_text_search_enabled, )
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> AzureCosmosDBNoSqlVectorSearch: """Create an AzureCosmosDBNoSqlVectorSearch vectorstore from raw texts. Args: texts: the texts to insert. embedding: the embedding function to use in the store. metadatas: metadata dicts for the texts. **kwargs: you can pass any argument that you would to :meth:`~add_texts` and/or to the 'AstraDB' constructor (see these methods for details). These arguments will be routed to the respective methods as they are. Returns: an `AzureCosmosDBNoSqlVectorSearch` vectorstore. """ vectorstore = AzureCosmosDBNoSqlVectorSearch._from_kwargs(embedding, **kwargs) vectorstore.add_texts( texts=texts, metadatas=metadatas, ) return vectorstore
[docs] @classmethod def from_connection_string_and_aad( cls, connection_string: str, defaultAzureCredential: DefaultAzureCredential, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> AzureCosmosDBNoSqlVectorSearch: cosmos_client = CosmosClient( connection_string, defaultAzureCredential, user_agent=USER_AGENT ) kwargs["cosmos_client"] = cosmos_client vectorstore = cls._from_kwargs(embedding, **kwargs) vectorstore.add_texts( texts=texts, metadatas=metadatas, ) return vectorstore
[docs] @classmethod def from_connection_string_and_key( cls, connection_string: str, key: str, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> AzureCosmosDBNoSqlVectorSearch: cosmos_client = CosmosClient(connection_string, key, user_agent=USER_AGENT) kwargs["cosmos_client"] = cosmos_client vectorstore = cls._from_kwargs(embedding, **kwargs) vectorstore.add_texts( texts=texts, metadatas=metadatas, ) return vectorstore
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: if ids is None: raise ValueError("No document ids provided to delete.") for document_id in ids: self._container.delete_item(document_id) return True
[docs] def delete_document_by_id(self, document_id: Optional[str] = None) -> None: """Removes a Specific Document by id Args: document_id: The document identifier """ if document_id is None: raise ValueError("No document ids provided to delete.") self._container.delete_item(document_id, partition_key=document_id)
def _similarity_search_with_score( self, query_type: CosmosDBQueryType, embeddings: List[float], k: int = 4, pre_filter: Optional[PreFilter] = None, with_embedding: bool = False, offset_limit: Optional[str] = None, *, projection_mapping: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: query, parameters = self._construct_query( k=k, query_type=query_type, embeddings=embeddings, pre_filter=pre_filter, offset_limit=offset_limit, projection_mapping=projection_mapping, ) return self._execute_query( query=query, query_type=query_type, parameters=parameters, with_embedding=with_embedding, projection_mapping=projection_mapping, ) def _full_text_search( self, query_type: CosmosDBQueryType, search_text: Optional[str] = None, k: int = 4, pre_filter: Optional[PreFilter] = None, offset_limit: Optional[str] = None, *, projection_mapping: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: query, parameters = self._construct_query( k=k, query_type=query_type, search_text=search_text, pre_filter=pre_filter, offset_limit=offset_limit, projection_mapping=projection_mapping, ) return self._execute_query( query=query, query_type=query_type, parameters=parameters, with_embedding=False, projection_mapping=projection_mapping, ) def _hybrid_search_with_score( self, query_type: CosmosDBQueryType, embeddings: List[float], search_text: str, k: int = 4, pre_filter: Optional[PreFilter] = None, with_embedding: bool = False, offset_limit: Optional[str] = None, *, projection_mapping: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: query, parameters = self._construct_query( k=k, query_type=query_type, embeddings=embeddings, search_text=search_text, pre_filter=pre_filter, offset_limit=offset_limit, projection_mapping=projection_mapping, ) return self._execute_query( query=query, query_type=query_type, parameters=parameters, with_embedding=with_embedding, projection_mapping=projection_mapping, )
[docs] def similarity_search_with_score( self, query: str, k: int = 4, pre_filter: Optional[PreFilter] = None, with_embedding: bool = False, query_type: CosmosDBQueryType = CosmosDBQueryType.VECTOR, offset_limit: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: embeddings = self._embedding.embed_query(query) docs_and_scores = [] if query_type == CosmosDBQueryType.VECTOR: docs_and_scores = self._similarity_search_with_score( query_type=query_type, embeddings=embeddings, k=k, pre_filter=pre_filter, with_embedding=with_embedding, offset_limit=offset_limit, **kwargs, ) elif query_type == CosmosDBQueryType.FULL_TEXT_SEARCH: docs_and_scores = self._full_text_search( k=k, query_type=query_type, pre_filter=pre_filter, offset_limit=offset_limit, **kwargs, ) elif query_type == CosmosDBQueryType.FULL_TEXT_RANK: docs_and_scores = self._full_text_search( search_text=query, k=k, query_type=query_type, pre_filter=pre_filter, offset_limit=offset_limit, **kwargs, ) elif query_type == CosmosDBQueryType.HYBRID: docs_and_scores = self._hybrid_search_with_score( query_type=query_type, embeddings=embeddings, search_text=query, k=k, pre_filter=pre_filter, with_embedding=with_embedding, offset_limit=offset_limit, **kwargs, ) return docs_and_scores
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, query_type: CosmosDBQueryType = CosmosDBQueryType.VECTOR, pre_filter: Optional[PreFilter] = None, with_embedding: bool = False, **kwargs: Any, ) -> List[Document]: # Retrieves the docs with similarity scores # if kwargs["pre_filter"]: # pre_filter = kwargs["pre_filter"] # if kwargs["with_embedding"]: # with_embedding = kwargs["with_embedding"] docs = self._similarity_search_with_score( embeddings=embedding, k=fetch_k, query_type=query_type, pre_filter=pre_filter, with_embedding=with_embedding, ) # Re-ranks the docs using MMR mmr_doc_indexes = maximal_marginal_relevance( np.array(embedding), [doc.metadata[self._embedding_key] for doc, _ in docs], k=k, lambda_mult=lambda_mult, ) mmr_docs = [docs[i][0] for i in mmr_doc_indexes] return mmr_docs
def _construct_query( self, k: int, query_type: CosmosDBQueryType, embeddings: Optional[List[float]] = None, search_text: Optional[str] = None, pre_filter: Optional[PreFilter] = None, offset_limit: Optional[str] = None, projection_mapping: Optional[Dict[str, Any]] = None, ) -> Tuple[str, List[Dict[str, Any]]]: if ( query_type == CosmosDBQueryType.FULL_TEXT_RANK or query_type == CosmosDBQueryType.HYBRID ): query = f"SELECT {'TOP ' + str(k) + ' ' if not offset_limit else ''}" else: query = f"""SELECT {'TOP @limit ' if not offset_limit else ''}""" query += self._generate_projection_fields( projection_mapping, query_type, embeddings ) query += " FROM c " # Add where_clause if specified if pre_filter: where_clause = self._build_where_clause(pre_filter) query += f"""{where_clause}""" # TODO: Update the code to use parameters once parametrized queries # are allowed for these query functions if query_type == CosmosDBQueryType.FULL_TEXT_RANK: if search_text is None: raise ValueError( "search text cannot be None for FULL_TEXT_RANK queries." ) query += f""" ORDER BY RANK FullTextScore(c.{self._text_key}, [{", ".join(f"'{term}'" for term in search_text.split())}])""" elif query_type == CosmosDBQueryType.VECTOR: query += " ORDER BY VectorDistance(c[@embeddingKey], @embeddings)" elif query_type == CosmosDBQueryType.HYBRID: if search_text is None: raise ValueError("search text cannot be None for HYBRID queries.") query += f""" ORDER BY RANK RRF(FullTextScore(c.{self._text_key}, [{", ".join(f"'{term}'" for term in search_text.split())}]), VectorDistance(c.{self._embedding_key}, {embeddings}))""" else: query += "" # Add limit_offset_clause if specified if offset_limit is not None: query += f""" {offset_limit}""" # TODO: Remove this if check once parametrized queries # are allowed for these query functions parameters = [] if ( query_type == CosmosDBQueryType.FULL_TEXT_SEARCH or query_type == CosmosDBQueryType.VECTOR ): parameters = self._build_parameters( k=k, query_type=query_type, embeddings=embeddings, projection_mapping=projection_mapping, ) return query, parameters def _generate_projection_fields( self, projection_mapping: Optional[Dict[str, Any]], query_type: CosmosDBQueryType, embeddings: Optional[List[float]] = None, ) -> str: # TODO: Remove this if check once parametrized queries # are allowed for these query functions if ( query_type == CosmosDBQueryType.FULL_TEXT_RANK or query_type == CosmosDBQueryType.HYBRID ): if projection_mapping: projection = ", ".join( f"c.{key} as {alias}" for key, alias in projection_mapping.items() ) else: projection = ( f"c.id, c.{self._text_key} as text, " f"c.{self._metadata_key} as metadata" ) if query_type == CosmosDBQueryType.HYBRID: projection += ( f", c.{self._embedding_key} as embedding, " f"VectorDistance(c.{self._embedding_key}, " f"{embeddings}) as SimilarityScore" ) else: if projection_mapping: projection = ", ".join( f"c.[@{key}] as {alias}" for key, alias in projection_mapping.items() ) else: projection = "c.id, c[@textKey] as text, c[@metadataKey] as metadata" if ( query_type == CosmosDBQueryType.VECTOR or query_type == CosmosDBQueryType.HYBRID ): projection += ( ", c[@embeddingKey] as embedding, " "VectorDistance(c[@embeddingKey], " "@embeddings) as SimilarityScore" ) return projection def _build_parameters( self, k: int, query_type: CosmosDBQueryType, embeddings: Optional[List[float]], search_terms: Optional[List[str]] = None, projection_mapping: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: parameters: List[Dict[str, Any]] = [ {"name": "@limit", "value": k}, {"name": "@textKey", "value": self._text_key}, ] if projection_mapping: for key in projection_mapping.keys(): parameters.append({"name": f"@{key}", "value": key}) else: parameters.append({"name": "@metadataKey", "value": self._metadata_key}) if ( query_type == CosmosDBQueryType.FULL_TEXT_RANK or query_type == CosmosDBQueryType.HYBRID ): parameters.append({"name": "@searchTerms", "value": search_terms}) elif ( query_type == CosmosDBQueryType.VECTOR or query_type == CosmosDBQueryType.HYBRID ): parameters.append({"name": "@embeddingKey", "value": self._embedding_key}) parameters.append({"name": "@embeddings", "value": embeddings}) return parameters def _build_where_clause(self, pre_filter: PreFilter) -> str: """ Builds a where clause based on the given pre_filter. """ operator_map = self._where_clause_operator_map() if ( pre_filter.logical_operator and pre_filter.logical_operator not in operator_map ): raise ValueError( f"unsupported logical_operator: {pre_filter.logical_operator}" ) sql_logical_operator = operator_map.get(pre_filter.logical_operator or "", "") clauses = [] for condition in pre_filter.conditions: if condition.operator not in operator_map: raise ValueError(f"Unsupported operator: {condition.operator}") if "full_text" in condition.operator: if not isinstance(condition.value, str): raise ValueError( f"Expected a string for {condition.operator}, " f"got {type(condition.value)}" ) search_terms = ", ".join( f"'{term}'" for term in condition.value.split() ) sql_function = operator_map[condition.operator] clauses.append( f"{sql_function}(c.{condition.property}, {search_terms})" ) else: sql_operator = operator_map[condition.operator] if isinstance(condition.value, str): value = f"'{condition.value}'" elif isinstance(condition.value, list): # e.g., for IN clauses value = f"({', '.join(map(str, condition.value))})" clauses.append(f"c.{condition.property} {sql_operator} {value}") return f""" WHERE {' {} '.format(sql_logical_operator).join(clauses)}""".strip() def _execute_query( self, query: str, query_type: CosmosDBQueryType, parameters: List[Dict[str, Any]], with_embedding: bool, projection_mapping: Optional[Dict[str, Any]], ) -> List[Tuple[Document, float]]: docs_and_scores = [] items = list( self._container.query_items( query=query, parameters=parameters, enable_cross_partition_query=True ) ) for item in items: text = item[self._text_key] metadata = item.pop(self._metadata_key, {}) score = 0.0 if projection_mapping: for key, alias in projection_mapping.items(): if key == self._text_key: continue metadata[alias] = item[alias] else: metadata["id"] = item["id"] if ( query_type == CosmosDBQueryType.VECTOR or query_type == CosmosDBQueryType.HYBRID ): score = item["SimilarityScore"] if with_embedding: metadata[self._embedding_key] = item[self._embedding_key] docs_and_scores.append( (Document(page_content=text, metadata=metadata), score) ) return docs_and_scores def _where_clause_operator_map(self) -> Dict[str, str]: operator_map = { "$eq": "=", "$ne": "!=", "$lt": "<", "$lte": "<=", "$gt": ">", "$gte": ">=", "$add": "+", "$sub": "-", "$mul": "*", "$div": "/", "$mod": "%", "$or": "OR", "$and": "AND", "$not": "NOT", "$concat": "||", "$bit_or": "|", "$bit_and": "&", "$bit_xor": "^", "$bit_lshift": "<<", "$bit_rshift": ">>", "$bit_zerofill_rshift": ">>>", "$full_text_contains": "FullTextContains", "$full_text_contains_all": "FullTextContainsAll", "$full_text_contains_any": "FullTextContainsAny", } return operator_map
[docs] def get_container(self) -> ContainerProxy: return self._container