"""Astra DB vector store integration."""from__future__importannotationsimportasyncioimportinspectimportloggingimportuuidimportwarningsfromconcurrent.futuresimportThreadPoolExecutorfromtypingimport(TYPE_CHECKING,Any,Awaitable,Callable,Dict,Iterable,Sequence,TypeVar,)importnumpyasnpfromastrapy.exceptionsimportInsertManyExceptionfromlangchain_core.runnables.utilsimportgather_with_concurrencyfromlangchain_core.vectorstoresimportVectorStorefromtyping_extensionsimportoverridefromlangchain_astradb.utils.astradbimport(DEFAULT_DOCUMENT_CHUNK_SIZE,MAX_CONCURRENT_DOCUMENT_DELETIONS,MAX_CONCURRENT_DOCUMENT_INSERTIONS,MAX_CONCURRENT_DOCUMENT_REPLACEMENTS,SetupMode,_AstraDBCollectionEnvironment,_survey_collection,)fromlangchain_astradb.utils.mmrimportmaximal_marginal_relevancefromlangchain_astradb.utils.vector_store_autodetectimport(_detect_document_codec,)fromlangchain_astradb.utils.vector_store_codecsimport(_AstraDBVectorStoreDocumentCodec,_DefaultVectorizeVSDocumentCodec,_DefaultVSDocumentCodec,)ifTYPE_CHECKING:fromastrapy.authenticationimportEmbeddingHeadersProvider,TokenProviderfromastrapy.dbimport(AstraDBasAstraDBClient,)fromastrapy.dbimport(AsyncAstraDBasAsyncAstraDBClient,)fromastrapy.infoimportCollectionVectorServiceOptionsfromastrapy.resultsimportUpdateResultfromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsT=TypeVar("T")U=TypeVar("U")DocDict=Dict[str,Any]# dicts expressing entries to insert# indexing options when creating a collectionDEFAULT_INDEXING_OPTIONS={"allow":["metadata"]}logger=logging.getLogger(__name__)def_unique_list(lst:list[T],key:Callable[[T],U])->list[T]:visited_keys:set[U]=set()new_lst=[]foriteminlst:item_key=key(item)ifitem_keynotinvisited_keys:visited_keys.add(item_key)new_lst.append(item)returnnew_lstdef_normalize_content_field(content_field:str|None,*,is_autodetect:bool,has_vectorize:bool,)->str:ifhas_vectorize:ifcontent_fieldisnotNone:msg="content_field is not configurable for vectorize 
collections."raiseValueError(msg)return"$vectorize"ifcontent_fieldisNone:return"*"ifis_autodetectelse"content"ifcontent_field=="*":ifnotis_autodetect:msg="content_field='*' illegal if autodetect_collection is False."raiseValueError(msg)returncontent_fieldreturncontent_fielddef_validate_autodetect_init_params(*,metric:str|None=None,setup_mode:SetupMode|None,pre_delete_collection:bool,metadata_indexing_include:Iterable[str]|None,metadata_indexing_exclude:Iterable[str]|None,collection_indexing_policy:dict[str,Any]|None,collection_vector_service_options:CollectionVectorServiceOptions|None,)->None:"""Check that the passed parameters do not violate the autodetect constraints."""forbidden_parameters=[p_nameforp_name,p_valuein(("metric",metric),("metadata_indexing_include",metadata_indexing_include),("metadata_indexing_exclude",metadata_indexing_exclude),("collection_indexing_policy",collection_indexing_policy),("collection_vector_service_options",collection_vector_service_options),)ifp_valueisnotNone]fp_error:str|None=Noneifforbidden_parameters:fp_error=(f"Parameter(s) {', '.join(forbidden_parameters)}. were provided ""but cannot be passed.")pd_error:str|None=Noneifpre_delete_collection:pd_error="Parameter `pre_delete_collection` cannot be True."sm_error:str|None=Noneifsetup_modeisnotNone:sm_error="Parameter `setup_mode` not allowed."am_errors=[err_sforerr_sin(fp_error,pd_error,sm_error)iferr_sisnotNone]ifam_errors:msg=f"Invalid parameters for autodetect mode: {'; '.join(am_errors)}"raiseValueError(msg)
[docs]classAstraDBVectorStore(VectorStore):"""AstraDB vector store integration. Setup: Install the ``langchain-astradb`` package and head to the `AstraDB website <https://astra.datastax.com>`, create an account, create a new database and `create an application token <https://docs.datastax.com/en/astra-db-serverless/administration/manage-application-tokens.html>`. .. code-block:: bash pip install -qU langchain-astradb Key init args — indexing params: collection_name: str Name of the collection. embedding: Embeddings Embedding function to use. Key init args — client params: api_endpoint: str AstraDB API endpoint. token: str API token for Astra DB usage. namespace: Optional[str] Namespace (aka keyspace) where the collection is created Instantiate: Get your API endpoint and application token from the dashboard of your database. .. code-block:: python import getpass from langchain_astradb import AstraDBVectorStore from langchain_openai import OpenAIEmbeddings ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ") ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ") vector_store = AstraDBVectorStore( collection_name="astra_vector_langchain", embedding=OpenAIEmbeddings(), api_endpoint=ASTRA_DB_API_ENDPOINT, token=ASTRA_DB_APPLICATION_TOKEN, ) Have the vector store figure out its configuration (documents scheme on DB) from an existing collection, in the case of `server-side-embeddings <https://docs.datastax.com/en/astra-db-serverless/databases/embedding-generation.html>`: .. code-block:: python import getpass from langchain_astradb import AstraDBVectorStore ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ") ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ") vector_store = AstraDBVectorStore( collection_name="astra_vector_langchain", api_endpoint=ASTRA_DB_API_ENDPOINT, token=ASTRA_DB_APPLICATION_TOKEN, autodetect_collection=True, ) Add Documents: .. 
code-block:: python from langchain_core.documents import Document document_1 = Document(page_content="foo", metadata={"baz": "bar"}) document_2 = Document(page_content="thud", metadata={"bar": "baz"}) document_3 = Document(page_content="i will be deleted :(") documents = [document_1, document_2, document_3] ids = ["1", "2", "3"] vector_store.add_documents(documents=documents, ids=ids) Delete Documents: .. code-block:: python vector_store.delete(ids=["3"]) Search: .. code-block:: python results = vector_store.similarity_search(query="thud",k=1) for doc in results: print(f"* {doc.page_content} [{doc.metadata}]") .. code-block:: none thud [{'bar': 'baz'}] Search with filter: .. code-block:: python results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"}) for doc in results: print(f"* {doc.page_content} [{doc.metadata}]") .. code-block:: none thud [{'bar': 'baz'}] Search with score: .. code-block:: python results = vector_store.similarity_search_with_score(query="qux",k=1) for doc, score in results: print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]") .. code-block:: none [SIM=0.916135] foo [{'baz': 'bar'}] Async: .. code-block:: python # add documents await vector_store.aadd_documents(documents=documents, ids=ids) # delete documents await vector_store.adelete(ids=["3"]) # search results = vector_store.asimilarity_search(query="thud",k=1) # search with score results = await vector_store.asimilarity_search_with_score(query="qux",k=1) for doc,score in results: print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]") .. code-block:: none [SIM=0.916135] foo [{'baz': 'bar'}] Use as Retriever: .. code-block:: python retriever = vector_store.as_retriever( search_type="similarity_score_threshold", search_kwargs={"k": 1, "score_threshold": 0.5}, ) retriever.invoke("thud") .. 
code-block:: none [Document(metadata={'bar': 'baz'}, page_content='thud')] """# noqa: E501def_filter_to_metadata(self,filter_dict:dict[str,Any]|None)->dict[str,Any]:iffilter_dictisNone:return{}returnself.document_codec.encode_filter(filter_dict)@staticmethoddef_normalize_metadata_indexing_policy(metadata_indexing_include:Iterable[str]|None,metadata_indexing_exclude:Iterable[str]|None,collection_indexing_policy:dict[str,Any]|None,)->dict[str,Any]:"""Normalize the constructor indexing parameters. Validate the constructor indexing parameters and normalize them into a ready-to-use dict for the 'options' when creating a collection. """params=[metadata_indexing_include,metadata_indexing_exclude,collection_indexing_policy,]ifparams.count(None)<len(params)-1:msg=("At most one of the parameters `metadata_indexing_include`,"" `metadata_indexing_exclude` and `collection_indexing_policy`"" can be specified as non null.")raiseValueError(msg)ifmetadata_indexing_includeisnotNone:return{"allow":[f"metadata.{md_field}"formd_fieldinmetadata_indexing_include]}ifmetadata_indexing_excludeisnotNone:return{"deny":[f"metadata.{md_field}"formd_fieldinmetadata_indexing_exclude]}return(collection_indexing_policyifcollection_indexing_policyisnotNoneelseDEFAULT_INDEXING_OPTIONS)
[docs]def__init__(self,*,collection_name:str,embedding:Embeddings|None=None,token:str|TokenProvider|None=None,api_endpoint:str|None=None,environment:str|None=None,astra_db_client:AstraDBClient|None=None,async_astra_db_client:AsyncAstraDBClient|None=None,namespace:str|None=None,metric:str|None=None,batch_size:int|None=None,bulk_insert_batch_concurrency:int|None=None,bulk_insert_overwrite_concurrency:int|None=None,bulk_delete_concurrency:int|None=None,setup_mode:SetupMode|None=None,pre_delete_collection:bool=False,metadata_indexing_include:Iterable[str]|None=None,metadata_indexing_exclude:Iterable[str]|None=None,collection_indexing_policy:dict[str,Any]|None=None,collection_vector_service_options:CollectionVectorServiceOptions|None=None,collection_embedding_api_key:str|EmbeddingHeadersProvider|None=None,content_field:str|None=None,ignore_invalid_documents:bool=False,autodetect_collection:bool=False,)->None:"""Wrapper around DataStax Astra DB for vector-store workloads. For quickstart and details, visit https://docs.datastax.com/en/astra-db-serverless/index.html Args: embedding: the embeddings function or service to use. This enables client-side embedding functions or calls to external embedding providers. If ``embedding`` is provided, arguments ``collection_vector_service_options`` and ``collection_embedding_api_key`` cannot be provided. collection_name: name of the Astra DB collection to create/use. token: API token for Astra DB usage, either in the form of a string or a subclass of ``astrapy.authentication.TokenProvider``. If not provided, the environment variable ASTRA_DB_APPLICATION_TOKEN is inspected. api_endpoint: full URL to the API endpoint, such as ``https://<DB-ID>-us-east1.apps.astra.datastax.com``. If not provided, the environment variable ASTRA_DB_API_ENDPOINT is inspected. environment: a string specifying the environment of the target Data API. If omitted, defaults to "prod" (Astra DB production). 
Other values are in ``astrapy.constants.Environment`` enum class. astra_db_client: *DEPRECATED starting from version 0.3.5.* *Please use 'token', 'api_endpoint' and optionally 'environment'.* you can pass an already-created 'astrapy.db.AstraDB' instance (alternatively to 'token', 'api_endpoint' and 'environment'). async_astra_db_client: *DEPRECATED starting from version 0.3.5.* *Please use 'token', 'api_endpoint' and optionally 'environment'.* you can pass an already-created 'astrapy.db.AsyncAstraDB' instance (alternatively to 'token', 'api_endpoint' and 'environment'). namespace: namespace (aka keyspace) where the collection is created. If not provided, the environment variable ASTRA_DB_KEYSPACE is inspected. Defaults to the database's "default namespace". metric: similarity function to use out of those available in Astra DB. If left out, it will use Astra DB API's defaults (i.e. "cosine" - but, for performance reasons, "dot_product" is suggested if embeddings are normalized to one). batch_size: Size of document chunks for each individual insertion API request. If not provided, astrapy defaults are applied. bulk_insert_batch_concurrency: Number of threads or coroutines to insert batches concurrently. bulk_insert_overwrite_concurrency: Number of threads or coroutines in a batch to insert pre-existing entries. bulk_delete_concurrency: Number of threads or coroutines for multiple-entry deletes. setup_mode: mode used to create the collection (SYNC, ASYNC or OFF). pre_delete_collection: whether to delete the collection before creating it. If False and the collection already exists, the collection will be used as is. metadata_indexing_include: an allowlist of the specific metadata subfields that should be indexed for later filtering in searches. metadata_indexing_exclude: a denylist of the specific metadata subfields that should not be indexed for later filtering in searches. 
collection_indexing_policy: a full "indexing" specification for what fields should be indexed for later filtering in searches. This dict must conform to to the API specifications (see https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option) collection_vector_service_options: specifies the use of server-side embeddings within Astra DB. If passing this parameter, ``embedding`` cannot be provided. collection_embedding_api_key: for usage of server-side embeddings within Astra DB. With this parameter one can supply an API Key that will be passed to Astra DB with each data request. This parameter can be either a string or a subclass of ``astrapy.authentication.EmbeddingHeadersProvider``. This is useful when the service is configured for the collection, but no corresponding secret is stored within Astra's key management system. This parameter cannot be provided without specifying ``collection_vector_service_options``. content_field: name of the field containing the textual content in the documents when saved on Astra DB. For vectorize collections, this cannot be specified; for non-vectorize collection, defaults to "content". The special value "*" can be passed only if autodetect_collection=True. In this case, the actual name of the key for the textual content is guessed by inspection of a few documents from the collection, under the assumption that the longer strings are the most likely candidates. Please understand the limitations of this method and get some understanding of your data before passing ``"*"`` for this parameter. ignore_invalid_documents: if False (default), exceptions are raised when a document is found on the Astra DB collectin that does not have the expected shape. If set to True, such results from the database are ignored and a warning is issued. Note that in this case a similarity search may end up returning fewer results than the required ``k``. autodetect_collection: if True, turns on autodetect behavior. 
The store will look for an existing collection of the provided name and infer the store settings from it. Default is False. In autodetect mode, ``content_field`` can be given as ``"*"``, meaning that an attempt will be made to determine it by inspection (unless vectorize is enabled, in which case ``content_field`` is ignored). In autodetect mode, the store not only determines whether embeddings are client- or server-side, but - most importantly - switches automatically between "nested" and "flat" representations of documents on DB (i.e. having the metadata key-value pairs grouped in a ``metadata`` field or spread at the documents' top-level). The former scheme is the native mode of the AstraDBVectorStore; the store resorts to the latter in case of vector collections populated with external means (such as a third-party data import tool) before applying an AstraDBVectorStore to them. Note that the following parameters cannot be used if this is True: ``metric``, ``setup_mode``, ``metadata_indexing_include``, ``metadata_indexing_exclude``, ``collection_indexing_policy``, ``collection_vector_service_options``. Note: For concurrency in synchronous :meth:`~add_texts`:, as a rule of thumb, on a typical client machine it is suggested to keep the quantity bulk_insert_batch_concurrency * bulk_insert_overwrite_concurrency much below 1000 to avoid exhausting the client multithreading/networking resources. The hardcoded defaults are somewhat conservative to meet most machines' specs, but a sensible choice to test may be: - bulk_insert_batch_concurrency = 80 - bulk_insert_overwrite_concurrency = 10 A bit of experimentation is required to nail the best results here, depending on both the machine/network specs and the expected workload (specifically, how often a write is an update of an existing id). Remember you can pass concurrency settings to individual calls to :meth:`~add_texts` and :meth:`~add_documents` as well. 
"""# general collection settingsself.collection_name=collection_nameself.token=tokenself.api_endpoint=api_endpointself.environment=environmentself.namespace=namespaceself.indexing_policy:dict[str,Any]self.autodetect_collection=autodetect_collection# vector-related settingsself.embedding_dimension:int|None=Noneself.embedding=embeddingself.metric=metricself.collection_embedding_api_key=collection_embedding_api_keyself.collection_vector_service_options=collection_vector_service_options# DB-encoding settings:self.document_codec:_AstraDBVectorStoreDocumentCodec# concurrency settingsself.batch_size:int|None=batch_sizeorDEFAULT_DOCUMENT_CHUNK_SIZEself.bulk_insert_batch_concurrency:int=(bulk_insert_batch_concurrencyorMAX_CONCURRENT_DOCUMENT_INSERTIONS)self.bulk_insert_overwrite_concurrency:int=(bulk_insert_overwrite_concurrencyorMAX_CONCURRENT_DOCUMENT_REPLACEMENTS)self.bulk_delete_concurrency:int=(bulk_delete_concurrencyorMAX_CONCURRENT_DOCUMENT_DELETIONS)_setup_mode:SetupMode_embedding_dimension:int|Awaitable[int]|Noneifnotself.autodetect_collection:logger.info("vector store default init, collection '%s'",self.collection_name)_setup_mode=SetupMode.SYNCifsetup_modeisNoneelsesetup_mode_embedding_dimension=self._prepare_embedding_dimension(_setup_mode)# determine vectorize/nonvectorizehas_vectorize=self.collection_vector_service_optionsisnotNone_content_field=_normalize_content_field(content_field,is_autodetect=False,has_vectorize=has_vectorize,)ifself.collection_vector_service_optionsisnotNone:self.document_codec=_DefaultVectorizeVSDocumentCodec(ignore_invalid_documents=ignore_invalid_documents,)else:self.document_codec=_DefaultVSDocumentCodec(content_field=_content_field,ignore_invalid_documents=ignore_invalid_documents,)# indexing policy 
settingself.indexing_policy=self._normalize_metadata_indexing_policy(metadata_indexing_include=metadata_indexing_include,metadata_indexing_exclude=metadata_indexing_exclude,collection_indexing_policy=collection_indexing_policy,)else:logger.info("vector store autodetect init, collection '%s'",self.collection_name)# specific checks for autodetect logic_validate_autodetect_init_params(metric=self.metric,setup_mode=setup_mode,pre_delete_collection=pre_delete_collection,metadata_indexing_include=metadata_indexing_include,metadata_indexing_exclude=metadata_indexing_exclude,collection_indexing_policy=collection_indexing_policy,collection_vector_service_options=self.collection_vector_service_options,)_setup_mode=SetupMode.OFF# fetch collection intelligencec_descriptor,c_documents=_survey_collection(collection_name=self.collection_name,token=self.token,api_endpoint=self.api_endpoint,environment=self.environment,astra_db_client=astra_db_client,async_astra_db_client=async_astra_db_client,namespace=self.namespace,)ifc_descriptorisNone:msg=f"Collection '{self.collection_name}' not found."raiseValueError(msg)# use the collection info to set the store propertiesself.indexing_policy=self._normalize_metadata_indexing_policy(metadata_indexing_include=None,metadata_indexing_exclude=None,collection_indexing_policy=c_descriptor.options.indexing,)ifc_descriptor.options.vectorisNone:msg="Non-vector collection detected."raiseValueError(msg)_embedding_dimension=c_descriptor.options.vector.dimensionself.collection_vector_service_options=c_descriptor.options.vector.servicehas_vectorize=self.collection_vector_service_optionsisnotNonelogger.info("vector store autodetect: has_vectorize = %s",has_vectorize)norm_content_field=_normalize_content_field(content_field,is_autodetect=True,has_vectorize=has_vectorize,)self.document_codec=_detect_document_codec(c_documents,has_vectorize=has_vectorize,ignore_invalid_documents=ignore_invalid_documents,norm_content_field=norm_content_field,)# validate 
embedding/vectorize compatibility and such.# Embedding and the server-side embeddings are mutually exclusive,# as both specify how to produce embeddings.# Also API key makes no sense unless vectorize.ifself.embeddingisNoneandnotself.document_codec.server_side_embeddings:msg="Embedding is required for non-vectorize collections."raiseValueError(msg)ifself.embeddingisnotNoneandself.document_codec.server_side_embeddings:msg="Embedding cannot be provided for vectorize collections."raiseValueError(msg)if(notself.document_codec.server_side_embeddingsandself.collection_embedding_api_keyisnotNone):msg="Embedding API Key cannot be provided for non-vectorize collections."raiseValueError(msg)self.astra_env=_AstraDBCollectionEnvironment(collection_name=collection_name,token=self.token,api_endpoint=self.api_endpoint,environment=self.environment,astra_db_client=astra_db_client,async_astra_db_client=async_astra_db_client,namespace=self.namespace,setup_mode=_setup_mode,pre_delete_collection=pre_delete_collection,embedding_dimension=_embedding_dimension,metric=self.metric,requested_indexing_policy=self.indexing_policy,default_indexing_policy=DEFAULT_INDEXING_OPTIONS,collection_vector_service_options=self.collection_vector_service_options,collection_embedding_api_key=self.collection_embedding_api_key,)
def_get_safe_embedding(self)->Embeddings:ifnotself.embedding:msg="Missing embedding"raiseValueError(msg)returnself.embeddingdef_prepare_embedding_dimension(self,setup_mode:SetupMode)->int|Awaitable[int]|None:"""Return the right kind of object for the astra_env to use."""ifself.embeddingisNone:returnNoneifsetup_mode==SetupMode.ASYNC:# in this case, we wrap the computation as an awaitableasyncdef_aget_embedding_dimension()->int:ifself.embedding_dimensionisNone:self.embedding_dimension=len(awaitself._get_safe_embedding().aembed_query(text="This is a sample sentence."))returnself.embedding_dimensionreturn_aget_embedding_dimension()# case of setup_mode = SetupMode.SYNC, SetupMode.OFFifself.embedding_dimensionisNone:self.embedding_dimension=len(self._get_safe_embedding().embed_query(text="This is a sample sentence."))returnself.embedding_dimension@property@overridedefembeddings(self)->Embeddings|None:"""Accesses the supplied embeddings object. If using server-side embeddings, this will return None. """returnself.embeddingdef_select_relevance_score_fn(self)->Callable[[float],float]:# The underlying API calls already returns a "score proper",# i.e. one in [0, 1] where higher means more *similar*,# so here the final score transformation is not reversing the interval.returnlambdascore:score
[docs]defclear(self)->None:"""Empty the collection of all its stored entries."""self.astra_env.ensure_db_setup()self.astra_env.collection.delete_many({})
[docs]asyncdefaclear(self)->None:"""Empty the collection of all its stored entries."""awaitself.astra_env.aensure_db_setup()awaitself.astra_env.async_collection.delete_many({})
[docs]defdelete_by_document_id(self,document_id:str)->bool:"""Remove a single document from the store, given its document ID. Args: document_id: The document ID Returns: True if a document has indeed been deleted, False if ID not found. """self.astra_env.ensure_db_setup()# self.collection is not None (by _ensure_astra_db_client)deletion_response=self.astra_env.collection.delete_one({"_id":document_id})returndeletion_response.deleted_count==1
[docs]asyncdefadelete_by_document_id(self,document_id:str)->bool:"""Remove a single document from the store, given its document ID. Args: document_id: The document ID Returns: True if a document has indeed been deleted, False if ID not found. """awaitself.astra_env.aensure_db_setup()deletion_response=awaitself.astra_env.async_collection.delete_one({"_id":document_id},)returndeletion_response.deleted_count==1
[docs]@overridedefdelete(self,ids:list[str]|None=None,concurrency:int|None=None,**kwargs:Any,)->bool|None:"""Delete by vector ids. Args: ids: List of ids to delete. concurrency: max number of threads issuing single-doc delete requests. Defaults to vector-store overall setting. **kwargs: Additional arguments are ignored. Returns: True if deletion is (entirely) successful, False otherwise. """ifkwargs:warnings.warn("Method 'delete' of AstraDBVectorStore vector store invoked with "f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), ""which will be ignored.",stacklevel=2,)ifidsisNone:msg="No ids provided to delete."raiseValueError(msg)_max_workers=concurrencyorself.bulk_delete_concurrencywithThreadPoolExecutor(max_workers=_max_workers)astpe:_=list(tpe.map(self.delete_by_document_id,ids,))returnTrue
[docs]@overrideasyncdefadelete(self,ids:list[str]|None=None,concurrency:int|None=None,**kwargs:Any,)->bool|None:"""Delete by vector ids. Args: ids: List of ids to delete. concurrency: max number of simultaneous coroutines for single-doc delete requests. Defaults to vector-store overall setting. **kwargs: Additional arguments are ignored. Returns: True if deletion is (entirely) successful, False otherwise. """ifkwargs:warnings.warn("Method 'adelete' of AstraDBVectorStore invoked with "f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), ""which will be ignored.",stacklevel=2,)ifidsisNone:msg="No ids provided to delete."raiseValueError(msg)_max_workers=concurrencyorself.bulk_delete_concurrencyreturnall(awaitgather_with_concurrency(_max_workers,*[self.adelete_by_document_id(doc_id)fordoc_idinids]))
[docs]defdelete_collection(self)->None:"""Completely delete the collection from the database. Completely delete the collection from the database (as opposed to :meth:`~clear`, which empties it only). Stored data is lost and unrecoverable, resources are freed. Use with caution. """self.astra_env.ensure_db_setup()self.astra_env.collection.drop()
[docs]asyncdefadelete_collection(self)->None:"""Completely delete the collection from the database. Completely delete the collection from the database (as opposed to :meth:`~aclear`, which empties it only). Stored data is lost and unrecoverable, resources are freed. Use with caution. """awaitself.astra_env.aensure_db_setup()awaitself.astra_env.async_collection.drop()
def_get_documents_to_insert(self,texts:Iterable[str],embedding_vectors:Sequence[list[float]|None],metadatas:list[dict]|None=None,ids:list[str]|None=None,)->list[DocDict]:ifidsisNone:ids=[uuid.uuid4().hexfor_intexts]ifmetadatasisNone:metadatas=[{}for_intexts]documents_to_insert=[self.document_codec.encode(content=b_txt,document_id=b_id,vector=b_emb,metadata=b_md,)forb_txt,b_emb,b_id,b_mdinzip(texts,embedding_vectors,ids,metadatas,)]# make unique by id, keeping the lastreturn_unique_list(documents_to_insert[::-1],lambdadocument:document["_id"],)[::-1]@staticmethoddef_get_missing_from_batch(document_batch:list[DocDict],insert_result:dict[str,Any])->tuple[list[str],list[DocDict]]:if"status"notininsert_result:msg=f"API Exception while running bulk insertion: {insert_result}"raiseValueError(msg)batch_inserted=insert_result["status"]["insertedIds"]# estimation of the preexisting documents that failedmissed_inserted_ids={document["_id"]fordocumentindocument_batch}-set(batch_inserted)errors=insert_result.get("errors",[])# careful for other sources of error other than "doc already exists"num_errors=len(errors)unexpected_errors=any(error.get("errorCode")!="DOCUMENT_ALREADY_EXISTS"forerrorinerrors)ifnum_errors!=len(missed_inserted_ids)orunexpected_errors:msg=f"API Exception while running bulk insertion: {errors}"raiseValueError(msg)# deal with the missing insertions as upsertsmissing_from_batch=[documentfordocumentindocument_batchifdocument["_id"]inmissed_inserted_ids]returnbatch_inserted,missing_from_batch
[docs]@overridedefadd_texts(self,texts:Iterable[str],metadatas:list[dict]|None=None,ids:list[str]|None=None,*,batch_size:int|None=None,batch_concurrency:int|None=None,overwrite_concurrency:int|None=None,**kwargs:Any,)->list[str]:"""Run texts through the embeddings and add them to the vectorstore. If passing explicit ids, those entries whose id is in the store already will be replaced. Args: texts: Texts to add to the vectorstore. metadatas: Optional list of metadatas. ids: Optional list of ids. batch_size: Size of document chunks for each individual insertion API request. If not provided, defaults to the vector-store overall defaults (which in turn falls to astrapy defaults). batch_concurrency: number of threads to process insertion batches concurrently. Defaults to the vector-store overall setting if not provided. overwrite_concurrency: number of threads to process pre-existing documents in each batch. Defaults to the vector-store overall setting if not provided. **kwargs: Additional arguments are ignored. Note: There are constraints on the allowed field names in the metadata dictionaries, coming from the underlying Astra DB API. For instance, the ``$`` (dollar sign) cannot be used in the dict keys. See this document for details: https://docs.datastax.com/en/astra-db-serverless/api-reference/overview.html#limits Returns: The list of ids of the added texts. 
"""ifkwargs:warnings.warn("Method 'add_texts' of AstraDBVectorStore vector store invoked with "f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), ""which will be ignored.",stacklevel=2,)self.astra_env.ensure_db_setup()embedding_vectors:Sequence[list[float]|None]ifself.document_codec.server_side_embeddings:embedding_vectors=[Nonefor_inlist(texts)]else:embedding_vectors=self._get_safe_embedding().embed_documents(list(texts))documents_to_insert=self._get_documents_to_insert(texts,embedding_vectors,metadatas,ids)# perform an AstraPy insert_many, catching exceptions for overwriting docsids_to_replace:list[int]inserted_ids:list[str]=[]try:insert_many_result=self.astra_env.collection.insert_many(documents_to_insert,ordered=False,concurrency=batch_concurrencyorself.bulk_insert_batch_concurrency,chunk_size=batch_sizeorself.batch_size,)ids_to_replace=[]inserted_ids=insert_many_result.inserted_idsexceptInsertManyExceptionaserr:inserted_ids=err.partial_result.inserted_idsinserted_ids_set=set(inserted_ids)ids_to_replace=[document["_id"]fordocumentindocuments_to_insertifdocument["_id"]notininserted_ids_set]# if necessary, replace docs for the non-inserted idsifids_to_replace:documents_to_replace=[documentfordocumentindocuments_to_insertifdocument["_id"]inids_to_replace]_max_workers=(overwrite_concurrencyorself.bulk_insert_overwrite_concurrency)withThreadPoolExecutor(max_workers=_max_workers,)asexecutor:def_replace_document(document:dict[str,Any],)->tuple[UpdateResult,str]:returnself.astra_env.collection.replace_one({"_id":document["_id"]},document,),document["_id"]replace_results=list(executor.map(_replace_document,documents_to_replace,))replaced_count=sum(r_res.update_info["n"]forr_res,_inreplace_results)inserted_ids+=[replaced_idfor_,replaced_idinreplace_results]ifreplaced_count!=len(ids_to_replace):missing=len(ids_to_replace)-replaced_countmsg=("AstraDBVectorStore.add_texts could not insert all requested "f"documents ({missing} failed replace_one 
calls)")raiseValueError(msg)returninserted_ids
@override
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    *,
    batch_size: int | None = None,
    batch_concurrency: int | None = None,
    overwrite_concurrency: int | None = None,
    **kwargs: Any,
) -> list[str]:
    """Run texts through the embeddings and add them to the vectorstore.

    If passing explicit ids, those entries whose id is in the store already
    will be replaced.

    Args:
        texts: Texts to add to the vectorstore. Any iterable is accepted,
            including one-shot iterators: it is materialized exactly once.
        metadatas: Optional list of metadatas.
        ids: Optional list of ids.
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided, defaults to the vector-store
            overall defaults (which in turn falls to astrapy defaults).
        batch_concurrency: number of simultaneous coroutines to process
            insertion batches concurrently. Defaults to the vector-store
            overall setting if not provided.
        overwrite_concurrency: number of simultaneous coroutines to process
            pre-existing documents in each batch. Defaults to the
            vector-store overall setting if not provided.
        **kwargs: Additional arguments are ignored.

    Note:
        There are constraints on the allowed field names
        in the metadata dictionaries, coming from the underlying Astra DB API.
        For instance, the ``$`` (dollar sign) cannot be used in the dict keys.
        See this document for details:
        https://docs.datastax.com/en/astra-db-serverless/api-reference/overview.html#limits

    Returns:
        The list of ids of the added texts.

    Raises:
        ValueError: if some of the documents that could not be inserted
            could not be replaced either.
    """
    if kwargs:
        warnings.warn(
            "Method 'aadd_texts' of AstraDBVectorStore invoked with "
            f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), "
            "which will be ignored.",
            stacklevel=2,
        )
    await self.astra_env.aensure_db_setup()

    # `texts` is only guaranteed to be an Iterable: materialize it once so
    # that a generator is not exhausted by the embedding step before the
    # documents are assembled.
    text_list = list(texts)

    embedding_vectors: Sequence[list[float] | None]
    if self.document_codec.server_side_embeddings:
        # the server computes the vectors: no client-side embeddings
        embedding_vectors = [None] * len(text_list)
    else:
        embedding_vectors = await self._get_safe_embedding().aembed_documents(
            text_list
        )
    documents_to_insert = self._get_documents_to_insert(
        text_list, embedding_vectors, metadatas, ids
    )

    # perform an AstraPy insert_many, catching exceptions for overwriting docs
    documents_to_replace: list[dict[str, Any]] = []
    inserted_ids: list[str] = []
    try:
        insert_many_result = await self.astra_env.async_collection.insert_many(
            documents_to_insert,
            ordered=False,
            concurrency=batch_concurrency or self.bulk_insert_batch_concurrency,
            chunk_size=batch_size or self.batch_size,
        )
        inserted_ids = insert_many_result.inserted_ids
    except InsertManyException as err:
        inserted_ids = err.partial_result.inserted_ids
        # whatever was not inserted is a candidate for replacement
        inserted_ids_set = set(inserted_ids)
        documents_to_replace = [
            document
            for document in documents_to_insert
            if document["_id"] not in inserted_ids_set
        ]

    # if necessary, replace docs for the non-inserted ids
    if documents_to_replace:
        sem = asyncio.Semaphore(
            overwrite_concurrency or self.bulk_insert_overwrite_concurrency,
        )
        _async_collection = self.astra_env.async_collection

        async def _replace_document(
            document: dict[str, Any],
        ) -> tuple[UpdateResult, str]:
            # the semaphore caps the number of in-flight replace_one calls
            async with sem:
                return await _async_collection.replace_one(
                    {"_id": document["_id"]},
                    document,
                ), document["_id"]

        tasks = [
            asyncio.create_task(_replace_document(document))
            for document in documents_to_replace
        ]
        replace_results = await asyncio.gather(*tasks, return_exceptions=False)

        replaced_count = sum(r_res.update_info["n"] for r_res, _ in replace_results)
        inserted_ids += [replaced_id for _, replaced_id in replace_results]

        if replaced_count != len(documents_to_replace):
            missing = len(documents_to_replace) - replaced_count
            msg = (
                "AstraDBVectorStore.aadd_texts could not insert all requested "
                f"documents ({missing} failed replace_one calls)"
            )
            raise ValueError(msg)

    return inserted_ids
@override
def similarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Find the documents closest to a textual query.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query.
    """
    scored_hits = self.similarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
    )
    # keep the document only, dropping score and id
    return [hit_doc for hit_doc, _score, _doc_id in scored_hits]
@override
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
) -> list[tuple[Document, float]]:
    """Find the documents closest to a textual query, with their scores.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
    )
    # drop the trailing id from each (document, score, id) triple
    return [(hit_doc, hit_score) for hit_doc, hit_score, _doc_id in scored_hits]
def similarity_search_with_score_id(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the documents closest to a textual query, with score and id.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query.
    """
    if not self.document_codec.server_side_embeddings:
        # client-side embeddings: compute the query vector and search by it
        query_vector = self._get_safe_embedding().embed_query(query)
        return self.similarity_search_with_score_id_by_vector(
            embedding=query_vector,
            k=k,
            filter=filter,
        )

    # server-side embeddings: the query text itself is the sort criterion
    return self._similarity_search_with_score_id_by_sort(
        sort={"$vectorize": query},
        k=k,
        filter=filter,
    )
@override
def similarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Find the documents closest to an embedding vector.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # keep the document only, dropping score and id
    return [hit_doc for hit_doc, _score, _doc_id in scored_hits]
def similarity_search_with_score_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float]]:
    """Find the documents closest to an embedding vector, with their scores.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # drop the trailing id from each (document, score, id) triple
    return [(hit_doc, hit_score) for hit_doc, hit_score, _doc_id in scored_hits]
def similarity_search_with_score_id_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the documents closest to an embedding vector, with score and id.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query vector.

    Raises:
        ValueError: if the store uses server-side embeddings.
    """
    if not self.document_codec.server_side_embeddings:
        return self._similarity_search_with_score_id_by_sort(
            sort={"$vector": embedding},
            k=k,
            filter=filter,
        )

    # an explicit vector is rejected when the store embeds server-side
    msg = (
        "Searching by vector on a Vector Store that uses server-side "
        "embeddings is not allowed."
    )
    raise ValueError(msg)
def _similarity_search_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Run ANN search with a provided sort clause.

    Args:
        sort: the sort clause passed to the collection ``find`` call.
        k: maximum number of hits requested from the collection.
        filter: metadata filter, normalized through ``_filter_to_metadata``.

    Returns:
        A list of (Document, similarity, id) for the hits that the codec
        could decode; hits decoding to ``None`` are left out.
    """
    self.astra_env.ensure_db_setup()
    find_filter = self._filter_to_metadata(filter)
    cursor = self.astra_env.collection.find(
        filter=find_filter,
        projection=self.document_codec.base_projection,
        limit=k,
        include_similarity=True,
        sort=sort,
    )

    results: list[tuple[Document, float, str]] = []
    for hit in cursor:
        # the whole triple is built before the None check
        candidate = (
            self.document_codec.decode(hit),
            hit["$similarity"],
            hit["_id"],
        )
        if candidate[0] is not None:
            results.append(candidate)
    return results
@override
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Find the documents closest to a textual query (async).

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query.
    """
    scored_hits = await self.asimilarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
    )
    # keep the document only, dropping score and id
    return [hit_doc for hit_doc, _score, _doc_id in scored_hits]
@override
async def asimilarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
) -> list[tuple[Document, float]]:
    """Find the documents closest to a textual query, with scores (async).

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
    )
    # drop the trailing id from each (document, score, id) triple
    return [(hit_doc, hit_score) for hit_doc, hit_score, _doc_id in scored_hits]
async def asimilarity_search_with_score_id(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the documents closest to a textual query, with score and id (async).

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query.
    """
    if not self.document_codec.server_side_embeddings:
        # client-side embeddings: compute the query vector and search by it
        query_vector = await self._get_safe_embedding().aembed_query(query)
        return await self.asimilarity_search_with_score_id_by_vector(
            embedding=query_vector,
            k=k,
            filter=filter,
        )

    # server-side embeddings: the query text itself is the sort criterion
    return await self._asimilarity_search_with_score_id_by_sort(
        sort={"$vectorize": query},
        k=k,
        filter=filter,
    )
@override
async def asimilarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Find the documents closest to an embedding vector (async).

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # keep the document only, dropping score and id
    return [hit_doc for hit_doc, _score, _doc_id in scored_hits]
async def asimilarity_search_with_score_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float]]:
    """Find the documents closest to an embedding vector, with scores (async).

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # drop the trailing id from each (document, score, id) triple
    return [(hit_doc, hit_score) for hit_doc, hit_score, _doc_id in scored_hits]
async def asimilarity_search_with_score_id_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Find the documents closest to an embedding vector, with score and id (async).

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query vector.

    Raises:
        ValueError: if the store uses server-side embeddings.
    """
    if not self.document_codec.server_side_embeddings:
        return await self._asimilarity_search_with_score_id_by_sort(
            sort={"$vector": embedding},
            k=k,
            filter=filter,
        )

    # an explicit vector is rejected when the store embeds server-side
    msg = (
        "Searching by vector on a Vector Store that uses server-side "
        "embeddings is not allowed."
    )
    raise ValueError(msg)
async def _asimilarity_search_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Run ANN search with a provided sort clause (async).

    Args:
        sort: the sort clause passed to the collection ``find`` call.
        k: maximum number of hits requested from the collection.
        filter: metadata filter, normalized through ``_filter_to_metadata``.

    Returns:
        A list of (Document, similarity, id) for the hits that the codec
        could decode; hits decoding to ``None`` are left out.
    """
    await self.astra_env.aensure_db_setup()
    find_filter = self._filter_to_metadata(filter)
    cursor = self.astra_env.async_collection.find(
        filter=find_filter,
        projection=self.document_codec.base_projection,
        limit=k,
        include_similarity=True,
        sort=sort,
    )

    results: list[tuple[Document, float, str]] = []
    async for hit in cursor:
        # the whole triple is built before the None check
        candidate = (
            self.document_codec.decode(hit),
            hit["$similarity"],
            hit["_id"],
        )
        if candidate[0] is not None:
            results.append(candidate)
    return results


def _run_mmr_query_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    fetch_k: int,
    lambda_mult: float,
    metadata_parameter: dict[str, Any],
) -> list[Document]:
    """Prefetch ``fetch_k`` hits for a sort clause and pick ``k`` of them by MMR.

    The query vector used for the MMR step is the sort vector reported
    back by the cursor, after the cursor has been read in full.
    """
    cursor = self.astra_env.collection.find(
        filter=metadata_parameter,
        projection=self.document_codec.full_projection,
        limit=fetch_k,
        include_similarity=True,
        include_sort_vector=True,
        sort=sort,
    )
    candidate_hits = [hit for hit in cursor]  # noqa: C416
    sort_vector = cursor.get_sort_vector()
    return self._get_mmr_hits(
        embedding=sort_vector,  # type: ignore[arg-type]
        k=k,
        lambda_mult=lambda_mult,
        prefetch_hits=candidate_hits,
    )


async def _arun_mmr_query_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    fetch_k: int,
    lambda_mult: float,
    metadata_parameter: dict[str, Any],
) -> list[Document]:
    """Prefetch ``fetch_k`` hits for a sort clause and pick ``k`` of them by MMR.

    Async counterpart of ``_run_mmr_query_by_sort``: the cursor is read in
    full, then its sort vector is awaited and used as the MMR query vector.
    """
    cursor = self.astra_env.async_collection.find(
        filter=metadata_parameter,
        projection=self.document_codec.full_projection,
        limit=fetch_k,
        include_similarity=True,
        include_sort_vector=True,
        sort=sort,
    )
    candidate_hits = []
    async for hit in cursor:
        candidate_hits.append(hit)
    sort_vector = await cursor.get_sort_vector()
    return self._get_mmr_hits(
        embedding=sort_vector,  # type: ignore[arg-type]
        k=k,
        lambda_mult=lambda_mult,
        prefetch_hits=candidate_hits,
    )


def _get_mmr_hits(
    self,
    embedding: list[float],
    k: int,
    lambda_mult: float,
    prefetch_hits: list[DocDict],
) -> list[Document]:
    """Select hits by maximal marginal relevance and decode them.

    Args:
        embedding: the query vector.
        k: number of hits asked of the MMR selection.
        lambda_mult: MMR trade-off parameter, forwarded as is.
        prefetch_hits: candidate hits, each carrying a ``"$vector"`` entry.

    Returns:
        The decoded Documents for the selected hits, listed in the order
        they have in ``prefetch_hits``; hits decoding to ``None`` are skipped.
    """
    candidate_vectors = [candidate["$vector"] for candidate in prefetch_hits]
    chosen_positions = maximal_marginal_relevance(
        np.array(embedding, dtype=np.float32),
        candidate_vectors,
        k=k,
        lambda_mult=lambda_mult,
    )

    selected_docs: list[Document] = []
    for position, candidate in enumerate(prefetch_hits):
        if position not in chosen_positions:
            continue
        decoded = self.document_codec.decode(candidate)
        if decoded is not None:
            selected_docs.append(decoded)
    return selected_docs
@override
def max_marginal_relevance_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Select documents by maximal marginal relevance to an embedding vector.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    self.astra_env.ensure_db_setup()
    vector_sort = {"$vector": embedding}
    normalized_filter = self._filter_to_metadata(filter)
    return self._run_mmr_query_by_sort(
        sort=vector_sort,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        metadata_parameter=normalized_filter,
    )
@override
async def amax_marginal_relevance_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Select documents by maximal marginal relevance to an embedding vector (async).

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    await self.astra_env.aensure_db_setup()
    vector_sort = {"$vector": embedding}
    normalized_filter = self._filter_to_metadata(filter)
    return await self._arun_mmr_query_by_sort(
        sort=vector_sort,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        metadata_parameter=normalized_filter,
    )
@override
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    if self.document_codec.server_side_embeddings:
        # this case goes directly to the "_by_sort" method
        # (and does its own filter normalization, as it cannot
        #  use the path for the with-embedding mmr querying).
        # Since it bypasses `max_marginal_relevance_search_by_vector`,
        # it must also run the DB-setup check that the latter performs
        # before the collection is queried.
        self.astra_env.ensure_db_setup()
        metadata_parameter = self._filter_to_metadata(filter)
        return self._run_mmr_query_by_sort(
            sort={"$vectorize": query},
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            metadata_parameter=metadata_parameter,
        )

    # client-side embeddings: the "_by_vector" method does the setup check
    embedding_vector = self._get_safe_embedding().embed_query(query)
    return self.max_marginal_relevance_search_by_vector(
        embedding_vector,
        k,
        fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
        **kwargs,
    )
@override
async def amax_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    if self.document_codec.server_side_embeddings:
        # this case goes directly to the "_by_sort" method
        # (and does its own filter normalization, as it cannot
        #  use the path for the with-embedding mmr querying).
        # Since it bypasses `amax_marginal_relevance_search_by_vector`,
        # it must also run the DB-setup check that the latter performs
        # before the collection is queried.
        await self.astra_env.aensure_db_setup()
        metadata_parameter = self._filter_to_metadata(filter)
        return await self._arun_mmr_query_by_sort(
            sort={"$vectorize": query},
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            metadata_parameter=metadata_parameter,
        )

    # client-side embeddings: the "_by_vector" method does the setup check
    embedding_vector = await self._get_safe_embedding().aembed_query(query)
    return await self.amax_marginal_relevance_search_by_vector(
        embedding_vector,
        k,
        fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
        **kwargs,
    )
@classmethod
def _from_kwargs(
    cls: type[AstraDBVectorStore],
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Build a store from keyword arguments, discarding unsupported ones.

    Only the names accepted by ``AstraDBVectorStore.__init__`` (positional
    or keyword-only, ``self`` excluded) are forwarded to the constructor.
    Any other name triggers a single ``UserWarning`` and is dropped.
    """
    init_spec = inspect.getfullargspec(AstraDBVectorStore.__init__)
    known_kwarg_keys = {*init_spec.args, *init_spec.kwonlyargs} - {"self"}

    # with no kwargs at all this set is empty and no warning is issued
    unknown_kwarg_keys = set(kwargs) - known_kwarg_keys
    if unknown_kwarg_keys:
        warnings.warn(
            (
                "Method 'from_texts/afrom_texts' of AstraDBVectorStore "
                "vector store invoked with unsupported arguments "
                f"({', '.join(sorted(unknown_kwarg_keys))}), "
                "which will be ignored."
            ),
            UserWarning,
            stacklevel=3,
        )

    known_kwargs = {
        arg_name: arg_value
        for arg_name, arg_value in kwargs.items()
        if arg_name in known_kwarg_keys
    }
    return cls(**known_kwargs)
@classmethod
@override
def from_texts(
    cls: type[AstraDBVectorStore],
    texts: list[str],
    embedding: Embeddings | None = None,
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from raw texts.

    Args:
        texts: the texts to insert.
        embedding: the embedding function to use in the store.
        metadatas: metadata dicts for the texts.
        ids: ids to associate to the texts.
        **kwargs: you can pass any argument that you would
            to :meth:`~add_texts` and/or to the
            ``AstraDBVectorStore`` constructor (see these methods for
            details). These arguments will be
            routed to the respective methods as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    _add_texts_inspection = inspect.getfullargspec(AstraDBVectorStore.add_texts)
    # Every parameter name of `add_texts` (regular and keyword-only), minus
    # the ones this method passes explicitly: what is left are the extra
    # options to route to `add_texts` rather than to the constructor.
    _method_args = (
        set(_add_texts_inspection.args) | set(_add_texts_inspection.kwonlyargs)
    ) - {"self", "texts", "metadatas", "ids"}
    _init_kwargs = {k: v for k, v in kwargs.items() if k not in _method_args}
    _method_kwargs = {k: v for k, v in kwargs.items() if k in _method_args}
    astra_db_store = AstraDBVectorStore._from_kwargs(
        embedding=embedding,
        **_init_kwargs,
    )
    astra_db_store.add_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
        **_method_kwargs,
    )
    return astra_db_store
@classmethod
@override
async def afrom_texts(
    cls: type[AstraDBVectorStore],
    texts: list[str],
    embedding: Embeddings | None = None,
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from raw texts.

    Args:
        texts: the texts to insert.
        embedding: embedding function to use.
        metadatas: metadata dicts for the texts.
        ids: ids to associate to the texts.
        **kwargs: you can pass any argument that you would
            to :meth:`~aadd_texts` and/or to the
            ``AstraDBVectorStore`` constructor (see these methods for
            details). These arguments will be
            routed to the respective methods as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    _aadd_texts_inspection = inspect.getfullargspec(AstraDBVectorStore.aadd_texts)
    # Every parameter name of `aadd_texts` (regular and keyword-only), minus
    # the ones this method passes explicitly: what is left are the extra
    # options to route to `aadd_texts` rather than to the constructor.
    _method_args = (
        set(_aadd_texts_inspection.args) | set(_aadd_texts_inspection.kwonlyargs)
    ) - {"self", "texts", "metadatas", "ids"}
    _init_kwargs = {k: v for k, v in kwargs.items() if k not in _method_args}
    _method_kwargs = {k: v for k, v in kwargs.items() if k in _method_args}
    astra_db_store = AstraDBVectorStore._from_kwargs(
        embedding=embedding,
        **_init_kwargs,
    )
    await astra_db_store.aadd_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
        **_method_kwargs,
    )
    return astra_db_store
@classmethod
@override
def from_documents(
    cls: type[AstraDBVectorStore],
    documents: list[Document],
    embedding: Embeddings | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from a document list.

    Utility method that defers to :meth:`from_texts` (see that one).

    Args:
        documents: a list of `Document` objects for insertion in the store.
            Their ``page_content`` and ``metadata`` are what gets passed on.
        embedding: the embedding function to use in the store.
        **kwargs: you can pass any argument that you would
            to :meth:`~add_texts` and/or to the
            ``AstraDBVectorStore`` constructor (see these methods for
            details). These arguments will be
            routed to the respective methods as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    texts: list[str] = []
    metadatas: list[dict] = []
    for document in documents:
        texts.append(document.page_content)
        metadatas.append(document.metadata)
    return cls.from_texts(
        texts,
        embedding=embedding,
        metadatas=metadatas,
        **kwargs,
    )
@classmethod
async def afrom_documents(
    cls: type[AstraDBVectorStore],
    documents: list[Document],
    embedding: Embeddings | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from a document list (async).

    Utility method that defers to :meth:`afrom_texts` (see that one).

    Args:
        documents: the `Document` objects to insert; their ``page_content``
            and ``metadata`` take the place of the ``texts`` and
            ``metadatas`` arguments of :meth:`afrom_texts`.
        embedding: forwarded to :meth:`afrom_texts` as is.
        **kwargs: forwarded to :meth:`afrom_texts` as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    texts: list[str] = []
    metadatas: list[dict] = []
    for document in documents:
        texts.append(document.page_content)
        metadatas.append(document.metadata)
    return await cls.afrom_texts(
        texts,
        embedding=embedding,
        metadatas=metadatas,
        **kwargs,
    )