"""Astra DB vector store integration."""from__future__importannotationsimportasyncioimportinspectimportloggingimportuuidimportwarningsfromconcurrent.futuresimportThreadPoolExecutorfromtypingimport(TYPE_CHECKING,Any,AsyncIterable,Awaitable,Callable,Dict,Iterable,Literal,NamedTuple,Sequence,TypeVar,cast,overload,)importnumpyasnpfromastrapy.constantsimportEnvironmentfromastrapy.exceptionsimportInsertManyExceptionfromastrapy.infoimportCollectionVectorServiceOptionsfromlangchain_community.vectorstores.utilsimportmaximal_marginal_relevancefromlangchain_core.documentsimportDocumentfromlangchain_core.runnables.utilsimportgather_with_concurrencyfromlangchain_core.vectorstoresimportVectorStorefromtyping_extensionsimportoverridefromlangchain_astradb.utils.astradbimport(COMPONENT_NAME_VECTORSTORE,DEFAULT_DOCUMENT_CHUNK_SIZE,MAX_CONCURRENT_DOCUMENT_DELETIONS,MAX_CONCURRENT_DOCUMENT_INSERTIONS,MAX_CONCURRENT_DOCUMENT_REPLACEMENTS,SetupMode,_AstraDBCollectionEnvironment,_survey_collection,)fromlangchain_astradb.utils.vector_store_autodetectimport(_detect_document_codec,)fromlangchain_astradb.utils.vector_store_codecsimport(VECTORIZE_FIELD_NAME,_AstraDBVectorStoreDocumentCodec,_DefaultVectorizeVSDocumentCodec,_DefaultVSDocumentCodec,)ifTYPE_CHECKING:fromastrapy.authenticationimportEmbeddingHeadersProvider,TokenProviderfromastrapy.dbimport(AstraDBasAstraDBClient,)fromastrapy.dbimport(AsyncAstraDBasAsyncAstraDBClient,)fromastrapy.resultsimportUpdateResultfromlangchain_core.embeddingsimportEmbeddingsT=TypeVar("T")U=TypeVar("U")DocDict=Dict[str,Any]# dicts expressing entries to insert
class AstraDBQueryResult(NamedTuple):
    """Full set of information carried by one vector store entry.

    A query against the collection can return more than the bare
    ``Document``: this tuple groups the document together with the other
    pieces of information that may come back with it.
    """

    # the Document for the entry
    document: Document
    # the entry's ID
    id: str
    # the embedding vector; typed as optional, so it may be None
    embedding: list[float] | None
    # the similarity value; typed as optional, so it may be None
    similarity: float | None
# error code to check for during bulk insertions
DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE = "DOCUMENT_ALREADY_EXISTS"
# max number of errors shown in full insertion error messages
MAX_SHOWN_INSERTION_ERRORS = 8

logger = logging.getLogger(__name__)


def _unique_list(lst: list[T], key: Callable[[T], U]) -> list[T]:
    """Deduplicate ``lst`` by ``key``, keeping first occurrences in order.

    Args:
        lst: the items to deduplicate.
        key: function computing the (hashable) identity of an item.

    Returns:
        a new list with, for each distinct key, the first item having it.
    """
    visited_keys: set[U] = set()
    new_lst = []
    for item in lst:
        item_key = key(item)
        if item_key not in visited_keys:
            visited_keys.add(item_key)
            new_lst.append(item)
    return new_lst


def _normalize_content_field(
    content_field: str | None,
    *,
    is_autodetect: bool,
    has_vectorize: bool,
) -> str:
    """Resolve the name of the document field holding the textual content.

    Args:
        content_field: the user-supplied value, if any.
        is_autodetect: whether the store runs in autodetect mode.
        has_vectorize: whether the collection uses server-side embeddings.

    Returns:
        ``VECTORIZE_FIELD_NAME`` for vectorize collections; otherwise the
        supplied value, or a default when it is None (``"*"`` in autodetect
        mode, ``"content"`` otherwise).

    Raises:
        ValueError: if ``content_field`` is given for a vectorize collection,
            or if ``"*"`` is passed outside of autodetect mode.
    """
    if has_vectorize:
        if content_field is not None:
            msg = "content_field is not configurable for vectorize collections."
            raise ValueError(msg)
        return VECTORIZE_FIELD_NAME

    if content_field is None:
        return "*" if is_autodetect else "content"

    if content_field == "*" and not is_autodetect:
        msg = "content_field='*' illegal if autodetect_collection is False."
        raise ValueError(msg)

    return content_field


def _validate_autodetect_init_params(
    *,
    metric: str | None = None,
    setup_mode: SetupMode | None,
    pre_delete_collection: bool,
    metadata_indexing_include: Iterable[str] | None,
    metadata_indexing_exclude: Iterable[str] | None,
    collection_indexing_policy: dict[str, Any] | None,
    collection_vector_service_options: CollectionVectorServiceOptions | None,
) -> None:
    """Check that the passed parameters do not violate the autodetect constraints.

    All violations are collected and reported together in one exception.

    Raises:
        ValueError: if any of ``metric``, ``metadata_indexing_include``,
            ``metadata_indexing_exclude``, ``collection_indexing_policy``,
            ``collection_vector_service_options`` or ``setup_mode`` is not
            None, or if ``pre_delete_collection`` is True.
    """
    forbidden_parameters = [
        p_name
        for p_name, p_value in (
            ("metric", metric),
            ("metadata_indexing_include", metadata_indexing_include),
            ("metadata_indexing_exclude", metadata_indexing_exclude),
            ("collection_indexing_policy", collection_indexing_policy),
            ("collection_vector_service_options", collection_vector_service_options),
        )
        if p_value is not None
    ]
    fp_error: str | None = None
    if forbidden_parameters:
        # no period after the parameter list: the sentence continues
        fp_error = (
            f"Parameter(s) {', '.join(forbidden_parameters)} were provided "
            "but cannot be passed."
        )
    pd_error: str | None = None
    if pre_delete_collection:
        pd_error = "Parameter `pre_delete_collection` cannot be True."
    sm_error: str | None = None
    if setup_mode is not None:
        sm_error = "Parameter `setup_mode` not allowed."
    am_errors = [err_s for err_s in (fp_error, pd_error, sm_error) if err_s is not None]
    if am_errors:
        msg = f"Invalid parameters for autodetect mode: {'; '.join(am_errors)}"
        raise ValueError(msg)


def _insertmany_error_message(err: InsertManyException) -> str:
    """Format an astrapy insert exception into an error message.

    This utility prepares a detailed message from an astrapy
    InsertManyException, to be used in raising an exception within a
    vectorstore multiple insertion.

    This operation must filter out duplicate-id specific errors
    (which the vector store could actually handle, if they were the only ones).
    Descriptors with an empty message are skipped as well, and at most
    ``MAX_SHOWN_INSERTION_ERRORS`` messages are spelled out in full.
    """
    err_msg = "Cannot insert documents. The Data API returned the following error(s): "

    filtered_error_descs = [
        edesc
        for edesc in err.error_descriptors
        if edesc.error_code != DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE
        if edesc.message
    ]
    # messages are guaranteed non-empty by the filter above
    err_msg += "; ".join(
        edesc.message for edesc in filtered_error_descs[:MAX_SHOWN_INSERTION_ERRORS]
    )

    if (num_residual := len(filtered_error_descs) - MAX_SHOWN_INSERTION_ERRORS) > 0:
        err_msg += f". (Note: {num_residual} further errors omitted.)"

    err_msg += (
        " (Full API error in '<this-exception>.__cause__.error_descriptors'"
        f": ignore '{DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}'.)"
    )
    return err_msg
class AstraDBVectorStoreError(Exception):
    """Raised when a vector-store operation fails.

    Any operational failure that takes place while an AstraDBVectorStore
    is carrying out an action is represented by this exception.
    """
class AstraDBVectorStore(VectorStore):
    """AstraDB vector store integration.

    Setup:
        Install the ``langchain-astradb`` package and head to the
        `AstraDB website <https://astra.datastax.com>`_, create an account, create a
        new database and `create an application token <https://docs.datastax.com/en/astra-db-serverless/administration/manage-application-tokens.html>`_.

        .. code-block:: bash

            pip install -qU langchain-astradb

    Key init args — indexing params:
        collection_name: str
            Name of the collection.
        embedding: Embeddings
            Embedding function to use.

    Key init args — client params:
        api_endpoint: str
            AstraDB API endpoint.
        token: str
            API token for Astra DB usage.
        namespace: Optional[str]
            Namespace (aka keyspace) where the collection is created

    Instantiate:
        Get your API endpoint and application token from the dashboard of your database.

        .. code-block:: python

            import getpass
            from langchain_astradb import AstraDBVectorStore
            from langchain_openai import OpenAIEmbeddings

            ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ")
            ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ")

            vector_store = AstraDBVectorStore(
                collection_name="astra_vector_langchain",
                embedding=OpenAIEmbeddings(),
                api_endpoint=ASTRA_DB_API_ENDPOINT,
                token=ASTRA_DB_APPLICATION_TOKEN,
            )

        Have the vector store figure out its configuration (documents scheme on DB)
        from an existing collection, in the case of `server-side-embeddings <https://docs.datastax.com/en/astra-db-serverless/databases/embedding-generation.html>`_:

        .. code-block:: python

            import getpass
            from langchain_astradb import AstraDBVectorStore

            ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ")
            ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ")

            vector_store = AstraDBVectorStore(
                collection_name="astra_vector_langchain",
                api_endpoint=ASTRA_DB_API_ENDPOINT,
                token=ASTRA_DB_APPLICATION_TOKEN,
                autodetect_collection=True,
            )

    Add Documents:
        .. code-block:: python

            from langchain_core.documents import Document

            document_1 = Document(page_content="foo", metadata={"baz": "bar"})
            document_2 = Document(page_content="thud", metadata={"bar": "baz"})
            document_3 = Document(page_content="i will be deleted :(")

            documents = [document_1, document_2, document_3]
            ids = ["1", "2", "3"]
            vector_store.add_documents(documents=documents, ids=ids)

    Delete Documents:
        .. code-block:: python

            vector_store.delete(ids=["3"])

    Search:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1)
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * thud [{'bar': 'baz'}]

    Search with filter:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * thud [{'bar': 'baz'}]

    Search with score:
        .. code-block:: python

            results = vector_store.similarity_search_with_score(query="qux",k=1)
            for doc, score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * [SIM=0.916135] foo [{'baz': 'bar'}]

    Async:
        .. code-block:: python

            # add documents
            await vector_store.aadd_documents(documents=documents, ids=ids)

            # delete documents
            await vector_store.adelete(ids=["3"])

            # search
            results = await vector_store.asimilarity_search(query="thud",k=1)

            # search with score
            results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
            for doc,score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * [SIM=0.916135] foo [{'baz': 'bar'}]

    Use as Retriever:
        .. code-block:: python

            retriever = vector_store.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"k": 1, "score_threshold": 0.5},
            )
            retriever.invoke("thud")

        .. code-block:: none

            [Document(metadata={'bar': 'baz'}, page_content='thud')]

    """  # noqa: E501
def filter_to_query(self, filter_dict: dict[str, Any] | None) -> dict[str, Any]:
    """Translate a metadata filter into a query filter for the collection.

    The "abstract" metadata condition is handed to the store's document
    codec, which knows how documents are laid out on DB and encodes the
    condition accordingly.

    Args:
        filter_dict: a metadata condition in the form {"field": "value"}
            or related. None stands for "no condition".

    Returns:
        the mapping to use as filter in queries: an empty dict when
        ``filter_dict`` is None, the codec-encoded filter otherwise.
    """
    if filter_dict is not None:
        return self.document_codec.encode_filter(filter_dict)
    return {}
@staticmethod
def _normalize_metadata_indexing_policy(
    metadata_indexing_include: Iterable[str] | None,
    metadata_indexing_exclude: Iterable[str] | None,
    collection_indexing_policy: dict[str, Any] | None,
    document_codec: _AstraDBVectorStoreDocumentCodec,
) -> dict[str, Any]:
    """Turn the constructor indexing parameters into one indexing policy.

    The three indexing parameters are alternative ways to express the same
    setting, so at most one of them may be non-null. The result is a dict
    ready to be used in the 'options' when creating a collection.

    Returns:
        an ``{"allow": [...]}`` dict for an include-list, a
        ``{"deny": [...]}`` dict for an exclude-list (metadata keys being
        mapped to field identifiers by the codec in both cases), the
        explicit policy if one was given, or else the codec's default policy.

    Raises:
        ValueError: if more than one of the three parameters is non-null.
    """
    candidates = (
        metadata_indexing_include,
        metadata_indexing_exclude,
        collection_indexing_policy,
    )
    num_specified = len(candidates) - candidates.count(None)
    if num_specified > 1:
        msg = (
            "At most one of the parameters `metadata_indexing_include`,"
            " `metadata_indexing_exclude` and `collection_indexing_policy`"
            " can be specified as non null."
        )
        raise ValueError(msg)

    if metadata_indexing_include is not None:
        allowed = list(
            map(
                document_codec.metadata_key_to_field_identifier,
                metadata_indexing_include,
            )
        )
        return {"allow": allowed}

    if metadata_indexing_exclude is not None:
        denied = list(
            map(
                document_codec.metadata_key_to_field_identifier,
                metadata_indexing_exclude,
            )
        )
        return {"deny": denied}

    if collection_indexing_policy is not None:
        return collection_indexing_policy
    return document_codec.default_collection_indexing_policy
def __init__(
    self,
    *,
    collection_name: str,
    embedding: Embeddings | None = None,
    token: str | TokenProvider | None = None,
    api_endpoint: str | None = None,
    environment: str | None = None,
    namespace: str | None = None,
    metric: str | None = None,
    batch_size: int | None = None,
    bulk_insert_batch_concurrency: int | None = None,
    bulk_insert_overwrite_concurrency: int | None = None,
    bulk_delete_concurrency: int | None = None,
    setup_mode: SetupMode | None = None,
    pre_delete_collection: bool = False,
    metadata_indexing_include: Iterable[str] | None = None,
    metadata_indexing_exclude: Iterable[str] | None = None,
    collection_indexing_policy: dict[str, Any] | None = None,
    collection_vector_service_options: CollectionVectorServiceOptions | None = None,
    collection_embedding_api_key: str | EmbeddingHeadersProvider | None = None,
    content_field: str | None = None,
    ignore_invalid_documents: bool = False,
    autodetect_collection: bool = False,
    ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
    component_name: str = COMPONENT_NAME_VECTORSTORE,
    astra_db_client: AstraDBClient | None = None,
    async_astra_db_client: AsyncAstraDBClient | None = None,
) -> None:
    """Wrapper around DataStax Astra DB for vector-store workloads.

    For quickstart and details, visit
    https://docs.datastax.com/en/astra-db-serverless/index.html

    Args:
        embedding: the embeddings function or service to use.
            This enables client-side embedding functions or calls to external
            embedding providers. If ``embedding`` is provided, arguments
            ``collection_vector_service_options`` and
            ``collection_embedding_api_key`` cannot be provided.
        collection_name: name of the Astra DB collection to create/use.
        token: API token for Astra DB usage, either in the form of a string
            or a subclass of ``astrapy.authentication.TokenProvider``.
            If not provided, the environment variable
            ASTRA_DB_APPLICATION_TOKEN is inspected.
        api_endpoint: full URL to the API endpoint, such as
            ``https://<DB-ID>-us-east1.apps.astra.datastax.com``. If not provided,
            the environment variable ASTRA_DB_API_ENDPOINT is inspected.
        environment: a string specifying the environment of the target Data API.
            If omitted, defaults to "prod" (Astra DB production).
            Other values are in ``astrapy.constants.Environment`` enum class.
        namespace: namespace (aka keyspace) where the collection is created.
            If not provided, the environment variable ASTRA_DB_KEYSPACE is
            inspected. Defaults to the database's "default namespace".
        metric: similarity function to use out of those available in Astra DB.
            If left out, it will use Astra DB API's defaults (i.e. "cosine" - but,
            for performance reasons, "dot_product" is suggested if embeddings are
            normalized to one).
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided (or zero), the package constant
            ``DEFAULT_DOCUMENT_CHUNK_SIZE`` is used.
        bulk_insert_batch_concurrency: Number of threads or coroutines to insert
            batches concurrently.
        bulk_insert_overwrite_concurrency: Number of threads or coroutines in a
            batch to insert pre-existing entries.
        bulk_delete_concurrency: Number of threads or coroutines for
            multiple-entry deletes.
        setup_mode: mode used to create the collection (SYNC, ASYNC or OFF).
            Outside of autodetect mode, None is treated as SYNC.
        pre_delete_collection: whether to delete the collection before creating it.
            If False and the collection already exists, the collection will be used
            as is.
        metadata_indexing_include: an allowlist of the specific metadata subfields
            that should be indexed for later filtering in searches.
        metadata_indexing_exclude: a denylist of the specific metadata subfields
            that should not be indexed for later filtering in searches.
        collection_indexing_policy: a full "indexing" specification for
            what fields should be indexed for later filtering in searches.
            This dict must conform to the API specifications
            (see https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option)
        collection_vector_service_options: specifies the use of server-side
            embeddings within Astra DB. If passing this parameter, ``embedding``
            cannot be provided.
        collection_embedding_api_key: for usage of server-side embeddings
            within Astra DB. With this parameter one can supply an API Key
            that will be passed to Astra DB with each data request.
            This parameter can be either a string or a subclass of
            ``astrapy.authentication.EmbeddingHeadersProvider``.
            This is useful when the service is configured for the collection,
            but no corresponding secret is stored within
            Astra's key management system.
        content_field: name of the field containing the textual content
            in the documents when saved on Astra DB. For vectorize collections,
            this cannot be specified; for non-vectorize collection, defaults
            to "content".
            The special value "*" can be passed only if autodetect_collection=True.
            In this case, the actual name of the key for the textual content is
            guessed by inspection of a few documents from the collection, under the
            assumption that the longer strings are the most likely candidates.
            Please understand the limitations of this method and get some
            understanding of your data before passing ``"*"`` for this parameter.
        ignore_invalid_documents: if False (default), exceptions are raised
            when a document is found on the Astra DB collection that does
            not have the expected shape. If set to True, such results
            from the database are ignored and a warning is issued. Note
            that in this case a similarity search may end up returning fewer
            results than the required ``k``.
        autodetect_collection: if True, turns on autodetect behavior.
            The store will look for an existing collection of the provided name
            and infer the store settings from it. Default is False.
            In autodetect mode, ``content_field`` can be given as ``"*"``, meaning
            that an attempt will be made to determine it by inspection
            (unless vectorize is enabled, in which case ``content_field``
            must not be passed at all: a ValueError is raised otherwise).
            In autodetect mode, the store not only determines whether embeddings
            are client- or server-side, but - most importantly - switches
            automatically between "nested" and "flat" representations of documents
            on DB (i.e. having the metadata key-value pairs grouped in a
            ``metadata`` field or spread at the documents' top-level). The former
            scheme is the native mode of the AstraDBVectorStore; the store resorts
            to the latter in case of vector collections populated with external
            means (such as a third-party data import tool) before applying
            an AstraDBVectorStore to them.
            Note that the following parameters cannot be used if this is True:
            ``metric``, ``setup_mode``, ``metadata_indexing_include``,
            ``metadata_indexing_exclude``, ``collection_indexing_policy``,
            ``collection_vector_service_options``; moreover,
            ``pre_delete_collection`` cannot be True.
        ext_callers: one or more caller identities to identify Data API calls
            in the User-Agent header. This is a list of (name, version) pairs,
            or just strings if no version info is provided, which, if supplied,
            becomes the leading part of the User-Agent string in all API requests
            related to this component.
        component_name: the string identifying this specific component in the
            stack of usage info passed as the User-Agent string to the Data API.
            Defaults to "langchain_vectorstore", but can be overridden if this
            component actually serves as the building block for another component
            (such as a Graph Vector Store).
        astra_db_client:
            *DEPRECATED starting from version 0.3.5.*
            *Please use 'token', 'api_endpoint' and optionally 'environment'.*
            you can pass an already-created 'astrapy.db.AstraDB' instance
            (alternatively to 'token', 'api_endpoint' and 'environment').
        async_astra_db_client:
            *DEPRECATED starting from version 0.3.5.*
            *Please use 'token', 'api_endpoint' and optionally 'environment'.*
            you can pass an already-created 'astrapy.db.AsyncAstraDB' instance
            (alternatively to 'token', 'api_endpoint' and 'environment').

    Raises:
        ValueError: if the indexing or content-field parameters are
            inconsistent, if forbidden parameters are passed in autodetect
            mode, if the collection to autodetect is not found or is not a
            vector collection, or if ``embedding`` /
            ``collection_embedding_api_key`` do not match the
            client-side/server-side embedding mode of the collection.

    Note:
        For concurrency in synchronous :meth:`~add_texts`:, as a rule of thumb,
        on a typical client machine it is suggested to keep the quantity
        bulk_insert_batch_concurrency * bulk_insert_overwrite_concurrency
        much below 1000 to avoid exhausting the client multithreading/networking
        resources. The hardcoded defaults are somewhat conservative to meet
        most machines' specs, but a sensible choice to test may be:

        - bulk_insert_batch_concurrency = 80
        - bulk_insert_overwrite_concurrency = 10

        A bit of experimentation is required to nail the best results here,
        depending on both the machine/network specs and the expected workload
        (specifically, how often a write is an update of an existing id).
        Remember you can pass concurrency settings to individual calls to
        :meth:`~add_texts` and :meth:`~add_documents` as well.
    """
    # general collection settings
    self.collection_name = collection_name
    self.token = token
    self.api_endpoint = api_endpoint
    self.environment = environment
    self.namespace = namespace
    # declared here, assigned in one of the two branches below
    self.indexing_policy: dict[str, Any]
    self.autodetect_collection = autodetect_collection
    # vector-related settings
    self.embedding_dimension: int | None = None
    self.embedding = embedding
    self.metric = metric
    self.collection_embedding_api_key = collection_embedding_api_key
    self.collection_vector_service_options = collection_vector_service_options
    # DB-encoding settings:
    # (declared here, assigned in one of the two branches below)
    self.document_codec: _AstraDBVectorStoreDocumentCodec
    # concurrency settings
    # (`or` makes any falsy value, 0 included, fall back to the default)
    self.batch_size: int | None = batch_size or DEFAULT_DOCUMENT_CHUNK_SIZE
    self.bulk_insert_batch_concurrency: int = (
        bulk_insert_batch_concurrency or MAX_CONCURRENT_DOCUMENT_INSERTIONS
    )
    self.bulk_insert_overwrite_concurrency: int = (
        bulk_insert_overwrite_concurrency or MAX_CONCURRENT_DOCUMENT_REPLACEMENTS
    )
    self.bulk_delete_concurrency: int = (
        bulk_delete_concurrency or MAX_CONCURRENT_DOCUMENT_DELETIONS
    )

    _setup_mode: SetupMode
    _embedding_dimension: int | Awaitable[int] | None

    if not self.autodetect_collection:
        logger.info(
            "vector store default init, collection '%s'", self.collection_name
        )
        _setup_mode = SetupMode.SYNC if setup_mode is None else setup_mode
        _embedding_dimension = self._prepare_embedding_dimension(_setup_mode)
        # determine vectorize/nonvectorize
        has_vectorize = self.collection_vector_service_options is not None
        _content_field = _normalize_content_field(
            content_field,
            is_autodetect=False,
            has_vectorize=has_vectorize,
        )

        if self.collection_vector_service_options is not None:
            self.document_codec = _DefaultVectorizeVSDocumentCodec(
                ignore_invalid_documents=ignore_invalid_documents,
            )
        else:
            self.document_codec = _DefaultVSDocumentCodec(
                content_field=_content_field,
                ignore_invalid_documents=ignore_invalid_documents,
            )
        # indexing policy setting
        self.indexing_policy = self._normalize_metadata_indexing_policy(
            metadata_indexing_include=metadata_indexing_include,
            metadata_indexing_exclude=metadata_indexing_exclude,
            collection_indexing_policy=collection_indexing_policy,
            document_codec=self.document_codec,
        )
    else:
        logger.info(
            "vector store autodetect init, collection '%s'", self.collection_name
        )
        # specific checks for autodetect logic
        _validate_autodetect_init_params(
            metric=self.metric,
            setup_mode=setup_mode,
            pre_delete_collection=pre_delete_collection,
            metadata_indexing_include=metadata_indexing_include,
            metadata_indexing_exclude=metadata_indexing_exclude,
            collection_indexing_policy=collection_indexing_policy,
            collection_vector_service_options=self.collection_vector_service_options,
        )
        # in autodetect mode the setup mode is always OFF
        _setup_mode = SetupMode.OFF

        # fetch collection intelligence
        c_descriptor, c_documents = _survey_collection(
            collection_name=self.collection_name,
            token=self.token,
            api_endpoint=self.api_endpoint,
            keyspace=self.namespace,
            environment=self.environment,
            ext_callers=ext_callers,
            component_name=component_name,
            astra_db_client=astra_db_client,
            async_astra_db_client=async_astra_db_client,
        )
        if c_descriptor is None:
            msg = f"Collection '{self.collection_name}' not found."
            raise ValueError(msg)
        # use the collection info to set the store properties
        if c_descriptor.options.vector is None:
            msg = "Non-vector collection detected."
            raise ValueError(msg)
        _embedding_dimension = c_descriptor.options.vector.dimension
        # the service options found on the collection replace the (None) init value
        self.collection_vector_service_options = c_descriptor.options.vector.service
        has_vectorize = self.collection_vector_service_options is not None
        logger.info("vector store autodetect: has_vectorize = %s", has_vectorize)
        norm_content_field = _normalize_content_field(
            content_field,
            is_autodetect=True,
            has_vectorize=has_vectorize,
        )
        self.document_codec = _detect_document_codec(
            c_documents,
            has_vectorize=has_vectorize,
            ignore_invalid_documents=ignore_invalid_documents,
            norm_content_field=norm_content_field,
        )
        # the indexing policy is the one found on the existing collection
        self.indexing_policy = self._normalize_metadata_indexing_policy(
            metadata_indexing_include=None,
            metadata_indexing_exclude=None,
            collection_indexing_policy=c_descriptor.options.indexing,
            document_codec=self.document_codec,
        )

    # validate embedding/vectorize compatibility and such.

    # Embedding and the server-side embeddings are mutually exclusive,
    # as both specify how to produce embeddings.
    # Also API key makes no sense unless vectorize.
    if self.embedding is None and not self.document_codec.server_side_embeddings:
        msg = "Embedding is required for non-vectorize collections."
        raise ValueError(msg)

    if self.embedding is not None and self.document_codec.server_side_embeddings:
        msg = "Embedding cannot be provided for vectorize collections."
        raise ValueError(msg)

    if (
        not self.document_codec.server_side_embeddings
        and self.collection_embedding_api_key is not None
    ):
        msg = "Embedding API Key cannot be provided for non-vectorize collections."
        raise ValueError(msg)

    self.astra_env = _AstraDBCollectionEnvironment(
        collection_name=collection_name,
        token=self.token,
        api_endpoint=self.api_endpoint,
        keyspace=self.namespace,
        environment=self.environment,
        setup_mode=_setup_mode,
        pre_delete_collection=pre_delete_collection,
        embedding_dimension=_embedding_dimension,
        metric=self.metric,
        requested_indexing_policy=self.indexing_policy,
        default_indexing_policy=(
            self.document_codec.default_collection_indexing_policy
        ),
        collection_vector_service_options=self.collection_vector_service_options,
        collection_embedding_api_key=self.collection_embedding_api_key,
        ext_callers=ext_callers,
        component_name=component_name,
        astra_db_client=astra_db_client,
        async_astra_db_client=async_astra_db_client,
    )
def _get_safe_embedding(self) -> Embeddings:
    """Return the store's embeddings object, raising if there is none.

    Raises:
        ValueError: if ``self.embedding`` is falsy.
    """
    if self.embedding:
        return self.embedding
    msg = "Missing embedding"
    raise ValueError(msg)


def _prepare_embedding_dimension(
    self, setup_mode: SetupMode
) -> int | Awaitable[int] | None:
    """Return the right kind of object for the astra_env to use.

    The result is None when the store has no embeddings object, an
    awaitable yielding the dimension for ``SetupMode.ASYNC``, and the
    dimension itself for any other setup mode. The dimension is measured
    by embedding a sample sentence and is cached on
    ``self.embedding_dimension`` once known.
    """
    if self.embedding is None:
        return None

    if setup_mode == SetupMode.ASYNC:
        # the measurement is deferred: it runs only when the result is awaited
        async def _aget_embedding_dimension() -> int:
            if self.embedding_dimension is None:
                sample_vector = await self._get_safe_embedding().aembed_query(
                    text="This is a sample sentence."
                )
                self.embedding_dimension = len(sample_vector)
            return self.embedding_dimension

        return _aget_embedding_dimension()

    # every other setup mode (SYNC, OFF): measure right away
    if self.embedding_dimension is None:
        sample_vector = self._get_safe_embedding().embed_query(
            text="This is a sample sentence."
        )
        self.embedding_dimension = len(sample_vector)
    return self.embedding_dimension


@property
@override
def embeddings(self) -> Embeddings | None:
    """The embeddings object supplied to the store.

    This is None when the store relies on server-side embeddings.
    """
    return self.embedding


@override
def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """Return the function mapping raw scores to relevance scores."""

    # The scores coming from the underlying API are already "proper"
    # scores, i.e. they lie in [0, 1] with higher meaning more *similar*:
    # hence they are passed through unchanged (no interval reversal).
    def _identity(score: float) -> float:
        return score

    return _identity
def copy(
    self,
    *,
    token: str | TokenProvider | None = None,
    ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
    component_name: str | None = None,
    collection_embedding_api_key: str | EmbeddingHeadersProvider | None = None,
) -> AstraDBVectorStore:
    """Return a shallow copy of this store, optionally changing some settings.

    Each parameter that is passed as something other than None overrides
    the corresponding value in the copy. Only those settings can be changed
    that keep the copy functional and free of side effects: for instance,
    a copy cannot be pointed to a different collection. For such needs,
    build a new ``AstraDBVectorStore`` from scratch instead.

    Args:
        token: API token for Astra DB usage, either in the form of a string
            or a subclass of ``astrapy.authentication.TokenProvider``.
            In order to suppress token usage in the copy, explicitly pass
            ``astrapy.authentication.StaticTokenProvider(None)``.
        ext_callers: additional custom (caller_name, caller_version) pairs
            to attach to the User-Agent header when issuing Data API requests.
        component_name: a value for the LangChain component name to use when
            identifying the originator of the Data API requests.
        collection_embedding_api_key: the API Key to supply in each Data API
            request if necessary. This is necessary if using the Vectorize
            feature and no secret is stored with the database.
            In order to suppress the API Key in the copy, explicitly pass
            ``astrapy.authentication.EmbeddingAPIKeyHeaderProvider(None)``.

    Returns:
        the new ``AstraDBVectorStore`` instance.
    """
    # A throwaway instance built with setup turned off and placeholder
    # settings: every attribute set by __init__ is overwritten below.
    placeholder_options = CollectionVectorServiceOptions(
        provider="moot",
        model_name="moot",
    )
    duplicate = AstraDBVectorStore(
        collection_name="moot",
        api_endpoint="http://moot",
        environment=Environment.OTHER,
        namespace="moot",
        setup_mode=SetupMode.OFF,
        collection_vector_service_options=placeholder_options,
    )

    # attributes carried over as they are
    for attr_name in (
        "collection_name",
        "api_endpoint",
        "environment",
        "namespace",
        "indexing_policy",
        "autodetect_collection",
        "embedding_dimension",
        "embedding",
        "metric",
        "collection_vector_service_options",
        "document_codec",
        "batch_size",
        "bulk_insert_batch_concurrency",
        "bulk_insert_overwrite_concurrency",
        "bulk_delete_concurrency",
    ):
        setattr(duplicate, attr_name, getattr(self, attr_name))

    # attributes that the caller may override
    duplicate.token = token if token is not None else self.token
    if collection_embedding_api_key is not None:
        duplicate.collection_embedding_api_key = collection_embedding_api_key
    else:
        duplicate.collection_embedding_api_key = self.collection_embedding_api_key

    # Now the .astra_env attribute:
    duplicate.astra_env = self.astra_env.copy(
        token=token,
        ext_callers=ext_callers,
        component_name=component_name,
        collection_embedding_api_key=collection_embedding_api_key,
    )
    return duplicate
def clear(self) -> None:
    """Remove every stored entry, leaving the collection empty."""
    env = self.astra_env
    env.ensure_db_setup()
    # an empty filter matches all documents
    env.collection.delete_many({})
async def aclear(self) -> None:
    """Remove every stored entry, leaving the collection empty."""
    env = self.astra_env
    await env.aensure_db_setup()
    # an empty filter matches all documents
    await env.async_collection.delete_many({})
def delete_by_document_id(self, document_id: str) -> bool:
    """Delete one document from the store by its document ID.

    Args:
        document_id: The document ID

    Returns:
        True if exactly one document was deleted, False otherwise
        (e.g. when the ID is not found).
    """
    self.astra_env.ensure_db_setup()
    id_query = self.document_codec.encode_query(ids=[document_id])
    outcome = self.astra_env.collection.delete_one(id_query)
    return outcome.deleted_count == 1
async def adelete_by_document_id(self, document_id: str) -> bool:
    """Delete one document from the store by its document ID.

    Args:
        document_id: The document ID

    Returns:
        True if exactly one document was deleted, False otherwise
        (e.g. when the ID is not found).
    """
    await self.astra_env.aensure_db_setup()
    id_query = self.document_codec.encode_query(ids=[document_id])
    outcome = await self.astra_env.async_collection.delete_one(id_query)
    return outcome.deleted_count == 1
@override
def delete(
    self,
    ids: list[str] | None = None,
    concurrency: int | None = None,
    **kwargs: Any,
) -> bool | None:
    """Delete by vector ids.

    One single-document delete request is issued per id, from a pool of
    threads.

    Args:
        ids: List of ids to delete.
        concurrency: max number of threads issuing single-doc delete requests.
            Defaults to vector-store overall setting.
        **kwargs: Additional arguments are ignored (with a warning).

    Returns:
        True once all delete requests have completed.

    Raises:
        ValueError: if ``ids`` is None.
    """
    if kwargs:
        ignored_names = ", ".join(sorted(kwargs))
        warnings.warn(
            "Method 'delete' of AstraDBVectorStore vector store invoked with "
            f"unsupported arguments ({ignored_names}), "
            "which will be ignored.",
            stacklevel=2,
        )

    if ids is None:
        msg = "No ids provided to delete."
        raise ValueError(msg)

    num_workers = concurrency or self.bulk_delete_concurrency
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # exhaust the results so that any exception from a worker surfaces here
        for _ in executor.map(self.delete_by_document_id, ids):
            pass
    return True
@override
async def adelete(
    self,
    ids: list[str] | None = None,
    concurrency: int | None = None,
    **kwargs: Any,
) -> bool | None:
    """Delete by vector ids.

    One single-document delete request is issued per id, with a cap on
    how many run at the same time.

    Args:
        ids: List of ids to delete.
        concurrency: max number of simultaneous coroutines for single-doc
            delete requests. Defaults to vector-store overall setting.
        **kwargs: Additional arguments are ignored (with a warning).

    Returns:
        True once all delete requests have completed.

    Raises:
        ValueError: if ``ids`` is None.
    """
    if kwargs:
        ignored_names = ", ".join(sorted(kwargs))
        warnings.warn(
            "Method 'adelete' of AstraDBVectorStore invoked with "
            f"unsupported arguments ({ignored_names}), "
            "which will be ignored.",
            stacklevel=2,
        )

    if ids is None:
        msg = "No ids provided to delete."
        raise ValueError(msg)

    max_simultaneous = concurrency or self.bulk_delete_concurrency
    deletions = [self.adelete_by_document_id(doc_id) for doc_id in ids]
    await gather_with_concurrency(max_simultaneous, *deletions)
    return True
def delete_by_metadata_filter(
    self,
    filter: dict[str, Any],  # noqa: A002
) -> int:
    """Delete all documents whose metadata match a filtering condition.

    No vector embeddings are involved in this operation: the documents to
    remove are selected purely by the provided metadata condition.

    Args:
        filter: Filter on the metadata to apply. The filter cannot be empty.

    Returns:
        The number of deleted documents (0 if the count is not reported).

    Raises:
        ValueError: if the filter is empty.
    """
    if not filter:
        msg = (
            "Method `delete_by_metadata_filter` does not accept an empty "
            "filter. Use the `clear()` method if you really want to empty "
            "the vector store."
        )
        raise ValueError(msg)

    self.astra_env.ensure_db_setup()
    query_filter = self.filter_to_query(filter)
    outcome = self.astra_env.collection.delete_many(filter=query_filter)
    num_deleted = outcome.deleted_count
    return num_deleted if num_deleted else 0
async def adelete_by_metadata_filter(
    self,
    filter: dict[str, Any],  # noqa: A002
) -> int:
    """Delete every document whose metadata matches a filter (async version).

    The vector embeddings play no role here: documents are selected
    purely through the provided metadata condition.

    Args:
        filter: metadata condition selecting the documents to delete.
            An empty filter is rejected.

    Returns:
        The number of deleted documents (0 if the deletion result carries
        no count).

    Raises:
        ValueError: if `filter` is empty.
    """
    if not filter:
        msg = (
            "Method `adelete_by_metadata_filter` does not accept an empty "
            "filter. Use the `aclear()` method if you really want to empty "
            "the vector store."
        )
        raise ValueError(msg)

    await self.astra_env.aensure_db_setup()
    encoded_filter = self.filter_to_query(filter)
    outcome = await self.astra_env.async_collection.delete_many(
        filter=encoded_filter,
    )
    deleted = outcome.deleted_count
    return deleted if deleted else 0
def delete_collection(self) -> None:
    """Drop the whole collection from the database.

    Unlike :meth:`~clear`, which only empties the collection, this removes
    the collection itself. Stored data is lost and unrecoverable, resources
    are freed. Use with caution.
    """
    environment = self.astra_env
    environment.ensure_db_setup()
    environment.collection.drop()
async def adelete_collection(self) -> None:
    """Drop the whole collection from the database (async version).

    Unlike :meth:`~aclear`, which only empties the collection, this removes
    the collection itself. Stored data is lost and unrecoverable, resources
    are freed. Use with caution.
    """
    environment = self.astra_env
    await environment.aensure_db_setup()
    await environment.async_collection.drop()
def _get_documents_to_insert(
    self,
    texts: Iterable[str],
    embedding_vectors: Sequence[list[float] | None],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
) -> list[DocDict]:
    """Encode texts into insertable documents, deduplicated by id.

    Args:
        texts: the text contents. Any iterable is accepted, including
            one-shot iterators such as generators.
        embedding_vectors: one vector (or None) per text.
        metadatas: one metadata dict per text. Defaults to empty dicts.
        ids: one id per text. Defaults to freshly generated uuid4 hex
            strings.

    Returns:
        The documents as encoded by `self.document_codec`. When several
        entries share an id only the last one is kept.
    """
    # `texts` is only promised to be an Iterable: a generator would be
    # exhausted by the default-building passes below, leaving nothing for
    # the zip. Materialize it once so that every pass sees all items.
    texts = list(texts)
    if ids is None:
        ids = [uuid.uuid4().hex for _ in texts]
    if metadatas is None:
        metadatas = [{} for _ in texts]

    documents_to_insert = [
        self.document_codec.encode(
            content=b_txt,
            document_id=b_id,
            vector=b_emb,
            metadata=b_md,
        )
        for b_txt, b_emb, b_id, b_md in zip(
            texts,
            embedding_vectors,
            ids,
            metadatas,
        )
    ]
    # Make unique by id, keeping the last: `_unique_list` keeps the first
    # occurrence, hence the reversal before and after.
    return _unique_list(
        documents_to_insert[::-1],
        self.document_codec.get_id,
    )[::-1]
@override
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    *,
    batch_size: int | None = None,
    batch_concurrency: int | None = None,
    overwrite_concurrency: int | None = None,
    **kwargs: Any,
) -> list[str]:
    """Run texts through the embeddings and add them to the vectorstore.

    If passing explicit ids, those entries whose id is in the store already
    will be replaced.

    Args:
        texts: Texts to add to the vectorstore. Any iterable is accepted,
            including one-shot iterators such as generators.
        metadatas: Optional list of metadatas.
        ids: Optional list of ids.
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided, defaults to the vector-store
            overall defaults (which in turn falls to astrapy defaults).
        batch_concurrency: number of threads to process
            insertion batches concurrently. Defaults to the vector-store
            overall setting if not provided.
        overwrite_concurrency: number of threads to process
            pre-existing documents in each batch. Defaults to the
            vector-store overall setting if not provided.
        **kwargs: Additional arguments are ignored (with a warning).

    Note:
        There are constraints on the allowed field names
        in the metadata dictionaries, coming from the underlying Astra DB
        API. For instance, the ``$`` (dollar sign) cannot be used in the
        dict keys. See this document for details:
        https://docs.datastax.com/en/astra-db-serverless/api-reference/overview.html#limits

    Returns:
        The list of ids of the added texts.

    Raises:
        AstraDBVectorStoreError: if the bulk insertion fails for reasons
            other than already-existing documents, or if not all the
            pre-existing documents could be replaced.
    """
    if kwargs:
        warnings.warn(
            "Method 'add_texts' of AstraDBVectorStore vector store invoked with "
            f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), "
            "which will be ignored.",
            stacklevel=2,
        )
    self.astra_env.ensure_db_setup()

    # `texts` may be a one-shot iterator: materialize it exactly once, so
    # that computing the embeddings does not exhaust it before the
    # documents are built.
    texts_list = list(texts)

    embedding_vectors: Sequence[list[float] | None]
    if self.document_codec.server_side_embeddings:
        embedding_vectors = [None] * len(texts_list)
    else:
        embedding_vectors = self._get_safe_embedding().embed_documents(texts_list)
    documents_to_insert = self._get_documents_to_insert(
        texts_list, embedding_vectors, metadatas, ids
    )

    # perform an AstraPy insert_many, catching exceptions for overwriting docs
    ids_to_replace: list[str]
    inserted_ids: list[str] = []
    try:
        insert_many_result = self.astra_env.collection.insert_many(
            documents_to_insert,
            ordered=False,
            concurrency=batch_concurrency or self.bulk_insert_batch_concurrency,
            chunk_size=batch_size or self.batch_size,
        )
        ids_to_replace = []
        inserted_ids = insert_many_result.inserted_ids
    except InsertManyException as err:
        # check that the error is solely due to already-existing documents
        error_codes = {err_desc.error_code for err_desc in err.error_descriptors}
        if error_codes == {DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}:
            # copy, so that the extension below does not mutate the list
            # owned by the exception's partial result
            inserted_ids = list(err.partial_result.inserted_ids)
            inserted_ids_set = set(inserted_ids)
            ids_to_replace = [
                doc_id
                for document in documents_to_insert
                if (doc_id := self.document_codec.get_id(document))
                not in inserted_ids_set
            ]
        else:
            full_err_message = _insertmany_error_message(err)
            raise AstraDBVectorStoreError(full_err_message) from err

    # if necessary, replace docs for the non-inserted ids
    if ids_to_replace:
        # set membership keeps this selection linear in the batch size
        ids_to_replace_set = set(ids_to_replace)
        documents_to_replace = [
            document
            for document in documents_to_insert
            if self.document_codec.get_id(document) in ids_to_replace_set
        ]

        _max_workers = (
            overwrite_concurrency or self.bulk_insert_overwrite_concurrency
        )
        with ThreadPoolExecutor(
            max_workers=_max_workers,
        ) as executor:

            def _replace_document(
                document: DocDict,
            ) -> tuple[UpdateResult, str]:
                doc_id = self.document_codec.get_id(document)
                return self.astra_env.collection.replace_one(
                    self.document_codec.encode_query(ids=[doc_id]),
                    document,
                ), doc_id

            replace_results = list(
                executor.map(
                    _replace_document,
                    documents_to_replace,
                )
            )

        replaced_count = sum(r_res.update_info["n"] for r_res, _ in replace_results)
        inserted_ids += [replaced_id for _, replaced_id in replace_results]
        if replaced_count != len(ids_to_replace):
            missing = len(ids_to_replace) - replaced_count
            msg = (
                "AstraDBVectorStore.add_texts could not insert all requested "
                f"documents ({missing} failed replace_one calls)"
            )
            raise AstraDBVectorStoreError(msg)
    return inserted_ids
@override
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    *,
    batch_size: int | None = None,
    batch_concurrency: int | None = None,
    overwrite_concurrency: int | None = None,
    **kwargs: Any,
) -> list[str]:
    """Run texts through the embeddings and add them to the vectorstore.

    If passing explicit ids, those entries whose id is in the store already
    will be replaced.

    Args:
        texts: Texts to add to the vectorstore. Any iterable is accepted,
            including one-shot iterators such as generators.
        metadatas: Optional list of metadatas.
        ids: Optional list of ids.
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided, defaults to the vector-store
            overall defaults (which in turn falls to astrapy defaults).
        batch_concurrency: number of simultaneous coroutines to process
            insertion batches concurrently. Defaults to the vector-store
            overall setting if not provided.
        overwrite_concurrency: number of simultaneous coroutines to process
            pre-existing documents in each batch. Defaults to the
            vector-store overall setting if not provided.
        **kwargs: Additional arguments are ignored (with a warning).

    Note:
        There are constraints on the allowed field names
        in the metadata dictionaries, coming from the underlying Astra DB
        API. For instance, the ``$`` (dollar sign) cannot be used in the
        dict keys. See this document for details:
        https://docs.datastax.com/en/astra-db-serverless/api-reference/overview.html#limits

    Returns:
        The list of ids of the added texts.

    Raises:
        AstraDBVectorStoreError: if the bulk insertion fails for reasons
            other than already-existing documents, or if not all the
            pre-existing documents could be replaced.
    """
    if kwargs:
        warnings.warn(
            "Method 'aadd_texts' of AstraDBVectorStore invoked with "
            f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), "
            "which will be ignored.",
            stacklevel=2,
        )
    await self.astra_env.aensure_db_setup()

    # `texts` may be a one-shot iterator: materialize it exactly once, so
    # that computing the embeddings does not exhaust it before the
    # documents are built.
    texts_list = list(texts)

    embedding_vectors: Sequence[list[float] | None]
    if self.document_codec.server_side_embeddings:
        embedding_vectors = [None] * len(texts_list)
    else:
        embedding_vectors = await self._get_safe_embedding().aembed_documents(
            texts_list
        )
    documents_to_insert = self._get_documents_to_insert(
        texts_list, embedding_vectors, metadatas, ids
    )

    # perform an AstraPy insert_many, catching exceptions for overwriting docs
    ids_to_replace: list[str]
    inserted_ids: list[str] = []
    try:
        insert_many_result = await self.astra_env.async_collection.insert_many(
            documents_to_insert,
            ordered=False,
            concurrency=batch_concurrency or self.bulk_insert_batch_concurrency,
            chunk_size=batch_size or self.batch_size,
        )
        ids_to_replace = []
        inserted_ids = insert_many_result.inserted_ids
    except InsertManyException as err:
        # check that the error is solely due to already-existing documents
        error_codes = {err_desc.error_code for err_desc in err.error_descriptors}
        if error_codes == {DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}:
            # copy, so that the extension below does not mutate the list
            # owned by the exception's partial result
            inserted_ids = list(err.partial_result.inserted_ids)
            inserted_ids_set = set(inserted_ids)
            ids_to_replace = [
                doc_id
                for document in documents_to_insert
                if (doc_id := self.document_codec.get_id(document))
                not in inserted_ids_set
            ]
        else:
            full_err_message = _insertmany_error_message(err)
            raise AstraDBVectorStoreError(full_err_message) from err

    # if necessary, replace docs for the non-inserted ids
    if ids_to_replace:
        # set membership keeps this selection linear in the batch size
        ids_to_replace_set = set(ids_to_replace)
        documents_to_replace = [
            document
            for document in documents_to_insert
            if self.document_codec.get_id(document) in ids_to_replace_set
        ]

        # the semaphore caps the number of replace_one calls in flight
        sem = asyncio.Semaphore(
            overwrite_concurrency or self.bulk_insert_overwrite_concurrency,
        )

        _async_collection = self.astra_env.async_collection

        async def _replace_document(
            document: DocDict,
        ) -> tuple[UpdateResult, str]:
            async with sem:
                doc_id = self.document_codec.get_id(document)
                return await _async_collection.replace_one(
                    self.document_codec.encode_query(ids=[doc_id]),
                    document,
                ), doc_id

        tasks = [
            asyncio.create_task(_replace_document(document))
            for document in documents_to_replace
        ]

        replace_results = await asyncio.gather(*tasks, return_exceptions=False)

        replaced_count = sum(r_res.update_info["n"] for r_res, _ in replace_results)
        inserted_ids += [replaced_id for _, replaced_id in replace_results]

        if replaced_count != len(ids_to_replace):
            missing = len(ids_to_replace) - replaced_count
            # name this method (not its sync twin) in the error message
            msg = (
                "AstraDBVectorStore.aadd_texts could not insert all requested "
                f"documents ({missing} failed replace_one calls)"
            )
            raise AstraDBVectorStoreError(msg)
    return inserted_ids
def update_metadata(
    self,
    id_to_metadata: dict[str, dict],
    *,
    overwrite_concurrency: int | None = None,
) -> int:
    """Merge new metadata entries into existing documents.

    For every targeted document a `$set` update is issued with the encoded
    new metadata, so keys already present are overwritten one by one while
    the other pre-existing keys are left in place.

    Args:
        id_to_metadata: map from document ID to the metadata dictionary to
            apply to that document. IDs that do not correspond to an
            existing document are silently ignored.
        overwrite_concurrency: number of threads to process the updates.
            A falsy value selects `self.bulk_insert_overwrite_concurrency`.

    Returns:
        The number of documents successfully updated, i.e. found to exist
        (an update with `{}` as the new metadata counts as successful too).
    """
    self.astra_env.ensure_db_setup()

    def _apply_update(id_and_metadata: tuple[str, dict]) -> UpdateResult:
        target_id, new_metadata = id_and_metadata
        set_clause = self.filter_to_query(new_metadata)
        return self.astra_env.collection.update_one(
            self.document_codec.encode_query(ids=[target_id]),
            {"$set": set_clause},
        )

    worker_count = overwrite_concurrency or self.bulk_insert_overwrite_concurrency
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        outcomes = list(pool.map(_apply_update, id_to_metadata.items()))

    return sum(outcome.update_info["n"] for outcome in outcomes)
async def aupdate_metadata(
    self,
    id_to_metadata: dict[str, dict],
    *,
    overwrite_concurrency: int | None = None,
) -> int:
    """Merge new metadata entries into existing documents (async version).

    For every targeted document a `$set` update is issued with the encoded
    new metadata, so keys already present are overwritten one by one while
    the other pre-existing keys are left in place.

    Args:
        id_to_metadata: map from document ID to the metadata dictionary to
            apply to that document. IDs that do not correspond to an
            existing document are silently ignored.
        overwrite_concurrency: number of asynchronous tasks to process the
            updates. A falsy value selects
            `self.bulk_insert_overwrite_concurrency`.

    Returns:
        The number of documents successfully updated, i.e. found to exist
        (an update with `{}` as the new metadata counts as successful too).
    """
    await self.astra_env.aensure_db_setup()

    # the semaphore caps the number of update_one calls in flight
    limiter = asyncio.Semaphore(
        overwrite_concurrency or self.bulk_insert_overwrite_concurrency,
    )
    collection = self.astra_env.async_collection

    async def _apply_update(id_and_metadata: tuple[str, dict]) -> UpdateResult:
        target_id, new_metadata = id_and_metadata
        set_clause = self.filter_to_query(new_metadata)
        async with limiter:
            return await collection.update_one(
                self.document_codec.encode_query(ids=[target_id]),
                {"$set": set_clause},
            )

    pending = [
        asyncio.create_task(_apply_update(pair)) for pair in id_to_metadata.items()
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=False)

    return sum(outcome.update_info["n"] for outcome in outcomes)
def full_decode_astra_db_document(
    self,
    astra_db_document: DocDict,
) -> AstraDBQueryResult | None:
    """Decode a raw Astra DB document into Document + id/embedding/similarity.

    The input is a 'raw', codec-dependent Astra DB document; the output is
    a codec-independent representation. Depending on the query used to
    retrieve it, the raw document may or may not carry the embedding and
    the similarity: these can be None in the result if not found.

    Args:
        astra_db_document: a dictionary obtained through `run_query_raw`
            from the collection.

    Returns:
        An AstraDBQueryResult named tuple with Document, id, embedding
        (where applicable) and similarity (where applicable), or None if
        the codec refuses the decoding (e.g. because the input document
        is deemed faulty).
    """
    codec = self.document_codec
    document = codec.decode(astra_db_document)
    if document is None:
        # the codec has refused the conversion
        return None

    return AstraDBQueryResult(
        document=document,
        id=codec.get_id(astra_db_document),
        embedding=codec.decode_vector(astra_db_document),
        similarity=codec.get_similarity(astra_db_document),
    )
def run_query_raw(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> tuple[list[float] | None, Iterable[DocDict]] | Iterable[DocDict]:
    """Execute a generic query on stored documents, returning Astra DB documents.

    The return value has a variable format, depending on whether the
    'sort vector' is requested back from the server.

    Only the `n` parameter is required. Omitting all other parameters results
    in a query that matches each and every document found on the collection.

    The method does not expose a projection directly, which is instead
    automatically determined based on the invocation options.

    The returned documents are exactly as they come back from Astra DB (taking
    into account the projection as well). A further step, namely subsequent
    invocation of the `full_decode_astra_db_document` method, is required to
    reconstruct codec-independent Document objects. The reason for keeping
    the retrieval and the decoding steps separate is that a caller may want
    to first deduplicate/discard items, in order to convert only the items
    actually needed.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            if the collection has not enough matches.
        ids: a list of document IDs to restrict the query to. If this is
            supplied, only document with an ID among the provided one will
            match. If further query filters are provided (i.e. metadata),
            matches must satisfy both requirements.
        filter: a metadata filtering part. If provided, it must refer to
            metadata keys by their bare name (such as `{"key": 123}`).
            This filter can combine nested conditions with "$or"/"$and"
            connectors, for example:
            - `{"tag": "a"}`
            - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
            - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results. Requires
            vector sort. Note that the shape of the return value depends on
            this parameter.
        include_embeddings: whether to retrieve the matches' own embedding
            vectors.

    Returns:
        The shape of the return value depends on the value of
        `include_sort_vector`:

        * if `include_sort_vector = False`, the return value is an iterable
          over Astra DB documents (dictionaries);
        * if `include_sort_vector = True`, the return value is a 2-item
          tuple `(sort_v, astra_db_ite)`, where:

          - `sort_v` is the sort vector as given by the cursor's
            `get_sort_vector()`.
          - `astra_db_ite` is an iterable over Astra DB documents
            (dictionaries).
    """
    self.astra_env.ensure_db_setup()
    find_query = self.document_codec.encode_query(
        ids=ids,
        filter_dict=filter,
    )
    find_sort = self.document_codec.encode_filter(sort or {})
    find_projection = (
        self.document_codec.full_projection
        if include_embeddings
        else self.document_codec.base_projection
    )
    find_raw_iterator = self.astra_env.collection.find(
        filter=find_query,
        projection=find_projection,
        limit=n,
        include_similarity=include_similarity,
        include_sort_vector=include_sort_vector,
        sort=find_sort,
    )
    # stripping down the Astra DB cursor details into a plain iterator:
    final_doc_iterator = (doc for doc in find_raw_iterator)
    if include_sort_vector:
        # inside this branch the flag is known to be set: no need to
        # test it again before asking the cursor for the sort vector
        sort_vector = find_raw_iterator.get_sort_vector()
        return sort_vector, final_doc_iterator
    return final_doc_iterator
def run_query(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> (
    tuple[list[float] | None, Iterable[AstraDBQueryResult]]
    | Iterable[AstraDBQueryResult]
):
    """Run a generic query and return decoded, codec-independent results.

    This wraps `run_query_raw`, lazily passing each raw document through
    `full_decode_astra_db_document` and dropping those for which the
    decoding yields None.

    Only `n` is required: with no other parameter, the query matches every
    document in the collection. The projection is not exposed, being
    determined from the invocation options.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            in the following cases: (a) the decoding skips some raw entries
            from the server; (b) the collection has not enough matches.
        ids: a list of document IDs to restrict the query to. If further
            query filters are provided (i.e. metadata), matches must
            satisfy both requirements.
        filter: a metadata filtering part, referring to metadata keys by
            their bare name (such as `{"key": 123}`). Nested conditions can
            be combined with "$or"/"$and" connectors, for example:
            - `{"tag": "a"}`
            - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
            - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results. Requires
            vector sort. The shape of the return value depends on this
            parameter.
        include_embeddings: whether to retrieve the matches' own embedding
            vectors.

    Returns:
        * if `include_sort_vector` is falsy: an iterable over the
          AstraDBQueryResult items coming from the query, minus the entries
          that fail the decoding step.
        * if `include_sort_vector` is truthy: a 2-item tuple
          `(sort_v, results_ite)`, with `sort_v` the sort vector as given
          by `run_query_raw` and `results_ite` an iterable as above.
    """
    if not include_sort_vector:
        raw_documents = self.run_query_raw(
            n=n,
            ids=ids,
            filter=filter,
            sort=sort,
            include_similarity=include_similarity,
            include_sort_vector=False,
            include_embeddings=include_embeddings,
        )
        return (
            decoded
            for raw_document in raw_documents
            if (decoded := self.full_decode_astra_db_document(raw_document))
            is not None
        )

    sort_vector, raw_documents = self.run_query_raw(
        n=n,
        ids=ids,
        filter=filter,
        sort=sort,
        include_similarity=include_similarity,
        include_sort_vector=True,
        include_embeddings=include_embeddings,
    )
    decoded_results = (
        decoded
        for raw_document in raw_documents
        if (decoded := self.full_decode_astra_db_document(raw_document))
        is not None
    )
    return sort_vector, decoded_results
async def arun_query_raw(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> tuple[list[float] | None, AsyncIterable[DocDict]] | AsyncIterable[DocDict]:
    """Execute a generic query on stored documents, returning Astra DB documents.

    The return value has a variable format, depending on whether the
    'sort vector' is requested back from the server.

    Only the `n` parameter is required. Omitting all other parameters results
    in a query that matches each and every document found on the collection.

    The method does not expose a projection directly, which is instead
    automatically determined based on the invocation options.

    The returned documents are exactly as they come back from Astra DB (taking
    into account the projection as well). A further step, namely subsequent
    invocation of the `full_decode_astra_db_document` method, is required to
    reconstruct codec-independent Document objects. The reason for keeping
    the retrieval and the decoding steps separate is that a caller may want
    to first deduplicate/discard items, in order to convert only the items
    actually needed.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            if the collection has not enough matches (no decoding takes
            place in this method).
        ids: a list of document IDs to restrict the query to. If this is
            supplied, only document with an ID among the provided one will
            match. If further query filters are provided (i.e. metadata),
            matches must satisfy both requirements.
        filter: a metadata filtering part. If provided, it must refer to
            metadata keys by their bare name (such as `{"key": 123}`).
            This filter can combine nested conditions with "$or"/"$and"
            connectors, for example:
            - `{"tag": "a"}`
            - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
            - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results. Requires
            vector sort. Note that the shape of the return value depends on
            this parameter.
        include_embeddings: whether to retrieve the matches' own embedding
            vectors.

    Returns:
        The shape of the return value depends on the value of
        `include_sort_vector`:

        * if `include_sort_vector = False`, the return value is an async
          iterable over Astra DB documents (dictionaries);
        * if `include_sort_vector = True`, the return value is a 2-item
          tuple `(sort_v, astra_db_ite)`, where:

          - `sort_v` is the sort vector as given by the cursor's
            `get_sort_vector()`.
          - `astra_db_ite` is an async iterable over Astra DB documents
            (dictionaries).
    """
    await self.astra_env.aensure_db_setup()
    find_query = self.document_codec.encode_query(
        ids=ids,
        filter_dict=filter,
    )
    find_sort = self.document_codec.encode_filter(sort or {})
    find_projection = (
        self.document_codec.full_projection
        if include_embeddings
        else self.document_codec.base_projection
    )
    find_raw_iterator = self.astra_env.async_collection.find(
        filter=find_query,
        projection=find_projection,
        limit=n,
        include_similarity=include_similarity,
        include_sort_vector=include_sort_vector,
        sort=find_sort,
    )
    # stripping down the Astra DB cursor details into a plain iterator:
    final_doc_iterator = (doc async for doc in find_raw_iterator)
    if include_sort_vector:
        # inside this branch the flag is known to be set: no need to
        # test it again before asking the cursor for the sort vector
        sort_vector = await find_raw_iterator.get_sort_vector()
        return sort_vector, final_doc_iterator
    return final_doc_iterator
async def arun_query(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> (
    tuple[list[float] | None, AsyncIterable[AstraDBQueryResult]]
    | AsyncIterable[AstraDBQueryResult]
):
    """Run a generic query and return decoded, codec-independent results.

    This wraps `arun_query_raw`, lazily passing each raw document through
    `full_decode_astra_db_document` and dropping those for which the
    decoding yields None.

    Only `n` is required: with no other parameter, the query matches every
    document in the collection. The projection is not exposed, being
    determined from the invocation options.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            in the following cases: (a) the decoding skips some raw entries
            from the server; (b) the collection has not enough matches.
        ids: a list of document IDs to restrict the query to. If further
            query filters are provided (i.e. metadata), matches must
            satisfy both requirements.
        filter: a metadata filtering part, referring to metadata keys by
            their bare name (such as `{"key": 123}`). Nested conditions can
            be combined with "$or"/"$and" connectors, for example:
            - `{"tag": "a"}`
            - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
            - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results. Requires
            vector sort. The shape of the return value depends on this
            parameter.
        include_embeddings: whether to retrieve the matches' own embedding
            vectors.

    Returns:
        * if `include_sort_vector` is falsy: an async iterable over the
          AstraDBQueryResult items coming from the query, minus the entries
          that fail the decoding step.
        * if `include_sort_vector` is truthy: a 2-item tuple
          `(sort_v, results_ite)`, with `sort_v` the sort vector as given
          by `arun_query_raw` and `results_ite` an async iterable as above.
    """
    if not include_sort_vector:
        raw_documents = await self.arun_query_raw(
            n=n,
            ids=ids,
            filter=filter,
            sort=sort,
            include_similarity=include_similarity,
            include_sort_vector=False,
            include_embeddings=include_embeddings,
        )
        return (
            decoded
            async for raw_document in raw_documents
            if (decoded := self.full_decode_astra_db_document(raw_document))
            is not None
        )

    sort_vector, raw_documents = await self.arun_query_raw(
        n=n,
        ids=ids,
        filter=filter,
        sort=sort,
        include_similarity=include_similarity,
        include_sort_vector=True,
        include_embeddings=include_embeddings,
    )
    decoded_results = (
        decoded
        async for raw_document in raw_documents
        if (decoded := self.full_decode_astra_db_document(raw_document))
        is not None
    )
    return sort_vector, decoded_results
def metadata_search(
    self,
    filter: dict[str, Any] | None = None,  # noqa: A002
    n: int = 5,
) -> list[Document]:
    """Retrieve documents through a metadata-only query.

    Args:
        filter: the metadata to query for.
        n: the maximum number of documents to return.

    Returns:
        The Documents found by `run_query`, stripped of id, embedding and
        similarity.
    """
    query_results = self.run_query(n=n, filter=filter)
    # each result unpacks as (document, id, embedding, similarity)
    return [document for document, _id, _embedding, _similarity in query_results]
async def ametadata_search(
    self,
    filter: dict[str, Any] | None = None,  # noqa: A002
    n: int = 5,
) -> list[Document]:
    """Get documents via a metadata search (async version).

    Args:
        filter: the metadata to query for.
        n: the maximum number of documents to return.

    Returns:
        The list of Documents found by `arun_query`, stripped of id,
        embedding and similarity. The declared return type is `list`,
        matching both the value actually returned and the synchronous
        `metadata_search`.
    """
    docs_ite = await self.arun_query(n=n, filter=filter)
    # each result unpacks as (document, id, embedding, similarity)
    return [doc async for doc, _, _, _ in docs_ite]
def get_by_document_id(self, document_id: str) -> Document | None:
    """Fetch a single document from the store by its document ID.

    Args:
        document_id: the ID of the document to look for.

    Returns:
        The document if it exists, otherwise None.
    """
    query_results = self.run_query(n=1, ids=[document_id])
    # each result unpacks as (document, id, embedding, similarity)
    found = [document for document, _id, _embedding, _similarity in query_results]
    return found[0] if found else None
async def aget_by_document_id(self, document_id: str) -> Document | None:
    """Fetch a single document from the store by its document ID (async).

    Args:
        document_id: the ID of the document to look for.

    Returns:
        The document if it exists, otherwise None.
    """
    query_results = await self.arun_query(n=1, ids=[document_id])
    # each result unpacks as (document, id, embedding, similarity)
    found = [
        document async for document, _id, _embedding, _similarity in query_results
    ]
    return found[0] if found else None
@override
def similarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return the documents most similar to a query string.

    Delegates to `similarity_search_with_score_id`, keeping only the
    documents.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query.
    """
    scored_hits = self.similarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
    )
    return [document for (document, _score, _doc_id) in scored_hits]
@override
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
) -> list[tuple[Document, float]]:
    """Return the documents most similar to a query string, with scores.

    Delegates to `similarity_search_with_score_id`, dropping the ids.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
    )
    return [(document, score) for (document, score, _doc_id) in scored_hits]
def similarity_search_with_score_id(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Return the documents most similar to a query, with score and id.

    When the codec reports server-side embeddings, the query text is
    turned into a 'vectorize' sort clause; otherwise the query is embedded
    through `self._get_safe_embedding()` and the search runs by vector.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query.
    """
    if not self.document_codec.server_side_embeddings:
        query_vector = self._get_safe_embedding().embed_query(query)
        return self.similarity_search_with_score_id_by_vector(
            embedding=query_vector,
            k=k,
            filter=filter,
        )

    vectorize_sort = self.document_codec.encode_vectorize_sort(query)
    return self._similarity_search_with_score_id_by_sort(
        sort=vectorize_sort,
        k=k,
        filter=filter,
    )
@override
def similarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return the documents most similar to an embedding vector.

    Delegates to `similarity_search_with_score_id_by_vector`, keeping only
    the documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    return [document for (document, _score, _doc_id) in scored_hits]
def similarity_search_with_score_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float]]:
    """Find the Documents closest to an embedding vector, with scores.

    Args:
        embedding: Vector whose most similar documents are requested.
        k: Number of Documents to return. Defaults to 4.
        filter: Metadata filter to apply to the search.

    Returns:
        A list of (Document, score) pairs, the most similar to the
        query vector.
    """
    scored_hits = self.similarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # Drop the trailing id from each (document, score, id) triple.
    return [(document, score) for document, score, _doc_id in scored_hits]
def similarity_search_with_score_id_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the Documents closest to an embedding vector, with score and id.

    Args:
        embedding: Vector whose most similar documents are requested.
        k: Number of Documents to return. Defaults to 4.
        filter: Metadata filter to apply to the search.

    Returns:
        A list of (Document, score, id) triples, the most similar to
        the query vector.

    Raises:
        ValueError: if the store's codec uses server-side embeddings.
    """
    codec = self.document_codec
    # Guard clause: this entry point is refused on server-side-embedding stores.
    if codec.server_side_embeddings:
        msg = (
            "Searching by vector on a Vector Store that uses server-side "
            "embeddings is not allowed."
        )
        raise ValueError(msg)
    vector_sort = codec.encode_vector_sort(embedding)
    return self._similarity_search_with_score_id_by_sort(
        sort=vector_sort,
        k=k,
        filter=filter,
    )
def _similarity_search_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Run an ANN search driven by a ready-made sort clause.

    Args:
        sort: Sort clause passed through to ``run_query``.
        k: Number of hits requested from ``run_query``.
        filter: Metadata filter to apply to the search.

    Returns:
        A list of (Document, similarity, id) triples built from the hits.
    """
    hits = self.run_query(
        n=k,
        filter=filter,
        sort=sort,
        include_similarity=True,
    )
    # run_query yields (document, id, embedding, similarity); the embedding
    # slot is discarded and the remaining items are reordered for the caller.
    triples = [
        (document, similarity, doc_id)
        for document, doc_id, _embedding, similarity in hits
    ]
    return cast("list[tuple[Document, float, str]]", triples)
[docs]@overrideasyncdefasimilarity_search(self,query:str,k:int=4,filter:dict[str,Any]|None=None,**kwargs:Any,)->list[Document]:"""Return docs most similar to query. Args: query: Query to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter: Filter on the metadata to apply. **kwargs: Additional arguments are ignored. Returns: The list of Documents most similar to the query. """return[docfor(doc,_,_)inawaitself.asimilarity_search_with_score_id(query=query,k=k,filter=filter,)]
[docs]@overrideasyncdefasimilarity_search_with_score(self,query:str,k:int=4,filter:dict[str,Any]|None=None,)->list[tuple[Document,float]]:"""Return docs most similar to query with score. Args: query: Query to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter: Filter on the metadata to apply. Returns: The list of (Document, score), the most similar to the query vector. """return[(doc,score)for(doc,score,_)inawaitself.asimilarity_search_with_score_id(query=query,k=k,filter=filter,)]
async def asimilarity_search_with_score_id(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the Documents closest to a text query, with score and id (async).

    Args:
        query: Text whose most similar documents are requested.
        k: Number of Documents to return. Defaults to 4.
        filter: Metadata filter to apply to the search.

    Returns:
        A list of (Document, score, id) triples, the most similar
        to the query.
    """
    codec = self.document_codec
    if not codec.server_side_embeddings:
        # The query is embedded here, then searched as a plain vector.
        query_vector = await self._get_safe_embedding().aembed_query(query)
        return await self.asimilarity_search_with_score_id_by_vector(
            embedding=query_vector,
            k=k,
            filter=filter,
        )
    # Server-side embeddings: the query text itself goes in the sort clause.
    vectorize_sort = codec.encode_vectorize_sort(query)
    return await self._asimilarity_search_with_score_id_by_sort(
        sort=vectorize_sort,
        k=k,
        filter=filter,
    )
[docs]@overrideasyncdefasimilarity_search_by_vector(self,embedding:list[float],k:int=4,filter:dict[str,Any]|None=None,**kwargs:Any,)->list[Document]:"""Return docs most similar to embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter: Filter on the metadata to apply. **kwargs: Additional arguments are ignored. Returns: The list of Documents most similar to the query vector. """return[docfor(doc,_,_)inawaitself.asimilarity_search_with_score_id_by_vector(embedding=embedding,k=k,filter=filter,)]
async def asimilarity_search_with_score_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float]]:
    """Find the Documents closest to an embedding vector, with scores (async).

    Args:
        embedding: Vector whose most similar documents are requested.
        k: Number of Documents to return. Defaults to 4.
        filter: Metadata filter to apply to the search.

    Returns:
        A list of (Document, score) pairs, the most similar to the
        query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # Drop the trailing id from each (document, score, id) triple.
    return [(document, score) for document, score, _doc_id in scored_hits]
async def asimilarity_search_with_score_id_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the Documents closest to a vector, with score and id (async).

    Args:
        embedding: Vector whose most similar documents are requested.
        k: Number of Documents to return. Defaults to 4.
        filter: Metadata filter to apply to the search.

    Returns:
        A list of (Document, score, id) triples, the most similar to
        the query vector.

    Raises:
        ValueError: if the store's codec uses server-side embeddings.
    """
    codec = self.document_codec
    # Guard clause: this entry point is refused on server-side-embedding stores.
    if codec.server_side_embeddings:
        msg = (
            "Searching by vector on a Vector Store that uses server-side "
            "embeddings is not allowed."
        )
        raise ValueError(msg)
    vector_sort = codec.encode_vector_sort(embedding)
    return await self._asimilarity_search_with_score_id_by_sort(
        sort=vector_sort,
        k=k,
        filter=filter,
    )
def similarity_search_with_embedding_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, list[float]]]:
    """Return docs most similar to embedding vector with embedding.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, embedding) pairs, the most similar to the
        query vector. Unlike :meth:`similarity_search_with_embedding`,
        the query vector is not part of the return value.
    """
    sort = self.document_codec.encode_vector_sort(vector=embedding)
    # The helper also hands back the query vector: it is discarded here,
    # only the (document, embedding) pairs are returned.
    _, doc_emb_list = self._similarity_search_with_embedding_by_sort(
        sort=sort,
        k=k,
        filter=filter,
    )
    return doc_emb_list
async def asimilarity_search_with_embedding_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, list[float]]]:
    """Return docs most similar to embedding vector with embedding.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, embedding) pairs, the most similar to the
        query vector. Unlike :meth:`asimilarity_search_with_embedding`,
        the query vector is not part of the return value.
    """
    sort = self.document_codec.encode_vector_sort(vector=embedding)
    # The helper also hands back the query vector: it is discarded here,
    # only the (document, embedding) pairs are returned.
    _, doc_emb_list = await self._asimilarity_search_with_embedding_by_sort(
        sort=sort,
        k=k,
        filter=filter,
    )
    return doc_emb_list
def similarity_search_with_embedding(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Find the Documents closest to a text query, with their embeddings.

    The embedding vector of the query is returned as well.

    Args:
        query: Text whose most similar documents are requested.
        k: Number of Documents to return. Defaults to 4.
        filter: Metadata filter to apply to the search.

    Returns:
        A pair (query embedding vector, list of (Document, embedding)
        pairs most similar to the query).
    """
    codec = self.document_codec
    if not codec.server_side_embeddings:
        query_vector = self._get_safe_embedding().embed_query(text=query)
        if k == 0:
            # Nothing to fetch: the locally computed vector is the whole answer.
            return (query_vector, [])
        sort_clause = codec.encode_vector_sort(vector=query_vector)
    else:
        sort_clause = codec.encode_vectorize_sort(query)

    return self._similarity_search_with_embedding_by_sort(
        sort=sort_clause,
        k=k,
        filter=filter,
    )
async def asimilarity_search_with_embedding(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Return docs most similar to the query with embedding.

    Also includes the query embedding vector.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        (The query embedding vector, The list of (Document, embedding),
        the most similar to the query vector.).
    """
    if self.document_codec.server_side_embeddings:
        sort = self.document_codec.encode_vectorize_sort(query)
    else:
        # Use the async embedding call: the synchronous `embed_query` would
        # block the event loop while the embedding is being computed.
        query_embedding = await self._get_safe_embedding().aembed_query(
            text=query
        )
        # shortcut return if query isn't needed.
        if k == 0:
            return (query_embedding, [])
        sort = self.document_codec.encode_vector_sort(vector=query_embedding)

    return await self._asimilarity_search_with_embedding_by_sort(
        sort=sort,
        k=k,
        filter=filter,
    )
async def _asimilarity_search_with_embedding_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Run an ANN search driven by a ready-made sort clause (async).

    Returns:
        (query_embedding, list of (Document, embedding) most similar to
        the query).

    Raises:
        AstraDBVectorStoreError: if the query returns no sort vector.
    """
    query_vector, hits = await self.arun_query(
        n=k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    # Hits are (document, id, embedding, similarity); keep document + embedding.
    pairs = [
        (document, hit_embedding)
        async for document, _doc_id, hit_embedding, _similarity in hits
    ]
    return (query_vector, cast("list[tuple[Document, list[float]]]", pairs))

def _similarity_search_with_embedding_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Run an ANN search driven by a ready-made sort clause.

    Returns:
        (query_embedding, list of (Document, embedding) most similar to
        the query).

    Raises:
        AstraDBVectorStoreError: if the query returns no sort vector.
    """
    query_vector, hits = self.run_query(
        n=k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    # Hits are (document, id, embedding, similarity); keep document + embedding.
    pairs = [
        (document, hit_embedding)
        for document, _doc_id, hit_embedding, _similarity in hits
    ]
    return (query_vector, cast("list[tuple[Document, list[float]]]", pairs))

async def _asimilarity_search_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Run an ANN search driven by a ready-made sort clause (async).

    Returns:
        A list of (Document, similarity, id) triples built from the hits.
    """
    hits = await self.arun_query(
        n=k,
        filter=filter,
        sort=sort,
        include_similarity=True,
    )
    # Hits are (document, id, embedding, similarity); the embedding slot is
    # discarded and the remaining items are reordered for the caller.
    triples = [
        (document, similarity, doc_id)
        async for document, doc_id, _embedding, similarity in hits
    ]
    return cast("list[tuple[Document, float, str]]", triples)

def _run_mmr_query_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    fetch_k: int,
    lambda_mult: float,
    filter: dict[str, Any] | None,  # noqa: A002
) -> list[Document]:
    """Fetch ``fetch_k`` hits for a sort clause and pick ``k`` of them by MMR.

    Raises:
        AstraDBVectorStoreError: if the query returns no sort vector.
    """
    query_vector, hits = self.run_query(
        n=fetch_k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    # The hits are consumed before the sort-vector check, as a list of
    # (document, embedding) pairs.
    candidates = cast(
        "list[tuple[Document, list[float]]]",
        [
            (document, hit_embedding)
            for document, _doc_id, hit_embedding, _similarity in hits
        ],
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    return self._get_mmr_hits(
        embedding=query_vector,
        k=k,
        lambda_mult=lambda_mult,
        prefetch_hit_pairs=candidates,
    )

async def _arun_mmr_query_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    fetch_k: int,
    lambda_mult: float,
    filter: dict[str, Any] | None,  # noqa: A002
) -> list[Document]:
    """Fetch ``fetch_k`` hits for a sort clause and pick ``k`` by MMR (async).

    Raises:
        AstraDBVectorStoreError: if the query returns no sort vector.
    """
    query_vector, hits = await self.arun_query(
        n=fetch_k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    # The hits are consumed before the sort-vector check, as a list of
    # (document, embedding) pairs.
    candidates = cast(
        "list[tuple[Document, list[float]]]",
        [
            (document, hit_embedding)
            async for document, _doc_id, hit_embedding, _similarity in hits
        ],
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    return self._get_mmr_hits(
        embedding=query_vector,
        k=k,
        lambda_mult=lambda_mult,
        prefetch_hit_pairs=candidates,
    )

def _get_mmr_hits(
    self,
    embedding: list[float],
    k: int,
    lambda_mult: float,
    prefetch_hit_pairs: list[tuple[Document, list[float]]],
) -> list[Document]:
    """Select documents among prefetched hits by maximal marginal relevance.

    Args:
        embedding: The query vector.
        k: Number of documents to select.
        lambda_mult: Diversity parameter handed to the MMR routine.
        prefetch_hit_pairs: Candidate (Document, embedding) pairs.

    Returns:
        The selected Documents, listed in the order in which they appear
        in ``prefetch_hit_pairs``.
    """
    candidate_embeddings = [pair[1] for pair in prefetch_hit_pairs]
    chosen_positions = set(
        maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            candidate_embeddings,
            k=k,
            lambda_mult=lambda_mult,
        )
    )
    # Walk the candidates in their original order, keeping the chosen ones.
    return [
        pair[0]
        for position, pair in enumerate(prefetch_hit_pairs)
        if position in chosen_positions
    ]
[docs]@overridedefmax_marginal_relevance_search_by_vector(self,embedding:list[float],k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:dict[str,Any]|None=None,**kwargs:Any,)->list[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. filter: Filter on the metadata to apply. **kwargs: Additional arguments are ignored. Returns: The list of Documents selected by maximal marginal relevance. """returnself._run_mmr_query_by_sort(sort=self.document_codec.encode_vector_sort(embedding),k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,filter=filter,)
[docs]@overrideasyncdefamax_marginal_relevance_search_by_vector(self,embedding:list[float],k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:dict[str,Any]|None=None,**kwargs:Any,)->list[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. filter: Filter on the metadata to apply. **kwargs: Additional arguments are ignored. Returns: The list of Documents selected by maximal marginal relevance. """returnawaitself._arun_mmr_query_by_sort(sort=self.document_codec.encode_vector_sort(embedding),k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,filter=filter,)
[docs]@overridedefmax_marginal_relevance_search(self,query:str,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:dict[str,Any]|None=None,**kwargs:Any,)->list[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: query: Query to look up documents similar to. k: Number of Documents to return. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. filter: Filter on the metadata to apply. **kwargs: Additional arguments are ignored. Returns: The list of Documents selected by maximal marginal relevance. """ifself.document_codec.server_side_embeddings:# this case goes directly to the "_by_sort" method# (and does its own filter normalization, as it cannot# use the path for the with-embedding mmr querying)returnself._run_mmr_query_by_sort(sort=self.document_codec.encode_vectorize_sort(query),k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,filter=filter,)embedding_vector=self._get_safe_embedding().embed_query(query)returnself.max_marginal_relevance_search_by_vector(embedding_vector,k,fetch_k,lambda_mult=lambda_mult,filter=filter,**kwargs,)
[docs]@overrideasyncdefamax_marginal_relevance_search(self,query:str,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:dict[str,Any]|None=None,**kwargs:Any,)->list[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: query: Query to look up documents similar to. k: Number of Documents to return. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. filter: Filter on the metadata to apply. **kwargs: Additional arguments are ignored. Returns: The list of Documents selected by maximal marginal relevance. """ifself.document_codec.server_side_embeddings:# this case goes directly to the "_by_sort" method# (and does its own filter normalization, as it cannot# use the path for the with-embedding mmr querying)returnawaitself._arun_mmr_query_by_sort(sort=self.document_codec.encode_vectorize_sort(query),k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,filter=filter,)embedding_vector=awaitself._get_safe_embedding().aembed_query(query)returnawaitself.amax_marginal_relevance_search_by_vector(embedding_vector,k,fetch_k,lambda_mult=lambda_mult,filter=filter,**kwargs,)
@classmethod
def _from_kwargs(
    cls: type[AstraDBVectorStore],
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Instantiate the store from keyword arguments, ignoring unknown ones.

    Keyword arguments that are not parameters of
    ``AstraDBVectorStore.__init__`` are reported through a ``UserWarning``
    and left out of the constructor call.

    Args:
        **kwargs: Candidate constructor arguments.

    Returns:
        An instance of ``cls`` built with the recognized arguments only.
    """
    init_spec = inspect.getfullargspec(AstraDBVectorStore.__init__)
    accepted_keys = {*init_spec.args, *init_spec.kwonlyargs} - {"self"}
    rejected_keys = set(kwargs) - accepted_keys
    if rejected_keys:
        warnings.warn(
            (
                "Method 'from_texts/afrom_texts' of AstraDBVectorStore "
                "vector store invoked with unsupported arguments "
                f"({', '.join(sorted(rejected_keys))}), "
                "which will be ignored."
            ),
            UserWarning,
            stacklevel=3,
        )
    accepted_kwargs = {
        name: value for name, value in kwargs.items() if name in accepted_keys
    }
    return cls(**accepted_kwargs)
[docs]@classmethod@overridedeffrom_texts(cls:type[AstraDBVectorStore],texts:list[str],embedding:Embeddings|None=None,metadatas:list[dict]|None=None,ids:list[str]|None=None,**kwargs:Any,)->AstraDBVectorStore:"""Create an Astra DB vectorstore from raw texts. Args: texts: the texts to insert. embedding: the embedding function to use in the store. metadatas: metadata dicts for the texts. ids: ids to associate to the texts. **kwargs: you can pass any argument that you would to :meth:`~add_texts` and/or to the ``AstraDBVectorStore`` constructor (see these methods for details). These arguments will be routed to the respective methods as they are. Returns: an ``AstraDBVectorStore`` vectorstore. """_add_texts_inspection=inspect.getfullargspec(AstraDBVectorStore.add_texts)_method_args=(set(_add_texts_inspection.kwonlyargs)|set(_add_texts_inspection.kwonlyargs))-{"self","texts","metadatas","ids"}_init_kwargs={k:vfork,vinkwargs.items()ifknotin_method_args}_method_kwargs={k:vfork,vinkwargs.items()ifkin_method_args}astra_db_store=AstraDBVectorStore._from_kwargs(embedding=embedding,**_init_kwargs,)astra_db_store.add_texts(texts=texts,metadatas=metadatas,ids=ids,**_method_kwargs,)returnastra_db_store
[docs]@override@classmethodasyncdefafrom_texts(cls:type[AstraDBVectorStore],texts:list[str],embedding:Embeddings|None=None,metadatas:list[dict]|None=None,ids:list[str]|None=None,**kwargs:Any,)->AstraDBVectorStore:"""Create an Astra DB vectorstore from raw texts. Args: texts: the texts to insert. embedding: embedding function to use. metadatas: metadata dicts for the texts. ids: ids to associate to the texts. **kwargs: you can pass any argument that you would to :meth:`~aadd_texts` and/or to the ``AstraDBVectorStore`` constructor (see these methods for details). These arguments will be routed to the respective methods as they are. Returns: an ``AstraDBVectorStore`` vectorstore. """_aadd_texts_inspection=inspect.getfullargspec(AstraDBVectorStore.aadd_texts)_method_args=(set(_aadd_texts_inspection.kwonlyargs)|set(_aadd_texts_inspection.kwonlyargs))-{"self","texts","metadatas","ids"}_init_kwargs={k:vfork,vinkwargs.items()ifknotin_method_args}_method_kwargs={k:vfork,vinkwargs.items()ifkin_method_args}astra_db_store=AstraDBVectorStore._from_kwargs(embedding=embedding,**_init_kwargs,)awaitastra_db_store.aadd_texts(texts=texts,metadatas=metadatas,ids=ids,**_method_kwargs,)returnastra_db_store
[docs]@classmethod@overridedeffrom_documents(cls:type[AstraDBVectorStore],documents:list[Document],embedding:Embeddings|None=None,**kwargs:Any,)->AstraDBVectorStore:"""Create an Astra DB vectorstore from a document list. Utility method that defers to :meth:`from_texts` (see that one). Args: texts: the texts to insert. documents: a list of `Document` objects for insertion in the store. embedding: the embedding function to use in the store. **kwargs: you can pass any argument that you would to :meth:`~add_texts` and/or to the ``AstraDBVectorStore`` constructor (see these methods for details). These arguments will be routed to the respective methods as they are. Returns: an ``AstraDBVectorStore`` vectorstore. """texts=[d.page_contentfordindocuments]metadatas=[d.metadatafordindocuments]if"ids"inkwargs:warnings.warn(("Parameter `ids` to AstraDBVectorStore's `from_documents` ""method is deprecated. Please set the supplied documents' ""`.id` attribute instead. The id attribute of Document ""is ignored as long as the `ids` parameter is passed."),DeprecationWarning,stacklevel=2,)ids=kwargs.pop("ids")else:_ids=[doc.idfordocindocuments]ids=_idsifany(the_idisnotNoneforthe_idin_ids)elseNonereturncls.from_texts(texts,embedding=embedding,metadatas=metadatas,ids=ids,**kwargs,)
@classmethod
async def afrom_documents(
    cls: type[AstraDBVectorStore],
    documents: list[Document],
    embedding: Embeddings | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from a document list (async).

    Utility method that defers to :meth:`afrom_texts` (see that one).

    Args:
        documents: the `Document` objects to insert; they stand in for the
            ``texts`` and ``metadatas`` arguments of :meth:`afrom_texts`.
        embedding: the embedding function to use in the store.
        **kwargs: passed on to :meth:`afrom_texts`. A deprecated ``ids``
            entry, if present, takes precedence over the ``id`` attributes
            of the documents.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    texts = [document.page_content for document in documents]
    metadatas = [document.metadata for document in documents]
    if "ids" not in kwargs:
        document_ids = [document.id for document in documents]
        # No ids are passed at all unless at least one document carries one.
        if any(doc_id is not None for doc_id in document_ids):
            ids = document_ids
        else:
            ids = None
    else:
        warnings.warn(
            (
                "Parameter `ids` to AstraDBVectorStore's `from_documents` "
                "method is deprecated. Please set the supplied documents' "
                "`.id` attribute instead. The id attribute of Document "
                "is ignored as long as the `ids` parameter is passed."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        ids = kwargs.pop("ids")
    return await cls.afrom_texts(
        texts,
        embedding=embedding,
        metadatas=metadatas,
        ids=ids,
        **kwargs,
    )