"""Astra DB vector store integration."""

from __future__ import annotations

import asyncio
import inspect
import logging
import uuid
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterable,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Literal,
    NamedTuple,
    Sequence,
    TypeVar,
    Union,
    cast,
    overload,
)

import numpy as np
from astrapy.constants import Environment
from astrapy.exceptions import CollectionInsertManyException, DataAPIResponseException
from astrapy.info import (
    CollectionLexicalOptions,
    CollectionRerankOptions,
    VectorServiceOptions,
)
from langchain_community.vectorstores.utils import maximal_marginal_relevance
from langchain_core.documents import Document
from langchain_core.runnables.utils import gather_with_concurrency
from langchain_core.vectorstores import VectorStore
from typing_extensions import override

from langchain_astradb.utils.astradb import (
    COMPONENT_NAME_VECTORSTORE,
    DEFAULT_DOCUMENT_CHUNK_SIZE,
    MAX_CONCURRENT_DOCUMENT_DELETIONS,
    MAX_CONCURRENT_DOCUMENT_INSERTIONS,
    MAX_CONCURRENT_DOCUMENT_REPLACEMENTS,
    HybridSearchMode,
    SetupMode,
    _AstraDBCollectionEnvironment,
    _survey_collection,
)
from langchain_astradb.utils.vector_store_autodetect import (
    _detect_document_codec,
)
from langchain_astradb.utils.vector_store_codecs import (
    LEXICAL_FIELD_NAME,
    VECTOR_FIELD_NAME,
    VECTORIZE_FIELD_NAME,
    _AstraDBVectorStoreDocumentCodec,
    _DefaultVectorizeVSDocumentCodec,
    _DefaultVSDocumentCodec,
)

# Names needed only by annotations; kept out of the runtime import graph.
if TYPE_CHECKING:
    from astrapy.authentication import (
        EmbeddingHeadersProvider,
        RerankingHeadersProvider,
        TokenProvider,
    )
    from astrapy.cursors import RerankedResult
    from astrapy.info import RerankServiceOptions
    from astrapy.results import CollectionUpdateResult
    from langchain_core.embeddings import Embeddings

# generic type variables used by the module-level helpers
T = TypeVar("T")
U = TypeVar("U")
DocDict = Dict[str, Any]  # dicts expressing entries to insert

# error code to check for during bulk insertions
DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE = "DOCUMENT_ALREADY_EXISTS"
# max number of errors shown in full insertion error messages
MAX_SHOWN_INSERTION_ERRORS = 8
# key for the 'rerank' score within the find_and_rerank scores
RERANK_SCORE_KEY = "$rerank"
# Error message for receiving a lexical_query for a non-hybrid search
ERROR_LEXICAL_QUERY_ON_NONHYBRID_SEARCH = (
    "Parameter 'lexical_query' cannot be passed for a non-hybrid search"
)

# module-level logger, named after this module
logger = logging.getLogger(__name__)
class AstraDBQueryResult(NamedTuple):
    """The complete information contained in a vector store entry.

    This class represents all that can be returned from the collection when
    running a query, which goes beyond just the corresponding Document.

    Attributes:
        document: a ``langchain.schema.Document`` object representing the query
            result.
        id: the ID of the returned document.
        embedding: the embedding vector associated to the document. This may be
            None, depending on whether the embeddings were requested in the
            query or not.
        similarity: the numeric similarity score of the document in the query.
            In case this quantity was not requested by the query, it will be
            set to None.
    """

    document: Document
    id: str
    embedding: list[float] | None
    similarity: float | None
class HybridLimitFactorPrescription(NamedTuple):
    """A per-subsearch setting for the hybrid-search 'limit' factors.

    This structure is to be used to set different values for the vector and
    the lexical portions of the hybrid search.

    Each of the attributes is a floating-point number, representing the
    multiplicative factor applied to a search final 'k' to calculate the
    "limit" value for the associated sub-search.

    For instance, if vector=1.5 and lexical=3.0, a hybrid search called
    by asking a final set of k=4 results will be executed with limits of
    6 for vector and 12 for lexical. (The results are approximated to an
    integer.)

    Attributes:
        vector: the multiplicative factor for the "vector" part of the hybrid
            search.
        lexical: the multiplicative factor for the "lexical" part of the hybrid
            search.
    """

    vector: float
    lexical: float
def _unique_list(lst: list[T], key: Callable[[T], U]) -> list[T]:
    """Return the items of ``lst`` without duplicates, preserving order.

    Args:
        lst: the items to de-duplicate.
        key: a function computing, for each item, the (hashable) value used
            to decide whether two items are duplicates of each other.

    Returns:
        a new list holding the first occurrence of each distinct key.
    """
    visited_keys: set[U] = set()
    new_lst = []
    for item in lst:
        item_key = key(item)
        if item_key not in visited_keys:
            visited_keys.add(item_key)
            new_lst.append(item)
    return new_lst


def _normalize_content_field(
    content_field: str | None,
    *,
    is_autodetect: bool,
    has_vectorize: bool,
) -> str:
    """Validate ``content_field`` and resolve it to the value the store uses.

    Args:
        content_field: the value received by the store constructor.
        is_autodetect: whether the store is being created in autodetect mode.
        has_vectorize: whether the collection uses server-side embeddings.

    Returns:
        ``VECTORIZE_FIELD_NAME`` for vectorize collections; otherwise the
        provided name, or - if None was given - ``"*"`` in autodetect mode and
        ``"content"`` in regular mode.

    Raises:
        ValueError: if a content field is given for a vectorize collection,
            or if ``"*"`` is given outside of autodetect mode.
    """
    if has_vectorize:
        if content_field is not None:
            msg = "content_field is not configurable for vectorize collections."
            raise ValueError(msg)
        return VECTORIZE_FIELD_NAME

    if content_field is None:
        return "*" if is_autodetect else "content"

    if content_field == "*":
        if not is_autodetect:
            msg = "content_field='*' illegal if autodetect_collection is False."
            raise ValueError(msg)
        return content_field

    return content_field


def _validate_autodetect_init_params(
    *,
    metric: str | None = None,
    setup_mode: SetupMode | None,
    pre_delete_collection: bool,
    metadata_indexing_include: Iterable[str] | None,
    metadata_indexing_exclude: Iterable[str] | None,
    collection_indexing_policy: dict[str, Any] | None,
    collection_vector_service_options: VectorServiceOptions | None,
    collection_rerank: CollectionRerankOptions | RerankServiceOptions | None,
    collection_lexical: str | dict[str, Any] | CollectionLexicalOptions | None,
) -> None:
    """Check that the passed parameters do not violate the autodetect constraints.

    All violations found are collected and reported together.

    Raises:
        ValueError: if any of the listed parameters is not None, if
            ``pre_delete_collection`` is true or if ``setup_mode`` is not None.
    """
    forbidden_parameters = [
        p_name
        for p_name, p_value in (
            ("metric", metric),
            ("metadata_indexing_include", metadata_indexing_include),
            ("metadata_indexing_exclude", metadata_indexing_exclude),
            ("collection_indexing_policy", collection_indexing_policy),
            ("collection_vector_service_options", collection_vector_service_options),
            ("collection_rerank", collection_rerank),
            ("collection_lexical", collection_lexical),
        )
        if p_value is not None
    ]
    fp_error: str | None = None
    if forbidden_parameters:
        fp_error = (
            f"Parameter(s) {', '.join(forbidden_parameters)}. were provided "
            "but cannot be passed."
        )
    pd_error: str | None = None
    if pre_delete_collection:
        pd_error = "Parameter `pre_delete_collection` cannot be True."
    sm_error: str | None = None
    if setup_mode is not None:
        sm_error = "Parameter `setup_mode` not allowed."
    am_errors = [err_s for err_s in (fp_error, pd_error, sm_error) if err_s is not None]
    if am_errors:
        msg = f"Invalid parameters for autodetect mode: {'; '.join(am_errors)}"
        raise ValueError(msg)


def _decide_hybrid_search_setting(
    *,
    required_hybrid_search: HybridSearchMode | None,
    has_hybrid: bool,
) -> bool:
    """Determine whether searches must be hybrid.

    Args:
        required_hybrid_search: the hybrid_search parameter required in the
            constructor.
        has_hybrid: whether the collection actually is hybrid-capable.

    Returns:
        False/True if OFF/ON is explicitly required; in all other cases,
        the value of ``has_hybrid``.
    """
    if required_hybrid_search == HybridSearchMode.OFF:
        return False
    if required_hybrid_search == HybridSearchMode.ON:
        return True
    return has_hybrid


def _make_hybrid_limits(
    hlf: None | float | dict[str, float],
    k: int,
) -> None | int | dict[str, int]:
    """Turn a (normalized) hybrid limit factor into actual sub-search limits.

    Args:
        hlf: None, a single numeric factor or a dict of per-subsearch factors.
        k: the number of results requested from the whole search.

    Returns:
        None if no factor is given; otherwise ``int(factor * k)``, never less
        than 1, either as a single integer or as a dict with the same keys
        as the input.
    """
    if hlf is None:
        return None
    # Integers are legitimate values for a float-annotated factor: they must
    # be scaled like floats instead of being mistaken for a dict.
    if isinstance(hlf, (int, float)):
        return max(int(hlf * k), 1)
    # hlf is a dict:
    return {
        sub_search: max(int(factor * k), 1) for sub_search, factor in hlf.items()
    }


def _normalize_hybrid_limit_factor(
    hybrid_limit_factor: float
    | None
    | dict[str, float]
    | HybridLimitFactorPrescription,
) -> float | dict[str, float] | None:
    """Bring `hybrid_limit_factor` to a normal form.

    Returns:
        None for None; a float for a single numeric factor (integers
        included); a dict keyed by ``VECTOR_FIELD_NAME`` and
        ``LEXICAL_FIELD_NAME`` for a ``HybridLimitFactorPrescription``;
        the input itself if it is already a dict.
    """
    if hybrid_limit_factor is None:
        return None
    # Accept integers as well (e.g. ``hybrid_limit_factor=2``), so that the
    # normal form is always one of None / float / dict.
    if isinstance(hybrid_limit_factor, (int, float)):
        return float(hybrid_limit_factor)
    if isinstance(hybrid_limit_factor, HybridLimitFactorPrescription):
        return {
            VECTOR_FIELD_NAME: hybrid_limit_factor.vector,
            LEXICAL_FIELD_NAME: hybrid_limit_factor.lexical,
        }
    # already a dict:
    return hybrid_limit_factor


def _insertmany_error_message(err: CollectionInsertManyException) -> str:
    """Format an astrapy insert exception into an error message.

    This utility prepares a detailed message from an astrapy
    CollectionInsertManyException, to be used in raising an exception within
    a vectorstore multiple insertion.

    This operation must filter out duplicate-id specific errors
    (which the vector store could actually handle, if they were the only ones).

    At most ``MAX_SHOWN_INSERTION_ERRORS`` error descriptions are spelled out;
    the number of the omitted ones, if any, is reported in the message.
    """
    err_msg = "Cannot insert documents. The Data API returned the following error(s): "

    def _describe_error(_errd: Exception) -> list[str]:
        # API response errors are unpacked into their descriptors, leaving the
        # "document already exists" ones out; anything else is stringified.
        if isinstance(_errd, DataAPIResponseException):
            return [
                edesc.message or ""
                for edesc in _errd.error_descriptors
                if edesc.error_code != DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE
            ]
        return [str(_errd)]

    filtered_error_descs = [
        edesc
        for insmany_err in err.exceptions
        for edesc in _describe_error(insmany_err)
    ]
    err_msg += "; ".join(
        edesc or "" for edesc in filtered_error_descs[:MAX_SHOWN_INSERTION_ERRORS]
    )

    if (
        num_residual := len(filtered_error_descs) - MAX_SHOWN_INSERTION_ERRORS
    ) > 0:
        err_msg += f". (Note: {num_residual} further errors omitted.)"

    err_msg += (
        " (Full API error in '<this-exception>.__cause__.error_descriptors'"
        f": ignore '{DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}'.)"
    )
    return err_msg
class AstraDBVectorStoreError(Exception):
    """An exception during vector-store activities.

    This exception represents any operational exception occurring while
    performing an action within an AstraDBVectorStore.
    """
class AstraDBVectorStore(VectorStore):
    """A vector store which uses DataStax Astra DB as backend.

    Setup:
        Install the ``langchain-astradb`` package and head to the
        `AstraDB website <https://astra.datastax.com>`_, create an account, create a
        new database and
        `create an application token <https://docs.datastax.com/en/astra-db-serverless/administration/manage-application-tokens.html>`_.

        .. code-block:: bash

            pip install -qU langchain-astradb

    Key init args — indexing params:
        collection_name: str
            Name of the collection.
        embedding: Embeddings
            Embedding function to use.

    Key init args — client params:
        api_endpoint: str
            Astra DB API endpoint.
        token: str
            API token for Astra DB usage.
        namespace: Optional[str]
            Namespace (aka keyspace) where the collection is created

    Instantiate:
        Get your API endpoint and application token from the dashboard of your database.

        Create a vector store and provide a LangChain embedding object for working with it:

        .. code-block:: python

            import getpass

            from langchain_astradb import AstraDBVectorStore
            from langchain_openai import OpenAIEmbeddings

            ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ")
            ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ")

            vector_store = AstraDBVectorStore(
                collection_name="astra_vector_langchain",
                embedding=OpenAIEmbeddings(),
                api_endpoint=ASTRA_DB_API_ENDPOINT,
                token=ASTRA_DB_APPLICATION_TOKEN,
            )

        (Vectorize) Create a vector store where the embedding vector computation
        happens entirely on the server-side, using the
        `vectorize <https://docs.datastax.com/en/astra-db-serverless/databases/embedding-generation.html>`_ feature:

        .. code-block:: python

            import getpass
            from astrapy.info import VectorServiceOptions

            from langchain_astradb import AstraDBVectorStore

            ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ")
            ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ")

            vector_store = AstraDBVectorStore(
                collection_name="astra_vectorize_langchain",
                api_endpoint=ASTRA_DB_API_ENDPOINT,
                token=ASTRA_DB_APPLICATION_TOKEN,
                collection_vector_service_options=VectorServiceOptions(
                    provider="nvidia",
                    model_name="NV-Embed-QA",
                    # authentication=...,  # needed by some providers/models
                ),
            )

        (Hybrid) The underlying Astra DB typically supports hybrid search
        (i.e. lexical + vector ANN) to boost the results' accuracy.
        This is provisioned and used automatically when available. For manual
        control, use the ``collection_rerank`` and ``collection_lexical``
        constructor parameters:

        .. code-block:: python

            import getpass
            from astrapy.info import (
                CollectionLexicalOptions,
                CollectionRerankOptions,
                RerankServiceOptions,
                VectorServiceOptions,
            )

            from langchain_astradb import AstraDBVectorStore

            ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ")
            ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ")

            vector_store = AstraDBVectorStore(
                collection_name="astra_vectorize_langchain",
                # embedding=...,  # needed unless using 'vectorize'
                api_endpoint=ASTRA_DB_API_ENDPOINT,
                token=ASTRA_DB_APPLICATION_TOKEN,
                collection_vector_service_options=VectorServiceOptions(...),  # see above
                collection_lexical=CollectionLexicalOptions(analyzer="standard"),
                collection_rerank=CollectionRerankOptions(
                    service=RerankServiceOptions(
                        provider="nvidia",
                        model_name="nvidia/llama-3.2-nv-rerankqa-1b-v2",
                    ),
                ),
                collection_reranking_api_key=...,  # if needed by the model/setup
            )

        Hybrid-related server upgrades may introduce a mismatch between the store
        defaults and a pre-existing collection: in case one such mismatch is
        reported (as a Data API "EXISTING_COLLECTION_DIFFERENT_SETTINGS" error),
        the options to resolve are:
        (1) use autodetect mode, (2) switch to ``setup_mode`` "OFF", or
        (3) explicitly specify lexical and/or rerank settings in the vector
        store constructor, to match the existing collection configuration. See
        `here <https://github.com/langchain-ai/langchain-datastax/blob/main/libs/astradb/README.md#collection-defaults-mismatch>`_
        for more details.

        (Autodetect) Let the vector store figure out the configuration (including
        vectorize and document encoding scheme on DB), by inspection of an existing
        collection:

        .. code-block:: python

            import getpass

            from langchain_astradb import AstraDBVectorStore

            ASTRA_DB_API_ENDPOINT = getpass.getpass("ASTRA_DB_API_ENDPOINT = ")
            ASTRA_DB_APPLICATION_TOKEN = getpass.getpass("ASTRA_DB_APPLICATION_TOKEN = ")

            vector_store = AstraDBVectorStore(
                collection_name="astra_existing_collection",
                # embedding=...,  # needed unless using 'vectorize'
                api_endpoint=ASTRA_DB_API_ENDPOINT,
                token=ASTRA_DB_APPLICATION_TOKEN,
                autodetect_collection=True,
            )

        (Non-Astra DB) This class can also target a non-Astra DB database, such as
        a self-deployed HCD, through the Data API:

        .. code-block:: python

            import getpass

            from astrapy.authentication import UsernamePasswordTokenProvider

            from langchain_astradb import AstraDBVectorStore

            vector_store = AstraDBVectorStore(
                collection_name="astra_existing_collection",
                # embedding=...,  # needed unless using 'vectorize'
                api_endpoint="http://localhost:8181",
                token=UsernamePasswordTokenProvider(
                    username="user",
                    password="pwd",
                ),
                collection_vector_service_options=...,  # if 'vectorize'
            )

    Add Documents:
        .. code-block:: python

            from langchain_core.documents import Document

            document_1 = Document(page_content="foo", metadata={"baz": "bar"})
            document_2 = Document(page_content="thud", metadata={"bar": "baz"})
            document_3 = Document(page_content="i will be deleted :(")

            documents = [document_1, document_2, document_3]
            ids = ["1", "2", "3"]
            vector_store.add_documents(documents=documents, ids=ids)

    Delete Documents:
        .. code-block:: python

            vector_store.delete(ids=["3"])

    Search:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1)
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * thud [{'bar': 'baz'}]

    Search with filter:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * thud [{'bar': 'baz'}]

    Search with score:
        .. code-block:: python

            results = vector_store.similarity_search_with_score(query="qux",k=1)
            for doc, score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * [SIM=0.916135] foo [{'baz': 'bar'}]

    Async:
        .. code-block:: python

            # add documents
            await vector_store.aadd_documents(documents=documents, ids=ids)

            # delete documents
            await vector_store.adelete(ids=["3"])

            # search
            results = await vector_store.asimilarity_search(query="thud",k=1)

            # search with score
            results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
            for doc,score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * [SIM=0.916135] foo [{'baz': 'bar'}]

    Use as Retriever:
        .. code-block:: python

            retriever = vector_store.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"k": 1, "score_threshold": 0.5},
            )
            retriever.invoke("thud")

        .. code-block:: none

            [Document(metadata={'bar': 'baz'}, page_content='thud')]

    """  # noqa: E501
def filter_to_query(self, filter_dict: dict[str, Any] | None) -> dict[str, Any]:
    """Translate a metadata filter into a query filter for the collection.

    The "abstract" condition on metadata is handed over to the store's
    document codec, which knows how documents are laid out on DB and
    encodes the condition accordingly.

    Args:
        filter_dict: a metadata condition in the form {"field": "value"}
            or related. None stands for "no condition".

    Returns:
        the mapping ready for use in queries: an empty dict if no filter
        was given, otherwise the codec-encoded form of ``filter_dict``.
    """
    if filter_dict is not None:
        encode = self.document_codec.encode_filter
        return encode(filter_dict)
    # no condition at all: an empty query filter
    return {}
@staticmethod
def _normalize_metadata_indexing_policy(
    metadata_indexing_include: Iterable[str] | None,
    metadata_indexing_exclude: Iterable[str] | None,
    collection_indexing_policy: dict[str, Any] | None,
    document_codec: _AstraDBVectorStoreDocumentCodec,
) -> dict[str, Any]:
    """Turn the constructor indexing parameters into an indexing policy.

    The three indexing parameters are alternative to one another: after
    checking that no more than one is given, the result is a dict ready to
    be used in the 'options' when creating a collection.

    Args:
        metadata_indexing_include: metadata keys for an "allow" policy.
        metadata_indexing_exclude: metadata keys for a "deny" policy.
        collection_indexing_policy: a ready-made policy, returned as is.
        document_codec: the codec, used to map metadata keys to field
            identifiers and to supply the default policy.

    Returns:
        an "allow"/"deny" policy over the mapped field identifiers, the
        passed policy, or the codec's default policy if nothing is given.

    Raises:
        ValueError: if more than one of the three parameters is not None.
    """
    supplied = (
        metadata_indexing_include,
        metadata_indexing_exclude,
        collection_indexing_policy,
    )
    if len(supplied) - supplied.count(None) > 1:
        msg = (
            "At most one of the parameters `metadata_indexing_include`,"
            " `metadata_indexing_exclude` and `collection_indexing_policy`"
            " can be specified as non null."
        )
        raise ValueError(msg)

    # include/exclude lists differ only in the policy clause they end up in
    for clause, md_fields in (
        ("allow", metadata_indexing_include),
        ("deny", metadata_indexing_exclude),
    ):
        if md_fields is not None:
            return {
                clause: [
                    document_codec.metadata_key_to_field_identifier(md_key)
                    for md_key in md_fields
                ]
            }

    if collection_indexing_policy is None:
        return document_codec.default_collection_indexing_policy
    return collection_indexing_policy
def __init__(
    self,
    *,
    collection_name: str,
    embedding: Embeddings | None = None,
    token: str | TokenProvider | None = None,
    api_endpoint: str | None = None,
    environment: str | None = None,
    namespace: str | None = None,
    metric: str | None = None,
    batch_size: int | None = None,
    bulk_insert_batch_concurrency: int | None = None,
    bulk_insert_overwrite_concurrency: int | None = None,
    bulk_delete_concurrency: int | None = None,
    setup_mode: SetupMode | None = None,
    pre_delete_collection: bool = False,
    metadata_indexing_include: Iterable[str] | None = None,
    metadata_indexing_exclude: Iterable[str] | None = None,
    collection_indexing_policy: dict[str, Any] | None = None,
    collection_vector_service_options: VectorServiceOptions | None = None,
    collection_embedding_api_key: str | EmbeddingHeadersProvider | None = None,
    content_field: str | None = None,
    ignore_invalid_documents: bool = False,
    autodetect_collection: bool = False,
    ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
    component_name: str = COMPONENT_NAME_VECTORSTORE,
    collection_rerank: CollectionRerankOptions | RerankServiceOptions | None = None,
    collection_reranking_api_key: str | RerankingHeadersProvider | None = None,
    collection_lexical: str | dict[str, Any] | CollectionLexicalOptions | None = None,
    hybrid_search: HybridSearchMode | None = None,
    hybrid_limit_factor: float
    | None
    | dict[str, float]
    | HybridLimitFactorPrescription = None,
) -> None:
    """A vector store which uses DataStax Astra DB as backend.

    For more on Astra DB, visit https://docs.datastax.com/en/astra-db-serverless/index.html

    Args:
        embedding: the embeddings function or service to use.
            This enables client-side embedding functions or calls to external
            embedding providers. If ``embedding`` is passed, then
            ``collection_vector_service_options`` can not be provided.
        collection_name: name of the Astra DB collection to create/use.
        token: API token for Astra DB usage, either in the form of a string
            or a subclass of ``astrapy.authentication.TokenProvider``.
            If not provided, the environment variable
            ASTRA_DB_APPLICATION_TOKEN is inspected.
        api_endpoint: full URL to the API endpoint, such as
            ``https://<DB-ID>-us-east1.apps.astra.datastax.com``. If not provided,
            the environment variable ASTRA_DB_API_ENDPOINT is inspected.
        environment: a string specifying the environment of the target Data API.
            If omitted, defaults to "prod" (Astra DB production).
            Other values are in ``astrapy.constants.Environment`` enum class.
        namespace: namespace (aka keyspace) where the collection is created.
            If not provided, the environment variable ASTRA_DB_KEYSPACE is
            inspected. Defaults to the database's "default namespace".
        metric: similarity function to use out of those available in Astra DB.
            If left out, it will use Astra DB API's defaults (i.e. "cosine" - but,
            for performance reasons, "dot_product" is suggested if embeddings are
            normalized to one).
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided, it falls back to
            ``DEFAULT_DOCUMENT_CHUNK_SIZE``.
        bulk_insert_batch_concurrency: Number of threads or coroutines to insert
            batches concurrently.
        bulk_insert_overwrite_concurrency: Number of threads or coroutines in a
            batch to insert pre-existing entries.
        bulk_delete_concurrency: Number of threads or coroutines for
            multiple-entry deletes.
        setup_mode: mode used to create the collection (SYNC, ASYNC or OFF).
            If not provided, SYNC is used. Must be left unset in autodetect
            mode (where no collection setup takes place).
        pre_delete_collection: whether to delete the collection before creating it.
            If False and the collection already exists, the collection will be used
            as is.
        metadata_indexing_include: an allowlist of the specific metadata subfields
            that should be indexed for later filtering in searches.
        metadata_indexing_exclude: a denylist of the specific metadata subfields
            that should not be indexed for later filtering in searches.
        collection_indexing_policy: a full "indexing" specification for
            what fields should be indexed for later filtering in searches.
            This dict must conform to the API specifications
            (see https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option)
        collection_vector_service_options: specifies the use of server-side
            embeddings within Astra DB. If passing this parameter, ``embedding``
            cannot be provided.
        collection_embedding_api_key: for usage of server-side embeddings
            within Astra DB. With this parameter one can supply an API Key
            that will be passed to Astra DB with each data request.
            This parameter can be either a string or a subclass of
            ``astrapy.authentication.EmbeddingHeadersProvider``.
            This is useful when the service is configured for the collection,
            but no corresponding secret is stored within
            Astra's key management system.
        content_field: name of the field containing the textual content
            in the documents when saved on Astra DB. For vectorize collections,
            this cannot be specified; for non-vectorize collection, defaults
            to "content".
            The special value "*" can be passed only if autodetect_collection=True.
            In this case, the actual name of the key for the textual content is
            guessed by inspection of a few documents from the collection, under the
            assumption that the longer strings are the most likely candidates.
            Please understand the limitations of this method and get some
            understanding of your data before passing ``"*"`` for this parameter.
        ignore_invalid_documents: if False (default), exceptions are raised
            when a document is found on the Astra DB collection that does
            not have the expected shape. If set to True, such results
            from the database are ignored and a warning is issued. Note
            that in this case a similarity search may end up returning fewer
            results than the required ``k``.
        autodetect_collection: if True, turns on autodetect behavior.
            The store will look for an existing collection of the provided name
            and infer the store settings from it. Default is False.
            In autodetect mode, ``content_field`` can be given as ``"*"``, meaning
            that an attempt will be made to determine it by inspection (unless
            vectorize is enabled, in which case ``content_field`` is ignored).
            In autodetect mode, the store not only determines whether embeddings
            are client- or server-side, but - most importantly - switches
            automatically between "nested" and "flat" representations of documents
            on DB (i.e. having the metadata key-value pairs grouped in a
            ``metadata`` field or spread at the documents' top-level). The former
            scheme is the native mode of the AstraDBVectorStore; the store resorts
            to the latter in case of vector collections populated with external
            means (such as a third-party data import tool) before applying
            an AstraDBVectorStore to them.
            Note that the following parameters cannot be used if this is True:
            ``metric``, ``setup_mode``, ``metadata_indexing_include``,
            ``metadata_indexing_exclude``, ``collection_indexing_policy``,
            ``collection_vector_service_options``, ``collection_rerank``,
            ``collection_lexical``; moreover, ``pre_delete_collection``
            cannot be True.
        ext_callers: one or more caller identities to identify Data API calls
            in the User-Agent header. This is a list of (name, version) pairs,
            or just strings if no version info is provided, which, if supplied,
            becomes the leading part of the User-Agent string in all API requests
            related to this component.
        component_name: the string identifying this specific component in the
            stack of usage info passed as the User-Agent string to the Data API.
            Defaults to "langchain_vectorstore", but can be overridden if this
            component actually serves as the building block for another component
            (such as a Graph Vector Store).
        collection_rerank: providing reranking settings is necessary to run
            hybrid searches for similarity. This parameter can be an instance
            of the astrapy classes `CollectionRerankOptions` or
            ``RerankServiceOptions``.
        collection_reranking_api_key: for usage of server-side reranking services
            within Astra DB. With this parameter one can supply an API Key
            that will be passed to Astra DB with each data request.
            This parameter can be either a string or a subclass of
            ``astrapy.authentication.RerankingHeadersProvider``.
            This is useful when the service is configured for the collection,
            but no corresponding secret is stored within
            Astra's key management system.
        collection_lexical: configuring a lexical analyzer is necessary to run
            lexical and hybrid searches. This parameter can be a string or dict,
            which is then passed as-is for the "analyzer" field of a
            createCollection's "$lexical.analyzer" value, or a ready-made astrapy
            `CollectionLexicalOptions` object.
        hybrid_search: whether similarity searches should be run as Hybrid searches
            or not. Values are DEFAULT, ON or OFF. In case of DEFAULT, searches
            are performed as permitted by the collection configuration, with a
            preference for hybrid search. Forcing this setting to ON for a
            non-hybrid-enabled collection would result in a server error when
            running searches.
        hybrid_limit_factor: subsearch "limit" specification for hybrid searches.
            If omitted, hybrid searches do not specify it and leave the Data API
            to use its defaults.
            If a floating-point positive number is provided: each subsearch
            participating in the hybrid search (i.e. both the vector-based ANN
            and the lexical-based) will be requested to fetch up to
            `int(k*hybrid_limit_factor)` items, where `k` is the desired result
            count from the whole search.
            If a `HybridLimitFactorPrescription` is provided (see the class
            docstring for details), separate factors are applied to the vector
            and the lexical subsearches. Alternatively, a simple dictionary
            with keys "$lexical" and "$vector" achieves the same effect.

    Raises:
        ValueError: if the parameters are inconsistent with one another
            (indexing parameters, ``content_field``, autodetect constraints,
            ``embedding`` versus server-side embeddings) or, in autodetect
            mode, if the collection is not found or is not a vector
            collection.

    Note:
        For concurrency in synchronous :meth:`~add_texts`:, as a rule of thumb,
        on a typical client machine it is suggested to keep the quantity
        bulk_insert_batch_concurrency * bulk_insert_overwrite_concurrency
        much below 1000 to avoid exhausting the client multithreading/networking
        resources. The hardcoded defaults are somewhat conservative to meet
        most machines' specs, but a sensible choice to test may be:

        - bulk_insert_batch_concurrency = 80
        - bulk_insert_overwrite_concurrency = 10

        A bit of experimentation is required to nail the best results here,
        depending on both the machine/network specs and the expected workload
        (specifically, how often a write is an update of an existing id).
        Remember you can pass concurrency settings to individual calls to
        :meth:`~add_texts` and :meth:`~add_documents` as well.
    """
    # general collection settings
    self.collection_name = collection_name
    self.token = token
    self.api_endpoint = api_endpoint
    self.environment = environment
    self.namespace = namespace
    # declared here, assigned in either branch of the init logic below
    self.indexing_policy: dict[str, Any]
    self.autodetect_collection = autodetect_collection
    # vector-related settings
    self.embedding_dimension: int | None = None
    self.embedding = embedding
    self.metric = metric
    self.collection_embedding_api_key = collection_embedding_api_key
    self.collection_vector_service_options = collection_vector_service_options
    # DB-encoding settings:
    self.document_codec: _AstraDBVectorStoreDocumentCodec
    # concurrency settings (falsy values, i.e. None or 0, select the defaults)
    self.batch_size: int | None = batch_size or DEFAULT_DOCUMENT_CHUNK_SIZE
    self.bulk_insert_batch_concurrency: int = (
        bulk_insert_batch_concurrency or MAX_CONCURRENT_DOCUMENT_INSERTIONS
    )
    self.bulk_insert_overwrite_concurrency: int = (
        bulk_insert_overwrite_concurrency or MAX_CONCURRENT_DOCUMENT_REPLACEMENTS
    )
    self.bulk_delete_concurrency: int = (
        bulk_delete_concurrency or MAX_CONCURRENT_DOCUMENT_DELETIONS
    )

    # locals set differently by the two init flows, then passed to astra_env
    _setup_mode: SetupMode
    _embedding_dimension: int | Awaitable[int] | None

    self.has_lexical: bool
    self.has_hybrid: bool
    self.hybrid_search: bool  # affecting the actual behaviour when running searches
    self.hybrid_limit_factor: None | float | dict[str, float]

    self.collection_reranking_api_key = collection_reranking_api_key

    if not self.autodetect_collection:
        logger.info(
            "vector store default init, collection '%s'", self.collection_name
        )
        _setup_mode = SetupMode.SYNC if setup_mode is None else setup_mode
        _embedding_dimension = self._prepare_embedding_dimension(_setup_mode)
        # determine vectorize/nonvectorize
        has_vectorize = self.collection_vector_service_options is not None
        _content_field = _normalize_content_field(
            content_field,
            is_autodetect=False,
            has_vectorize=has_vectorize,
        )
        # lexical/rerank: an options object tells through its `enabled`
        # attribute, any other non-None value counts as enabled
        self.has_lexical = (
            collection_lexical.enabled
            if isinstance(collection_lexical, CollectionLexicalOptions)
            else (collection_lexical is not None)
        )
        _has_reranking = (
            collection_rerank.enabled
            if isinstance(collection_rerank, CollectionRerankOptions)
            else (collection_rerank is not None)
        )
        # hybrid capability requires both lexical and reranking
        self.has_hybrid = self.has_lexical and _has_reranking
        self.hybrid_search = _decide_hybrid_search_setting(
            required_hybrid_search=hybrid_search,
            has_hybrid=self.has_hybrid,
        )

        if self.collection_vector_service_options is not None:
            self.document_codec = _DefaultVectorizeVSDocumentCodec(
                ignore_invalid_documents=ignore_invalid_documents,
                has_lexical=self.has_lexical,
            )
        else:
            self.document_codec = _DefaultVSDocumentCodec(
                content_field=_content_field,
                ignore_invalid_documents=ignore_invalid_documents,
                has_lexical=self.has_lexical,
            )
        # indexing policy setting
        self.indexing_policy = self._normalize_metadata_indexing_policy(
            metadata_indexing_include=metadata_indexing_include,
            metadata_indexing_exclude=metadata_indexing_exclude,
            collection_indexing_policy=collection_indexing_policy,
            document_codec=self.document_codec,
        )
    else:
        logger.info(
            "vector store autodetect init, collection '%s'", self.collection_name
        )
        # specific checks for autodetect logic
        _validate_autodetect_init_params(
            metric=self.metric,
            setup_mode=setup_mode,
            pre_delete_collection=pre_delete_collection,
            metadata_indexing_include=metadata_indexing_include,
            metadata_indexing_exclude=metadata_indexing_exclude,
            collection_indexing_policy=collection_indexing_policy,
            collection_vector_service_options=self.collection_vector_service_options,
            collection_lexical=collection_lexical,
            collection_rerank=collection_rerank,
        )
        # autodetect relies on an existing collection: nothing to set up
        _setup_mode = SetupMode.OFF

        # fetch collection intelligence
        c_descriptor, c_documents = _survey_collection(
            collection_name=self.collection_name,
            token=self.token,
            api_endpoint=self.api_endpoint,
            keyspace=self.namespace,
            environment=self.environment,
            ext_callers=ext_callers,
            component_name=component_name,
        )
        if c_descriptor is None:
            msg = f"Collection '{self.collection_name}' not found."
            raise ValueError(msg)
        # use the collection info to set the store properties
        c_vector_options = c_descriptor.definition.as_dict().get("vector") or {}
        if not c_vector_options:
            msg = "Non-vector collection detected."
            raise ValueError(msg)
        _embedding_dimension = c_vector_options.get("dimension")
        # a "service" entry in the vector options marks a vectorize collection
        self.collection_vector_service_options = c_vector_options.get("service")
        has_vectorize = self.collection_vector_service_options is not None
        logger.info("vector store autodetect: has_vectorize = %s", has_vectorize)
        norm_content_field = _normalize_content_field(
            content_field,
            is_autodetect=True,
            has_vectorize=has_vectorize,
        )
        self.has_lexical = (
            c_descriptor.definition.lexical is not None
            and c_descriptor.definition.lexical.enabled
        )
        self.document_codec = _detect_document_codec(
            c_documents,
            has_vectorize=has_vectorize,
            has_lexical=self.has_lexical,
            ignore_invalid_documents=ignore_invalid_documents,
            norm_content_field=norm_content_field,
        )
        # the indexing policy is the one found on the existing collection
        self.indexing_policy = self._normalize_metadata_indexing_policy(
            metadata_indexing_include=None,
            metadata_indexing_exclude=None,
            collection_indexing_policy=c_descriptor.definition.indexing,
            document_codec=self.document_codec,
        )
        _has_reranking = (
            c_descriptor.definition.rerank is not None
            and c_descriptor.definition.rerank.enabled
        )
        self.has_hybrid = self.has_lexical and _has_reranking
        self.hybrid_search = _decide_hybrid_search_setting(
            required_hybrid_search=hybrid_search,
            has_hybrid=self.has_hybrid,
        )

    # validate embedding/vectorize compatibility and such.

    # Embedding and the server-side embeddings are mutually exclusive,
    # as both specify how to produce embeddings.
    # Also API key makes no sense unless vectorize
    # (the latter is not checked in this method).
    if self.embedding is None and not self.document_codec.server_side_embeddings:
        msg = "Embedding is required for non-vectorize collections."
        raise ValueError(msg)

    if self.embedding is not None and self.document_codec.server_side_embeddings:
        msg = "Embedding cannot be provided for vectorize collections."
        raise ValueError(msg)

    self.hybrid_limit_factor = _normalize_hybrid_limit_factor(hybrid_limit_factor)

    # the environment object receives all collection-level settings
    self.astra_env = _AstraDBCollectionEnvironment(
        collection_name=collection_name,
        token=self.token,
        api_endpoint=self.api_endpoint,
        keyspace=self.namespace,
        environment=self.environment,
        setup_mode=_setup_mode,
        pre_delete_collection=pre_delete_collection,
        embedding_dimension=_embedding_dimension,
        metric=self.metric,
        requested_indexing_policy=self.indexing_policy,
        default_indexing_policy=(
            self.document_codec.default_collection_indexing_policy
        ),
        collection_vector_service_options=self.collection_vector_service_options,
        collection_embedding_api_key=self.collection_embedding_api_key,
        ext_callers=ext_callers,
        component_name=component_name,
        collection_rerank=collection_rerank,
        collection_reranking_api_key=self.collection_reranking_api_key,
        collection_lexical=collection_lexical,
    )
def _get_safe_embedding(self) -> Embeddings:
    """Return the client-side embeddings object.

    Raises:
        ValueError: if no (truthy) embeddings object is set on the store.
    """
    embedding = self.embedding
    if embedding:
        return embedding
    msg = "Missing embedding"
    raise ValueError(msg)

def _prepare_embedding_dimension(
    self, setup_mode: SetupMode
) -> int | Awaitable[int] | None:
    """Return the right kind of object for the astra_env to use.

    Depending on the store configuration, the result is:

    * None, when no client-side embeddings object is set;
    * an awaitable resolving to the dimension, for ``SetupMode.ASYNC``;
    * the dimension itself, for any other setup mode.

    The dimension is measured (once) by embedding a sample sentence and is
    then cached on ``self.embedding_dimension``.
    """
    if self.embedding is None:
        return None

    sample_text = "This is a sample sentence."

    if setup_mode != SetupMode.ASYNC:
        # case of setup_mode = SetupMode.SYNC, SetupMode.OFF
        if self.embedding_dimension is None:
            sample_vector = self._get_safe_embedding().embed_query(text=sample_text)
            self.embedding_dimension = len(sample_vector)
        return self.embedding_dimension

    # async setup: the computation is deferred by wrapping it in a coroutine
    async def _aget_embedding_dimension() -> int:
        if self.embedding_dimension is None:
            sample_vector = await self._get_safe_embedding().aembed_query(
                text=sample_text
            )
            self.embedding_dimension = len(sample_vector)
        return self.embedding_dimension

    return _aget_embedding_dimension()

@property
@override
def embeddings(self) -> Embeddings | None:
    """Accesses the supplied embeddings object.

    If using server-side embeddings, this will return None.
    """
    return self.embedding

@override
def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """Return the function mapping a raw score to a relevance score (identity)."""

    # The scores coming from the underlying API calls are already
    # "scores proper", i.e. in [0, 1] with higher meaning *more similar*:
    # no rescaling and no reversal of the interval is needed.
    def _identity(score: float) -> float:
        return score

    return _identity
def copy(
    self,
    *,
    token: str | TokenProvider | None = None,
    ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
    component_name: str | None = None,
    collection_embedding_api_key: str | EmbeddingHeadersProvider | None = None,
    collection_reranking_api_key: str | RerankingHeadersProvider | None = None,
) -> AstraDBVectorStore:
    """Create a copy, possibly with changed attributes.

    This method creates a shallow copy of this vector store. If a parameter
    is passed and differs from None, it will replace the corresponding value
    in the copy.

    The method allows changing only the parameters that ensure the copy is
    functional and does not trigger side-effects: for example, one cannot
    create a copy acting on a new collection. In those cases, one should
    create a new instance of ``AstraDBVectorStore`` from scratch.

    Args:
        token: API token for Astra DB usage, either in the form of a string
            or a subclass of ``astrapy.authentication.TokenProvider``.
            In order to suppress token usage in the copy, explicitly pass
            ``astrapy.authentication.StaticTokenProvider(None)``.
        ext_callers: additional custom (caller_name, caller_version) pairs
            to attach to the User-Agent header when issuing Data API requests.
        component_name: a value for the LangChain component name to use when
            identifying the originator of the Data API requests.
        collection_embedding_api_key: the API Key to supply in each Data API
            request if necessary. This is necessary if using the Vectorize
            feature and no secret is stored with the database.
            In order to suppress the API Key in the copy, explicitly pass
            ``astrapy.authentication.EmbeddingAPIKeyHeaderProvider(None)``.
        collection_reranking_api_key: for usage of server-side reranking
            services within Astra DB. With this parameter one can supply an
            API Key that will be passed to Astra DB with each data request.
            This parameter can be either a string or a subclass of
            ``astrapy.authentication.RerankingHeadersProvider``.
            This is useful when the service is configured for the collection,
            but no corresponding secret is stored within Astra's key
            management system.

    Returns:
        a new ``AstraDBVectorStore`` sharing this store's configuration,
        with the requested overrides applied.
    """
    # Start from a placeholder instance (setup_mode=SetupMode.OFF, "moot"
    # values): every relevant attribute is overwritten right below.
    clone = AstraDBVectorStore(
        collection_name="moot",
        api_endpoint="http://moot",
        environment=Environment.OTHER,
        namespace="moot",
        setup_mode=SetupMode.OFF,
        collection_vector_service_options=VectorServiceOptions(
            provider="moot",
            model_name="moot",
        ),
    )
    clone.collection_name = self.collection_name
    clone.token = self.token if token is None else token
    clone.api_endpoint = self.api_endpoint
    clone.environment = self.environment
    clone.namespace = self.namespace
    clone.indexing_policy = self.indexing_policy
    clone.autodetect_collection = self.autodetect_collection
    clone.embedding_dimension = self.embedding_dimension
    clone.embedding = self.embedding
    clone.metric = self.metric
    clone.collection_embedding_api_key = (
        self.collection_embedding_api_key
        if collection_embedding_api_key is None
        else collection_embedding_api_key
    )
    clone.collection_reranking_api_key = (
        self.collection_reranking_api_key
        if collection_reranking_api_key is None
        else collection_reranking_api_key
    )
    clone.collection_vector_service_options = self.collection_vector_service_options
    clone.document_codec = self.document_codec
    clone.has_lexical = self.has_lexical
    # `has_hybrid` (collection capability) and `hybrid_search` (the search
    # setting in use) are distinct attributes: each is copied from its own
    # source so the copy does not fall back to the placeholder's setting.
    clone.has_hybrid = self.has_hybrid
    clone.hybrid_search = self.hybrid_search
    clone.hybrid_limit_factor = self.hybrid_limit_factor
    clone.batch_size = self.batch_size
    clone.bulk_insert_batch_concurrency = self.bulk_insert_batch_concurrency
    clone.bulk_insert_overwrite_concurrency = self.bulk_insert_overwrite_concurrency
    clone.bulk_delete_concurrency = self.bulk_delete_concurrency
    # Now the .astra_env attribute, which handles the overrides on its own:
    clone.astra_env = self.astra_env.copy(
        token=token,
        ext_callers=ext_callers,
        component_name=component_name,
        collection_embedding_api_key=collection_embedding_api_key,
        collection_reranking_api_key=collection_reranking_api_key,
    )
    return clone
def clear(self) -> None:
    """Delete every entry stored in the collection (the collection itself stays)."""
    env = self.astra_env
    env.ensure_db_setup()
    # an empty filter matches all documents
    env.collection.delete_many({})
async def aclear(self) -> None:
    """Delete every entry stored in the collection (the collection itself stays)."""
    env = self.astra_env
    await env.aensure_db_setup()
    # an empty filter matches all documents
    await env.async_collection.delete_many({})
def delete_by_document_id(self, document_id: str) -> bool:
    """Remove a single document from the store, given its document ID.

    Args:
        document_id: The document ID

    Returns:
        True if a document has indeed been deleted, False if ID not found.
    """
    self.astra_env.ensure_db_setup()
    # the codec knows how to express "this id" as a query on the collection
    id_query = self.document_codec.encode_query(ids=[document_id])
    outcome = self.astra_env.collection.delete_one(id_query)
    return outcome.deleted_count == 1
async def adelete_by_document_id(self, document_id: str) -> bool:
    """Remove a single document from the store, given its document ID.

    Args:
        document_id: The document ID

    Returns:
        True if a document has indeed been deleted, False if ID not found.
    """
    await self.astra_env.aensure_db_setup()
    # the codec knows how to express "this id" as a query on the collection
    id_query = self.document_codec.encode_query(ids=[document_id])
    outcome = await self.astra_env.async_collection.delete_one(id_query)
    return outcome.deleted_count == 1
@override
def delete(
    self,
    ids: list[str] | None = None,
    concurrency: int | None = None,
    **kwargs: Any,
) -> bool | None:
    """Delete by vector ids.

    Each id is removed with its own single-document request; requests are
    spread over a thread pool.

    Args:
        ids: List of ids to delete.
        concurrency: max number of threads issuing single-doc delete requests.
            Defaults to vector-store overall setting.
        **kwargs: Additional arguments are ignored.

    Returns:
        True once all deletion requests have completed without raising.

    Raises:
        ValueError: if `ids` is None.
    """
    if kwargs:
        ignored_names = ", ".join(sorted(kwargs))
        warnings.warn(
            "Method 'delete' of AstraDBVectorStore vector store invoked with "
            f"unsupported arguments ({ignored_names}), "
            "which will be ignored.",
            stacklevel=2,
        )

    if ids is None:
        msg = "No ids provided to delete."
        raise ValueError(msg)

    worker_count = concurrency or self.bulk_delete_concurrency
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        # drain the map so that any exception from a worker surfaces here
        for _ in pool.map(self.delete_by_document_id, ids):
            pass
    return True
@override
async def adelete(
    self,
    ids: list[str] | None = None,
    concurrency: int | None = None,
    **kwargs: Any,
) -> bool | None:
    """Delete by vector ids.

    Each id is removed with its own single-document request; requests are
    run as concurrent coroutines, up to the concurrency limit.

    Args:
        ids: List of ids to delete.
        concurrency: max number of simultaneous coroutines for single-doc
            delete requests. Defaults to vector-store overall setting.
        **kwargs: Additional arguments are ignored.

    Returns:
        True once all deletion requests have completed without raising.

    Raises:
        ValueError: if `ids` is None.
    """
    if kwargs:
        ignored_names = ", ".join(sorted(kwargs))
        warnings.warn(
            "Method 'adelete' of AstraDBVectorStore invoked with "
            f"unsupported arguments ({ignored_names}), "
            "which will be ignored.",
            stacklevel=2,
        )

    if ids is None:
        msg = "No ids provided to delete."
        raise ValueError(msg)

    coroutine_limit = concurrency or self.bulk_delete_concurrency
    deletions = [self.adelete_by_document_id(one_id) for one_id in ids]
    await gather_with_concurrency(coroutine_limit, *deletions)
    return True
def delete_by_metadata_filter(
    self,
    filter: dict[str, Any],  # noqa: A002
) -> int:
    """Delete all documents matching a certain metadata filtering condition.

    The vector embeddings play no role here: every document whose metadata
    satisfies the provided condition is removed.

    Args:
        filter: Filter on the metadata to apply. The filter cannot be empty.

    Returns:
        A number expressing the amount of deleted documents.

    Raises:
        ValueError: if the filter is empty.
    """
    if filter:
        self.astra_env.ensure_db_setup()
        encoded_filter = self.filter_to_query(filter)
        outcome = self.astra_env.collection.delete_many(filter=encoded_filter)
        # a missing count is reported as zero deletions
        return outcome.deleted_count or 0

    msg = (
        "Method `delete_by_metadata_filter` does not accept an empty "
        "filter. Use the `clear()` method if you really want to empty "
        "the vector store."
    )
    raise ValueError(msg)
async def adelete_by_metadata_filter(
    self,
    filter: dict[str, Any],  # noqa: A002
) -> int:
    """Delete all documents matching a certain metadata filtering condition.

    The vector embeddings play no role here: every document whose metadata
    satisfies the provided condition is removed.

    Args:
        filter: Filter on the metadata to apply. The filter cannot be empty.

    Returns:
        A number expressing the amount of deleted documents.

    Raises:
        ValueError: if the filter is empty.
    """
    if filter:
        await self.astra_env.aensure_db_setup()
        encoded_filter = self.filter_to_query(filter)
        outcome = await self.astra_env.async_collection.delete_many(
            filter=encoded_filter,
        )
        # a missing count is reported as zero deletions
        return outcome.deleted_count or 0

    msg = (
        "Method `adelete_by_metadata_filter` does not accept an empty "
        "filter. Use the `aclear()` method if you really want to empty "
        "the vector store."
    )
    raise ValueError(msg)
def delete_collection(self) -> None:
    """Completely delete the collection from the database.

    Unlike :meth:`~clear`, which only empties the collection, this drops the
    collection itself. Stored data is lost and unrecoverable, resources are
    freed. Use with caution.
    """
    env = self.astra_env
    env.ensure_db_setup()
    env.collection.drop()
async def adelete_collection(self) -> None:
    """Completely delete the collection from the database.

    Unlike :meth:`~aclear`, which only empties the collection, this drops
    the collection itself. Stored data is lost and unrecoverable, resources
    are freed. Use with caution.
    """
    env = self.astra_env
    await env.aensure_db_setup()
    await env.async_collection.drop()
def _get_documents_to_insert(
    self,
    texts: Iterable[str],
    embedding_vectors: Sequence[list[float] | None],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
) -> list[DocDict]:
    """Encode texts (plus vectors, metadata, ids) into documents ready to insert.

    Args:
        texts: the texts to store. Any iterable is accepted, including
            one-shot iterators such as generators.
        embedding_vectors: one entry per text, either the embedding vector
            or None.
        metadatas: one metadata dictionary per text. If omitted, each text
            gets an empty metadata dictionary.
        ids: one document id per text. If omitted, a random UUID (hex form)
            is generated for each text.

    Returns:
        the list of encoded documents, made unique by id: when an id occurs
        more than once, only its last occurrence is kept.
    """
    # `texts` is traversed more than once below (default ids, default
    # metadatas, the final zip): materialize it so that a one-shot iterator
    # is not exhausted by the first traversal, which would yield no documents.
    text_list = list(texts)
    if ids is None:
        ids = [uuid.uuid4().hex for _ in text_list]
    if metadatas is None:
        metadatas = [{} for _ in text_list]
    documents_to_insert = [
        self.document_codec.encode(
            content=b_txt,
            document_id=b_id,
            vector=b_emb,
            metadata=b_md,
        )
        for b_txt, b_emb, b_id, b_md in zip(
            text_list,
            embedding_vectors,
            ids,
            metadatas,
        )
    ]
    # make unique by id, keeping the last: deduplicate the reversed list
    # (which keeps first occurrences), then restore the original order.
    return _unique_list(
        documents_to_insert[::-1],
        self.document_codec.get_id,
    )[::-1]
@override
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    *,
    batch_size: int | None = None,
    batch_concurrency: int | None = None,
    overwrite_concurrency: int | None = None,
    **kwargs: Any,
) -> list[str]:
    """Run texts through the embeddings and add them to the vectorstore.

    If passing explicit ids, those entries whose id is in the store already
    will be replaced.

    Args:
        texts: Texts to add to the vectorstore. Any iterable is accepted,
            including one-shot iterators such as generators.
        metadatas: Optional list of metadatas.
        ids: Optional list of ids.
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided, defaults to the vector-store
            overall defaults (which in turn falls to astrapy defaults).
        batch_concurrency: number of threads to process
            insertion batches concurrently. Defaults to the vector-store
            overall setting if not provided.
        overwrite_concurrency: number of threads to process
            pre-existing documents in each batch. Defaults to the
            vector-store overall setting if not provided.
        **kwargs: Additional arguments are ignored.

    Note:
        The allowed field names for the metadata document attributes must
        obey certain rules (such as: keys cannot start with a dollar sign
        and cannot be empty).
        See `Naming Conventions <https://docs.datastax.com/en/astra-db-serverless/api-reference/dataapiclient.html#naming-conventions>`_
        for details.

    Returns:
        The list of ids of the added texts.

    Raises:
        AstraDBVectorStoreError: if the insertion fails for reasons other
            than pre-existing documents, or if not all of the pre-existing
            documents could be replaced.
    """
    if kwargs:
        warnings.warn(
            "Method 'add_texts' of AstraDBVectorStore vector store invoked with "
            f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), "
            "which will be ignored.",
            stacklevel=2,
        )
    self.astra_env.ensure_db_setup()

    # `texts` is needed more than once (embedding computation, document
    # encoding): materialize it a single time, so that a one-shot iterator
    # is not exhausted after the first use (which would insert nothing).
    text_list = list(texts)

    embedding_vectors: Sequence[list[float] | None]
    if self.document_codec.server_side_embeddings:
        embedding_vectors = [None] * len(text_list)
    else:
        embedding_vectors = self._get_safe_embedding().embed_documents(text_list)
    documents_to_insert = self._get_documents_to_insert(
        text_list, embedding_vectors, metadatas, ids
    )

    # perform an AstraPy insert_many, catching exceptions for overwriting docs
    ids_to_replace: list[str]
    inserted_ids: list[str] = []
    try:
        insert_many_result = self.astra_env.collection.insert_many(
            documents_to_insert,
            ordered=False,
            concurrency=batch_concurrency or self.bulk_insert_batch_concurrency,
            chunk_size=batch_size or self.batch_size,
        )
        ids_to_replace = []
        inserted_ids = insert_many_result.inserted_ids
    except CollectionInsertManyException as err:
        # check that the error is solely due to already-existing documents
        if any(
            not isinstance(in_err, DataAPIResponseException)
            for in_err in err.exceptions
        ):
            full_err_message = _insertmany_error_message(err)
            raise AstraDBVectorStoreError(full_err_message) from err

        # here, assume all in err.exceptions is a DataAPIResponseException:
        error_codes = {
            err_desc.error_code
            for in_err in cast(list[DataAPIResponseException], err.exceptions)
            for err_desc in in_err.error_descriptors
        }
        if error_codes != {DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}:
            full_err_message = _insertmany_error_message(err)
            raise AstraDBVectorStoreError(full_err_message) from err

        # copy, so that extending the list later leaves the exception untouched
        inserted_ids = list(err.inserted_ids)
        inserted_ids_set = set(inserted_ids)
        ids_to_replace = [
            doc_id
            for document in documents_to_insert
            if (doc_id := self.document_codec.get_id(document))
            not in inserted_ids_set
        ]

    # if necessary, replace docs for the non-inserted ids
    if ids_to_replace:
        # set-based membership keeps this selection linear in the batch size
        ids_to_replace_set = set(ids_to_replace)
        documents_to_replace = [
            document
            for document in documents_to_insert
            if self.document_codec.get_id(document) in ids_to_replace_set
        ]

        def _replace_document(
            document: DocDict,
        ) -> tuple[CollectionUpdateResult, str]:
            doc_id = self.document_codec.get_id(document)
            return self.astra_env.collection.replace_one(
                self.document_codec.encode_query(ids=[doc_id]),
                document,
            ), doc_id

        _max_workers = (
            overwrite_concurrency or self.bulk_insert_overwrite_concurrency
        )
        with ThreadPoolExecutor(max_workers=_max_workers) as executor:
            replace_results = list(
                executor.map(_replace_document, documents_to_replace)
            )

        replaced_count = sum(r_res.update_info["n"] for r_res, _ in replace_results)
        inserted_ids += [replaced_id for _, replaced_id in replace_results]
        if replaced_count != len(ids_to_replace):
            missing = len(ids_to_replace) - replaced_count
            msg = (
                "AstraDBVectorStore.add_texts could not insert all requested "
                f"documents ({missing} failed replace_one calls)"
            )
            raise AstraDBVectorStoreError(msg)

    return inserted_ids
@override
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    *,
    batch_size: int | None = None,
    batch_concurrency: int | None = None,
    overwrite_concurrency: int | None = None,
    **kwargs: Any,
) -> list[str]:
    """Run texts through the embeddings and add them to the vectorstore.

    If passing explicit ids, those entries whose id is in the store already
    will be replaced.

    Args:
        texts: Texts to add to the vectorstore. Any iterable is accepted,
            including one-shot iterators such as generators.
        metadatas: Optional list of metadatas.
        ids: Optional list of ids.
        batch_size: Size of document chunks for each individual insertion
            API request. If not provided, defaults to the vector-store
            overall defaults (which in turn falls to astrapy defaults).
        batch_concurrency: number of simultaneous coroutines to process
            insertion batches concurrently. Defaults to the vector-store
            overall setting if not provided.
        overwrite_concurrency: number of simultaneous coroutines to process
            pre-existing documents in each batch. Defaults to the
            vector-store overall setting if not provided.
        **kwargs: Additional arguments are ignored.

    Note:
        The allowed field names for the metadata document attributes must
        obey certain rules (such as: keys cannot start with a dollar sign
        and cannot be empty).
        See `Naming Conventions <https://docs.datastax.com/en/astra-db-serverless/api-reference/dataapiclient.html#naming-conventions>`_
        for details.

    Returns:
        The list of ids of the added texts.

    Raises:
        AstraDBVectorStoreError: if the insertion fails for reasons other
            than pre-existing documents, or if not all of the pre-existing
            documents could be replaced.
    """
    if kwargs:
        warnings.warn(
            "Method 'aadd_texts' of AstraDBVectorStore invoked with "
            f"unsupported arguments ({', '.join(sorted(kwargs.keys()))}), "
            "which will be ignored.",
            stacklevel=2,
        )
    await self.astra_env.aensure_db_setup()

    # `texts` is needed more than once (embedding computation, document
    # encoding): materialize it a single time, so that a one-shot iterator
    # is not exhausted after the first use (which would insert nothing).
    text_list = list(texts)

    embedding_vectors: Sequence[list[float] | None]
    if self.document_codec.server_side_embeddings:
        embedding_vectors = [None] * len(text_list)
    else:
        embedding_vectors = await self._get_safe_embedding().aembed_documents(
            text_list
        )
    documents_to_insert = self._get_documents_to_insert(
        text_list, embedding_vectors, metadatas, ids
    )

    # perform an AstraPy insert_many, catching exceptions for overwriting docs
    ids_to_replace: list[str]
    inserted_ids: list[str] = []
    try:
        insert_many_result = await self.astra_env.async_collection.insert_many(
            documents_to_insert,
            ordered=False,
            concurrency=batch_concurrency or self.bulk_insert_batch_concurrency,
            chunk_size=batch_size or self.batch_size,
        )
        ids_to_replace = []
        inserted_ids = insert_many_result.inserted_ids
    except CollectionInsertManyException as err:
        # check that the error is solely due to already-existing documents
        if any(
            not isinstance(in_err, DataAPIResponseException)
            for in_err in err.exceptions
        ):
            full_err_message = _insertmany_error_message(err)
            raise AstraDBVectorStoreError(full_err_message) from err

        # here, assume all in err.exceptions is a DataAPIResponseException:
        error_codes = {
            err_desc.error_code
            for in_err in cast(list[DataAPIResponseException], err.exceptions)
            for err_desc in in_err.error_descriptors
        }
        if error_codes != {DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}:
            full_err_message = _insertmany_error_message(err)
            raise AstraDBVectorStoreError(full_err_message) from err

        # copy, so that extending the list later leaves the exception untouched
        inserted_ids = list(err.inserted_ids)
        inserted_ids_set = set(inserted_ids)
        ids_to_replace = [
            doc_id
            for document in documents_to_insert
            if (doc_id := self.document_codec.get_id(document))
            not in inserted_ids_set
        ]

    # if necessary, replace docs for the non-inserted ids
    if ids_to_replace:
        # set-based membership keeps this selection linear in the batch size
        ids_to_replace_set = set(ids_to_replace)
        documents_to_replace = [
            document
            for document in documents_to_insert
            if self.document_codec.get_id(document) in ids_to_replace_set
        ]

        # the semaphore caps the number of replace_one calls in flight
        sem = asyncio.Semaphore(
            overwrite_concurrency or self.bulk_insert_overwrite_concurrency,
        )
        _async_collection = self.astra_env.async_collection

        async def _replace_document(
            document: DocDict,
        ) -> tuple[CollectionUpdateResult, str]:
            async with sem:
                doc_id = self.document_codec.get_id(document)
                return await _async_collection.replace_one(
                    self.document_codec.encode_query(ids=[doc_id]),
                    document,
                ), doc_id

        tasks = [
            asyncio.create_task(_replace_document(document))
            for document in documents_to_replace
        ]
        replace_results = await asyncio.gather(*tasks, return_exceptions=False)

        replaced_count = sum(r_res.update_info["n"] for r_res, _ in replace_results)
        inserted_ids += [replaced_id for _, replaced_id in replace_results]
        if replaced_count != len(ids_to_replace):
            missing = len(ids_to_replace) - replaced_count
            # the message names this (async) method, not its sync counterpart
            msg = (
                "AstraDBVectorStore.aadd_texts could not insert all requested "
                f"documents ({missing} failed replace_one calls)"
            )
            raise AstraDBVectorStoreError(msg)

    return inserted_ids
def update_metadata(
    self,
    id_to_metadata: dict[str, dict],
    *,
    overwrite_concurrency: int | None = None,
) -> int:
    """Add/overwrite the metadata of existing documents.

    For each document to update, the supplied metadata dictionary is merged
    into the existing metadata: keys already present are overwritten, other
    keys are added.

    Args:
        id_to_metadata: map from the Document IDs to modify to the
            new metadata for updating. Keys in this dictionary that
            do not correspond to an existing document will be silently ignored.
            The values of this map are metadata dictionaries for updating
            the documents. Any pre-existing metadata will be merged with
            these entries, which take precedence on a key-by-key basis.
        overwrite_concurrency: number of threads to process the updates.
            Defaults to the vector-store overall setting if not provided.

    Returns:
        the number of documents successfully updated (i.e. found to exist,
        since even an update with `{}` as the new metadata counts as successful.)
    """
    self.astra_env.ensure_db_setup()

    def _apply_update(document_id: str, new_metadata: dict) -> CollectionUpdateResult:
        # one single-document "$set" update per (id, metadata) pair
        set_clause = self.filter_to_query(new_metadata)
        id_query = self.document_codec.encode_query(ids=[document_id])
        return self.astra_env.collection.update_one(id_query, {"$set": set_clause})

    worker_count = overwrite_concurrency or self.bulk_insert_overwrite_concurrency
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        outcomes = list(
            pool.map(
                _apply_update,
                id_to_metadata.keys(),
                id_to_metadata.values(),
            )
        )

    # "n" in each result's update_info counts the documents that were matched
    return sum(outcome.update_info["n"] for outcome in outcomes)
async def aupdate_metadata(
    self,
    id_to_metadata: dict[str, dict],
    *,
    overwrite_concurrency: int | None = None,
) -> int:
    """Add/overwrite the metadata of existing documents.

    For each document to update, the supplied metadata dictionary is merged
    into the existing metadata: keys already present are overwritten, other
    keys are added.

    Args:
        id_to_metadata: map from the Document IDs to modify to the
            new metadata for updating. Keys in this dictionary that
            do not correspond to an existing document will be silently ignored.
            The values of this map are metadata dictionaries for updating
            the documents. Any pre-existing metadata will be merged with
            these entries, which take precedence on a key-by-key basis.
        overwrite_concurrency: number of asynchronous tasks to process
            the updates. Defaults to the vector-store overall setting
            if not provided.

    Returns:
        the number of documents successfully updated (i.e. found to exist,
        since even an update with `{}` as the new metadata counts as successful.)
    """
    await self.astra_env.aensure_db_setup()

    # the semaphore caps the number of update_one calls in flight
    limiter = asyncio.Semaphore(
        overwrite_concurrency or self.bulk_insert_overwrite_concurrency,
    )
    target_collection = self.astra_env.async_collection

    async def _apply_update(
        document_id: str, new_metadata: dict
    ) -> CollectionUpdateResult:
        set_clause = self.filter_to_query(new_metadata)
        async with limiter:
            id_query = self.document_codec.encode_query(ids=[document_id])
            return await target_collection.update_one(
                id_query,
                {"$set": set_clause},
            )

    pending = [
        asyncio.create_task(_apply_update(document_id, new_metadata))
        for document_id, new_metadata in id_to_metadata.items()
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=False)

    # "n" in each result's update_info counts the documents that were matched
    return sum(outcome.update_info["n"] for outcome in outcomes)
def full_decode_astra_db_found_document(
    self,
    astra_db_document: DocDict,
) -> AstraDBQueryResult | None:
    """Decode an Astra DB document in full, i.e. into Document+embedding/similarity.

    The result is a representation that does not depend on the codec used
    by the collection, unlike the input (a 'raw' Astra DB document, which is
    codec-dependent).

    Depending on the query that retrieved it, the raw document may or may
    not carry its embedding and a similarity: these are set to None
    in the result if not found.

    The method as a whole can return None, to signal that the codec has
    refused the conversion (e.g. because the input document is deemed faulty).

    Args:
        astra_db_document: a dictionary obtained through `run_query_raw` from
            the collection.

    Returns:
        a AstraDBQueryResult named tuple with Document, id, embedding
        (where applicable) and similarity (where applicable),
        or an overall None if the decoding is refused by the codec.
    """
    codec = self.document_codec
    document = codec.decode(astra_db_document)
    if document is None:
        # the codec refused this raw document
        return None

    return AstraDBQueryResult(
        document=document,
        id=codec.get_id(astra_db_document),
        embedding=codec.decode_vector(astra_db_document),
        similarity=codec.get_similarity(astra_db_document),
    )
def full_decode_astra_db_reranked_result(
    self,
    astra_db_reranked_result: RerankedResult[DocDict],
) -> AstraDBQueryResult | None:
    """Full-decode an Astra DB find-and-rerank hit (Document+embedding/similarity).

    The result is a representation that does not depend on the codec used
    by the collection, unlike the 'document' part of the input (a 'raw'
    Astra DB response from a find-and-rerank hybrid search, which is
    codec-dependent).

    The input is one of the RerankedResult objects that the find_and_rerank
    Astrapy method returns an iterable of. Missing entries (such as the
    embedding) are set to None in the result if not found. The similarity
    is read from the hit's scores under the rerank score key.

    The method as a whole can return None, to signal that the codec has
    refused the conversion (e.g. because the input document is deemed faulty).

    Args:
        astra_db_reranked_result: a RerankedResult obtained by a
            `find_and_rerank` method call on the collection.

    Returns:
        a AstraDBQueryResult named tuple with Document, id, embedding
        (where applicable) and similarity (where applicable),
        or an overall None if the decoding is refused by the codec.
    """
    raw_document = astra_db_reranked_result.document
    scores = astra_db_reranked_result.scores
    codec = self.document_codec

    document = codec.decode(raw_document)
    if document is None:
        # the codec refused this raw document
        return None

    return AstraDBQueryResult(
        document=document,
        id=codec.get_id(raw_document),
        embedding=codec.decode_vector(raw_document),
        similarity=scores.get(RERANK_SCORE_KEY),
    )
def run_query_raw(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> tuple[list[float] | None, Iterable[DocDict]] | Iterable[DocDict]:
    """Execute a generic query on stored documents, returning Astra DB documents.

    The return value has a variable format, depending on whether the
    'sort vector' is requested back from the server.

    Only the `n` parameter is required. Omitting all other parameters results
    in a query that matches each and every document found on the collection.

    The method does not expose a projection directly, which is instead
    automatically determined based on the invocation options.

    The returned documents are exactly as they come back from Astra DB (taking
    into account the projection as well). A further step, namely subsequent
    invocation of the `full_decode_astra_db_found_document` method, is required
    to reconstruct codec-independent Document objects. The reason for keeping
    the retrieval and the decoding steps separate is that a caller may want
    to first deduplicate/discard items, in order to convert only the items
    actually needed.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            if the collection has not enough matches.
        ids: a list of document IDs to restrict the query to. If this is
            supplied, only document with an ID among the provided one will
            match. If further query filters are provided (i.e. metadata),
            matches must satisfy both requirements.
        filter: a metadata filtering part. If provided, it must refer to
            metadata keys by their bare name (such as `{"key": 123}`).
            This filter can combine nested conditions with "$or"/"$and"
            connectors, for example:

            - `{"tag": "a"}`
            - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
            - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results. Requires
            vector sort. Note that the shape of the return value depends
            on this parameter.
        include_embeddings: whether to retrieve the matches' own embedding
            vectors.

    Returns:
        The shape of the return value depends on the value of
        `include_sort_vector`:

        * if `include_sort_vector = False`, the return value is an iterable
          over Astra DB documents (dictionaries);
        * if `include_sort_vector = True`, the return value is a 2-item
          tuple `(sort_v, astra_db_ite)`, where:

          - `sort_v` is the sort vector, or None if not available.
          - `astra_db_ite` is an iterable over Astra DB documents
            (dictionaries).
    """
    self.astra_env.ensure_db_setup()
    find_query = self.document_codec.encode_query(
        ids=ids,
        filter_dict=filter,
    )
    find_sort = self.document_codec.encode_filter(sort or {})
    # the projection is the only thing `include_embeddings` controls
    find_projection = (
        self.document_codec.full_projection
        if include_embeddings
        else self.document_codec.base_projection
    )
    find_raw_iterator = self.astra_env.collection.find(
        filter=find_query,
        projection=find_projection,
        limit=n,
        include_similarity=include_similarity,
        include_sort_vector=include_sort_vector,
        sort=find_sort,
    )
    # stripping down the Astra DB cursor details into a plain iterator:
    final_doc_iterator = (doc for doc in find_raw_iterator)
    if not include_sort_vector:
        return final_doc_iterator

    # the codec option in the AstraDBEnv class disables DataAPIVectors here:
    sort_vector = cast(
        Union[list[float], None],
        find_raw_iterator.get_sort_vector(),
    )
    return sort_vector, final_doc_iterator
def run_query(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> (
    tuple[list[float] | None, Iterable[AstraDBQueryResult]]
    | Iterable[AstraDBQueryResult]
):
    """Execute a generic query on stored documents, returning Documents+other info.

    This is `run_query_raw` followed by a lazy decoding of each raw document
    through `full_decode_astra_db_found_document`. The format of the return
    value varies depending on whether the 'sort vector' is requested back
    from the server.

    Only the `n` parameter is required. Omitting all other parameters results
    in a query that matches each and every document found on the collection.

    The method does not expose a projection directly, which is instead
    automatically determined based on the invocation options.

    The returned Document objects are codec-independent.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            in the following cases: (a) the decoding skips some raw entries
            from the server; (b) the collection has not enough matches.
        ids: a list of document IDs to restrict the query to. If this is
            supplied, only document with an ID among the provided one will
            match. If further query filters are provided (i.e. metadata),
            matches must satisfy both requirements.
        filter: a metadata filtering part. If provided, it must refer to
            metadata keys by their bare name (such as `{"key": 123}`).
            This filter can combine nested conditions with "$or"/"$and"
            connectors.
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results. Requires
            vector sort. Note that the shape of the return value depends
            on this parameter.
        include_embeddings: whether to retrieve the matches' own embedding
            vectors.

    Returns:
        The shape of the return value depends on the value of
        `include_sort_vector`:

        * if `include_sort_vector = False`, the return value is an iterable
          over the AstraDBQueryResult items returned by the query.
          Entries that fail the decoding step, if any, are discarded after
          the query, which may lead to fewer items being returned than
          the required `n`.
        * if `include_sort_vector = True`, the return value is a 2-item
          tuple `(sort_v, results_ite)`, where:

          - `sort_v` is the sort vector, or None if not available.
          - `results_ite` is an iterable over AstraDBQueryResult items
            as above.
    """

    def _decode_lazily(raw_documents: Iterable[DocDict]) -> Iterable[AstraDBQueryResult]:
        # decoding happens on consumption; refused documents are dropped
        return (
            query_result
            for raw_document in raw_documents
            if (
                query_result := self.full_decode_astra_db_found_document(
                    raw_document,
                )
            )
            is not None
        )

    # everything but `include_sort_vector` is forwarded unchanged
    shared_options: dict[str, Any] = {
        "n": n,
        "ids": ids,
        "filter": filter,
        "sort": sort,
        "include_similarity": include_similarity,
        "include_embeddings": include_embeddings,
    }

    if not include_sort_vector:
        raw_documents = self.run_query_raw(
            include_sort_vector=False,
            **shared_options,
        )
        return _decode_lazily(raw_documents)

    sort_vector, raw_documents = self.run_query_raw(
        include_sort_vector=True,
        **shared_options,
    )
    return sort_vector, _decode_lazily(raw_documents)
async def arun_query_raw(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> tuple[list[float] | None, AsyncIterable[DocDict]] | AsyncIterable[DocDict]:
    """Execute a generic query on stored documents, returning Astra DB documents.

    The return value has a variable format, depending on whether the
    'sort vector' is requested back from the server.

    Only the `n` parameter is required. Omitting all other parameters results
    in a query that matches each and every document found on the collection.

    The method does not expose a projection directly, which is instead
    automatically determined based on the invocation options.

    The returned documents are exactly as they come back from Astra DB
    (taking into account the projection as well). No decoding into
    codec-independent Document objects takes place here: retrieval and
    decoding are kept separate so that a caller may first deduplicate/discard
    items and convert only the ones actually needed.

    Args:
        n: amount of items to return. Fewer items than `n` may be returned
            if the collection has not enough matches.
        ids: a list of document IDs to restrict the query to.
            If this is supplied, only document with an ID among the provided
            one will match. If further query filters are provided (i.e.
            metadata), matches must satisfy both requirements.
        filter: a metadata filtering part. If provided, it must refer to
            metadata keys by their bare name (such as `{"key": 123}`).
            This filter can combine nested conditions with "$or"/"$and"
            connectors, for example:
                - `{"tag": "a"}`
                - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
                - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause for the query, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}`. Metadata sort
            conditions must be expressed by their 'bare' name.
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to return the very query vector used
            for the ANN search alongside the iterable of results.
            Requires vector sort. Note that the shape of the return value
            depends on this parameter.
        include_embeddings: whether to retrieve the matches' own
            embedding vectors.

    Returns:
        The shape of the return value depends on `include_sort_vector`:

        * if `include_sort_vector = False`, the return value is an
          async iterable over Astra DB documents (dictionaries);
        * if `include_sort_vector = True`, the return value is a 2-item
          tuple `(sort_v, astra_db_ite)`, where:
            - `sort_v` is the sort vector, or None if not available.
            - `astra_db_ite` is an async iterable over Astra DB documents
              (dictionaries).
    """
    await self.astra_env.aensure_db_setup()
    find_query = self.document_codec.encode_query(
        ids=ids,
        filter_dict=filter,
    )
    # NOTE(review): the sort clause goes through the codec's *filter*
    # encoder, presumably so bare metadata keys get the same key mapping
    # as in filters; confirm against the codec implementation.
    find_sort = self.document_codec.encode_filter(sort or {})
    find_projection = (
        self.document_codec.full_projection
        if include_embeddings
        else self.document_codec.base_projection
    )
    find_raw_iterator = self.astra_env.async_collection.find(
        filter=find_query,
        projection=find_projection,
        limit=n,
        include_similarity=include_similarity,
        include_sort_vector=include_sort_vector,
        sort=find_sort,
    )
    # stripping down the Astra DB cursor details into a plain iterator:
    final_doc_iterator = (doc async for doc in find_raw_iterator)
    if include_sort_vector:
        # the codec option in the AstraDBEnv class disables DataAPIVectors here:
        sort_vector = cast(
            Union[list[float], None],
            await find_raw_iterator.get_sort_vector(),
        )
        return sort_vector, final_doc_iterator
    return final_doc_iterator
async def arun_query(
    self,
    *,
    n: int,
    ids: list[str] | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
    sort: dict[str, Any] | None = None,
    include_similarity: bool | None = None,
    include_sort_vector: bool = False,
    include_embeddings: bool = False,
) -> (
    tuple[list[float] | None, AsyncIterable[AstraDBQueryResult]]
    | AsyncIterable[AstraDBQueryResult]
):
    """Run a generic query asynchronously and yield decoded results.

    This is a thin layer over `arun_query_raw`: the raw documents are
    fetched with the very same arguments and then passed, lazily, through
    `full_decode_astra_db_found_document`. Raw documents for which the
    decoding returns None are silently dropped.

    Only `n` is mandatory; with no other argument the query matches every
    document on the collection. No projection can be passed: it is picked
    automatically from the invocation options.

    Args:
        n: amount of items to return. Fewer than `n` items may come back if
            (a) the decoding discards some raw entries or (b) the collection
            has not enough matches.
        ids: document IDs to restrict the query to. When combined with
            `filter`, matches must satisfy both requirements.
        filter: metadata filter, expressed with bare metadata key names
            (such as `{"key": 123}`). Conditions can be nested through
            "$or"/"$and" connectors, for example:
                - `{"tag": "a"}`
                - `{"$or": [{"tag": "a"}, {"label": "b"}]}`
                - `{"$and": [{"tag": {"$in": ["a", "z"]}}, {"label": "b"}]}`
        sort: a 'sort' clause, such as `{"$vector": [...]}`,
            `{"$vectorize": "..."}` or `{"mdkey": 1}` (bare metadata names).
        include_similarity: whether to return similarity scores with each
            match. Requires vector sort.
        include_sort_vector: whether to also return the query vector used
            for the ANN search. Requires vector sort. This flag changes the
            shape of the return value.
        include_embeddings: whether to retrieve the matches' own vectors.

    Returns:
        * if `include_sort_vector` is falsy: a lazy async iterable over the
          decoded AstraDBQueryResult items;
        * otherwise: a 2-item tuple `(sort_v, results_ite)` with the sort
          vector (or None if not available) and the same kind of iterable.
    """
    if not include_sort_vector:
        raw_docs = await self.arun_query_raw(
            n=n,
            ids=ids,
            filter=filter,
            sort=sort,
            include_similarity=include_similarity,
            include_sort_vector=False,
            include_embeddings=include_embeddings,
        )
        # lazy: documents are decoded only as the caller iterates
        return (
            hit
            async for raw_doc in raw_docs
            if (hit := self.full_decode_astra_db_found_document(raw_doc))
            is not None
        )

    sort_vector, raw_docs = await self.arun_query_raw(
        n=n,
        ids=ids,
        filter=filter,
        sort=sort,
        include_similarity=include_similarity,
        include_sort_vector=True,
        include_embeddings=include_embeddings,
    )
    decoded_hits = (
        hit
        async for raw_doc in raw_docs
        if (hit := self.full_decode_astra_db_found_document(raw_doc)) is not None
    )
    return (sort_vector, decoded_hits)
def metadata_search(
    self,
    filter: dict[str, Any] | None = None,  # noqa: A002
    n: int = 5,
) -> list[Document]:
    """Fetch documents matching a metadata filter (no vector ranking).

    Args:
        filter: the metadata conditions the documents must satisfy.
        n: the maximum number of documents to return.

    Returns:
        The Documents produced by `run_query` for this filter.
    """
    matches = self.run_query(n=n, filter=filter)
    # each hit unpacks as (document, id, embedding, similarity)
    return [document for document, _id, _embedding, _similarity in matches]
async def ametadata_search(
    self,
    filter: dict[str, Any] | None = None,  # noqa: A002
    n: int = 5,
) -> list[Document]:
    """Get documents via a metadata search.

    Args:
        filter: the metadata to query for.
        n: the maximum number of documents to return.

    Returns:
        The list of Documents produced by `arun_query` for this filter
        (a concrete list, like the synchronous `metadata_search`).
    """
    docs_ite = await self.arun_query(n=n, filter=filter)
    # each hit unpacks as (document, id, embedding, similarity)
    return [doc async for doc, _, _, _ in docs_ite]
def get_by_document_id(self, document_id: str) -> Document | None:
    """Look up one document in the store by its document ID.

    Args:
        document_id: The document ID

    Returns:
        The document if it exists. Otherwise None.
    """
    candidates = [
        document
        for document, _id, _embedding, _similarity in self.run_query(
            n=1,
            ids=[document_id],
        )
    ]
    return candidates[0] if candidates else None
async def aget_by_document_id(self, document_id: str) -> Document | None:
    """Look up one document in the store by its document ID (async).

    Args:
        document_id: The document ID

    Returns:
        The document if it exists. Otherwise None.
    """
    matches = await self.arun_query(
        n=1,
        ids=[document_id],
    )
    candidates = [
        document async for document, _id, _embedding, _similarity in matches
    ]
    return candidates[0] if candidates else None
@override
def similarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    lexical_query: str | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return the documents most similar to a query string.

    Delegates to `similarity_search_with_score_id` and keeps only the
    Document of each hit.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        lexical_query: for hybrid search, a specific query for the lexical
            portion of the retrieval. If omitted or empty, defaults to the
            same as 'query'. If passed on a non-hybrid search, an error is
            raised.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query.
    """
    scored_hits = self.similarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
        lexical_query=lexical_query,
    )
    return [document for (document, _score, _doc_id) in scored_hits]
@override
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    lexical_query: str | None = None,
) -> list[tuple[Document, float]]:
    """Return the documents most similar to a query string, with scores.

    Delegates to `similarity_search_with_score_id` and drops the ID of
    each hit.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        lexical_query: for hybrid search, a specific query for the lexical
            portion of the retrieval. If omitted or empty, defaults to the
            same as 'query'. If passed on a non-hybrid search, an error is
            raised.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
        lexical_query=lexical_query,
    )
    return [(document, score) for (document, score, _doc_id) in scored_hits]
def similarity_search_with_score_id(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
    lexical_query: str | None = None,
) -> list[tuple[Document, float, str]]:
    """Return the documents most similar to a query, with score and ID.

    On a store with hybrid search enabled, a hybrid sort clause is built and
    the search goes through `_hybrid_search_with_score_id_by_sort`;
    otherwise a plain vector (or vectorize) sort is built and the search
    goes through `_similarity_find_with_score_id_by_sort`. The query text is
    embedded client-side only when the codec does not declare server-side
    embeddings.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        lexical_query: for hybrid search, a specific query for the lexical
            portion of the retrieval. If omitted or empty, defaults to the
            same as 'query'. If passed on a non-hybrid search, an error is
            raised.

    Returns:
        The list of (Document, score, id), the most similar to the query.

    Raises:
        ValueError: if `lexical_query` is given on a non-hybrid store.
    """
    sort: dict[str, Any]

    if not self.hybrid_search:
        # plain ANN search: a lexical query makes no sense here
        if lexical_query is not None:
            raise ValueError(ERROR_LEXICAL_QUERY_ON_NONHYBRID_SEARCH)
        if self.document_codec.server_side_embeddings:
            sort = self.document_codec.encode_vectorize_sort(query)
        else:
            query_vector = self._get_safe_embedding().embed_query(query)
            sort = self.document_codec.encode_vector_sort(query_vector)
        return self._similarity_find_with_score_id_by_sort(
            sort=sort,
            k=k,
            filter_dict=filter,
        )

    rerank_on = self.document_codec.rerank_on
    rerank_query: str | None
    if self.document_codec.server_side_embeddings:
        # the server computes the embedding out of the query text
        hybrid_vector, hybrid_vectorize, rerank_query = None, query, None
    else:
        hybrid_vector = self._get_safe_embedding().embed_query(query)
        hybrid_vectorize, rerank_query = None, query
    sort = self.document_codec.encode_hybrid_sort(
        vector=hybrid_vector,
        vectorize=hybrid_vectorize,
        lexical=lexical_query or query,
    )
    return self._hybrid_search_with_score_id_by_sort(
        sort=sort,
        k=k,
        filter_dict=filter,
        rerank_on=rerank_on,
        rerank_query=rerank_query,
    )
@override
def similarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return the documents most similar to an embedding vector.

    Delegates to `similarity_search_with_score_id_by_vector` and keeps only
    the Document of each hit.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    return [document for (document, _score, _doc_id) in scored_hits]
def similarity_search_with_score_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float]]:
    """Return the documents most similar to an embedding vector, with scores.

    Delegates to `similarity_search_with_score_id_by_vector` and drops the
    ID of each hit.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = self.similarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    return [(document, score) for (document, score, _doc_id) in scored_hits]
def similarity_search_with_score_id_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Return the documents most similar to an embedding vector, with score and ID.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query vector.

    Raises:
        ValueError: if the store's codec declares server-side embeddings,
            in which case searching by an explicit vector is refused.
    """
    codec = self.document_codec
    if not codec.server_side_embeddings:
        return self._similarity_find_with_score_id_by_sort(
            sort=codec.encode_vector_sort(embedding),
            k=k,
            filter_dict=filter,
        )
    error_text = (
        "Searching by vector on a Vector Store that uses server-side "
        "embeddings is not allowed."
    )
    raise ValueError(error_text)
def _similarity_find_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    filter_dict: dict[str, Any] | None,
) -> list[tuple[Document, float, str]]:
    """Run an ANN search for a ready-made sort clause.

    Args:
        sort: the (already encoded) sort clause for the query.
        k: the maximum number of hits to retrieve.
        filter_dict: metadata filter to apply, if any.

    Returns:
        A list of (Document, similarity, id) tuples, in the order in which
        `run_query` yields the hits.
    """
    query_hits = self.run_query(
        n=k,
        filter=filter_dict,
        sort=sort,
        include_similarity=True,
    )
    # hits unpack as (document, id, embedding, similarity); the cast only
    # informs the type checker that document/similarity are not None here
    return [
        cast("tuple[Document, float, str]", (document, similarity, doc_id))
        for document, doc_id, _embedding, similarity in query_hits
    ]


def _hybrid_search_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    filter_dict: dict[str, Any] | None,
    rerank_on: str | None,
    rerank_query: str | None,
) -> list[tuple[Document, float, str]]:
    """Run a hybrid (find-and-rerank) search for a ready-made sort clause.

    Args:
        sort: the (already encoded) hybrid sort clause.
        k: the maximum number of hits to retrieve.
        filter_dict: metadata filter to apply, if any.
        rerank_on: passed as-is to the collection's `find_and_rerank`.
        rerank_query: passed as-is to the collection's `find_and_rerank`.

    Returns:
        A list of (Document, similarity, id) tuples. Reranked results for
        which `full_decode_astra_db_reranked_result` returns None are
        skipped.
    """
    self.astra_env.ensure_db_setup()
    query_filter = self.document_codec.encode_query(filter_dict=filter_dict)
    limits = _make_hybrid_limits(self.hybrid_limit_factor, k)
    reranked_cursor = self.astra_env.collection.find_and_rerank(
        filter=query_filter,
        sort=sort,
        projection=self.document_codec.base_projection,
        limit=k,
        hybrid_limits=limits,
        include_scores=True,
        rerank_on=rerank_on,
        rerank_query=rerank_query,
    )
    decoded_hits = map(self.full_decode_astra_db_reranked_result, reranked_cursor)
    return [
        cast(
            "tuple[Document, float, str]",
            (hit.document, hit.similarity, hit.id),
        )
        for hit in decoded_hits
        if hit is not None
    ]
@override
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    lexical_query: str | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return the documents most similar to a query string (async).

    Delegates to `asimilarity_search_with_score_id` and keeps only the
    Document of each hit.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        lexical_query: for hybrid search, a specific query for the lexical
            portion of the retrieval. If omitted or empty, defaults to the
            same as 'query'. If passed on a non-hybrid search, an error is
            raised.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query.
    """
    scored_hits = await self.asimilarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
        lexical_query=lexical_query,
    )
    return [document for (document, _score, _doc_id) in scored_hits]
@override
async def asimilarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    lexical_query: str | None = None,
) -> list[tuple[Document, float]]:
    """Return the documents most similar to a query string, with scores (async).

    Delegates to `asimilarity_search_with_score_id` and drops the ID of
    each hit.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        lexical_query: for hybrid search, a specific query for the lexical
            portion of the retrieval. If omitted or empty, defaults to the
            same as 'query'. If passed on a non-hybrid search, an error is
            raised.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id(
        query=query,
        k=k,
        filter=filter,
        lexical_query=lexical_query,
    )
    return [(document, score) for (document, score, _doc_id) in scored_hits]
async def asimilarity_search_with_score_id(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
    lexical_query: str | None = None,
) -> list[tuple[Document, float, str]]:
    """Return the documents most similar to a query, with score and ID (async).

    On a store with hybrid search enabled, a hybrid sort clause is built and
    the search goes through `_ahybrid_search_with_score_id_by_sort`;
    otherwise a plain vector (or vectorize) sort is built and the search
    goes through `_asimilarity_find_with_score_id_by_sort`. The query text
    is embedded client-side (through `aembed_query`) only when the codec
    does not declare server-side embeddings.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        lexical_query: for hybrid search, a specific query for the lexical
            portion of the retrieval. If omitted or empty, defaults to the
            same as 'query'. If passed on a non-hybrid search, an error is
            raised.

    Returns:
        The list of (Document, score, id), the most similar to the query.

    Raises:
        ValueError: if `lexical_query` is given on a non-hybrid store.
    """
    sort: dict[str, Any]

    if not self.hybrid_search:
        # plain ANN search: a lexical query makes no sense here
        if lexical_query is not None:
            raise ValueError(ERROR_LEXICAL_QUERY_ON_NONHYBRID_SEARCH)
        if self.document_codec.server_side_embeddings:
            sort = self.document_codec.encode_vectorize_sort(query)
        else:
            query_vector = await self._get_safe_embedding().aembed_query(query)
            sort = self.document_codec.encode_vector_sort(query_vector)
        return await self._asimilarity_find_with_score_id_by_sort(
            sort=sort,
            k=k,
            filter_dict=filter,
        )

    rerank_on = self.document_codec.rerank_on
    rerank_query: str | None
    if self.document_codec.server_side_embeddings:
        # the server computes the embedding out of the query text
        hybrid_vector, hybrid_vectorize, rerank_query = None, query, None
    else:
        hybrid_vector = await self._get_safe_embedding().aembed_query(query)
        hybrid_vectorize, rerank_query = None, query
    sort = self.document_codec.encode_hybrid_sort(
        vector=hybrid_vector,
        vectorize=hybrid_vectorize,
        lexical=lexical_query or query,
    )
    return await self._ahybrid_search_with_score_id_by_sort(
        sort=sort,
        k=k,
        filter_dict=filter,
        rerank_on=rerank_on,
        rerank_query=rerank_query,
    )
@override
async def asimilarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return the documents most similar to an embedding vector (async).

    Delegates to `asimilarity_search_with_score_id_by_vector` and keeps only
    the Document of each hit.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    return [document for (document, _score, _doc_id) in scored_hits]
async def asimilarity_search_with_score_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float]]:
    """Return the documents most similar to an embedding vector, with scores (async).

    Delegates to `asimilarity_search_with_score_id_by_vector` and drops the
    ID of each hit.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score), the most similar to the query vector.
    """
    scored_hits = await self.asimilarity_search_with_score_id_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    return [(document, score) for (document, score, _doc_id) in scored_hits]
async def asimilarity_search_with_score_id_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, float, str]]:
    """Return the documents most similar to an embedding vector, with score and ID (async).

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, score, id), the most similar to the query vector.

    Raises:
        ValueError: if the store's codec declares server-side embeddings,
            in which case searching by an explicit vector is refused.
    """
    codec = self.document_codec
    if not codec.server_side_embeddings:
        return await self._asimilarity_find_with_score_id_by_sort(
            sort=codec.encode_vector_sort(embedding),
            k=k,
            filter_dict=filter,
        )
    error_text = (
        "Searching by vector on a Vector Store that uses server-side "
        "embeddings is not allowed."
    )
    raise ValueError(error_text)
async def _asimilarity_find_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    filter_dict: dict[str, Any] | None,
) -> list[tuple[Document, float, str]]:
    """Run an ANN search for a ready-made sort clause (async).

    Args:
        sort: the (already encoded) sort clause for the query.
        k: the maximum number of hits to retrieve.
        filter_dict: metadata filter to apply, if any.

    Returns:
        A list of (Document, similarity, id) tuples, in the order in which
        `arun_query` yields the hits.
    """
    query_hits = await self.arun_query(
        n=k,
        filter=filter_dict,
        sort=sort,
        include_similarity=True,
    )
    # hits unpack as (document, id, embedding, similarity); the cast only
    # informs the type checker that document/similarity are not None here
    return [
        cast("tuple[Document, float, str]", (document, similarity, doc_id))
        async for document, doc_id, _embedding, similarity in query_hits
    ]


async def _ahybrid_search_with_score_id_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    filter_dict: dict[str, Any] | None,
    rerank_on: str | None,
    rerank_query: str | None,
) -> list[tuple[Document, float, str]]:
    """Run a hybrid (find-and-rerank) search for a ready-made sort clause (async).

    Args:
        sort: the (already encoded) hybrid sort clause.
        k: the maximum number of hits to retrieve.
        filter_dict: metadata filter to apply, if any.
        rerank_on: passed as-is to the collection's `find_and_rerank`.
        rerank_query: passed as-is to the collection's `find_and_rerank`.

    Returns:
        A list of (Document, similarity, id) tuples. Reranked results for
        which `full_decode_astra_db_reranked_result` returns None are
        skipped.
    """
    await self.astra_env.aensure_db_setup()
    query_filter = self.document_codec.encode_query(filter_dict=filter_dict)
    limits = _make_hybrid_limits(self.hybrid_limit_factor, k)
    reranked_cursor = self.astra_env.async_collection.find_and_rerank(
        filter=query_filter,
        sort=sort,
        projection=self.document_codec.base_projection,
        limit=k,
        hybrid_limits=limits,
        include_scores=True,
        rerank_on=rerank_on,
        rerank_query=rerank_query,
    )
    scored_hits: list[tuple[Document, float, str]] = []
    async for reranked in reranked_cursor:
        hit = self.full_decode_astra_db_reranked_result(reranked)
        if hit is not None:
            scored_hits.append(
                cast(
                    "tuple[Document, float, str]",
                    (hit.document, hit.similarity, hit.id),
                )
            )
    return scored_hits
def similarity_search_with_embedding_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, list[float]]]:
    """Return the documents most similar to a vector, each with its embedding.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, embedding) pairs most similar to the query
        vector. The query vector itself is not part of the return value.
    """
    vector_sort = self.document_codec.encode_vector_sort(vector=embedding)
    # the helper also hands back the query vector, which is not needed here
    _query_vector, document_embedding_pairs = (
        self._similarity_find_with_embedding_by_sort(
            sort=vector_sort, k=k, filter=filter
        )
    )
    return document_embedding_pairs
async def asimilarity_search_with_embedding_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> list[tuple[Document, list[float]]]:
    """Return the documents most similar to a vector, each with its embedding (async).

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        The list of (Document, embedding) pairs most similar to the query
        vector. The query vector itself is not part of the return value.
    """
    vector_sort = self.document_codec.encode_vector_sort(vector=embedding)
    # the helper also hands back the query vector, which is not needed here
    _query_vector, document_embedding_pairs = (
        await self._asimilarity_find_with_embedding_by_sort(
            sort=vector_sort, k=k, filter=filter
        )
    )
    return document_embedding_pairs
def similarity_search_with_embedding(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Return the documents most similar to a query, each with its embedding.

    The query embedding vector is returned as well. On a hybrid-enabled
    store a UserWarning is emitted and a regular vector ANN search is run.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        (The query embedding vector, The list of (Document, embedding),
        the most similar to the query vector.).
    """
    if self.hybrid_search:
        fallback_notice = (
            "Method `similarity_search_with_embedding` was called on a vector "
            "store equipped with Hybrid capabilities. Since this method cannot "
            "make use of Hybrid search, the vector store will fall back to "
            "regular vector ANN similarity search."
        )
        warnings.warn(fallback_notice, UserWarning, stacklevel=2)

    if not self.document_codec.server_side_embeddings:
        query_embedding = self._get_safe_embedding().embed_query(text=query)
        # with k == 0 there is nothing to fetch: skip the database round trip
        if k == 0:
            return (query_embedding, [])
        sort = self.document_codec.encode_vector_sort(vector=query_embedding)
    else:
        sort = self.document_codec.encode_vectorize_sort(query)

    return self._similarity_find_with_embedding_by_sort(
        sort=sort, k=k, filter=filter
    )
async def asimilarity_search_with_embedding(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Return docs most similar to the query with embedding.

    Also includes the query embedding vector. On a hybrid-enabled store a
    UserWarning is emitted and a regular vector ANN search is run.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.

    Returns:
        (The query embedding vector, The list of (Document, embedding),
        the most similar to the query vector.).
    """
    if self.hybrid_search:
        warnings.warn(
            (
                "Method `asimilarity_search_with_embedding` was called on a vector "
                "store equipped with Hybrid capabilities. Since this method cannot "
                "make use of Hybrid search, the vector store will fall back to "
                "regular vector ANN similarity search."
            ),
            UserWarning,
            stacklevel=2,
        )

    if self.document_codec.server_side_embeddings:
        sort = self.document_codec.encode_vectorize_sort(query)
    else:
        # use the async embedding call: the synchronous `embed_query` would
        # block the event loop for the whole duration of the computation
        query_embedding = await self._get_safe_embedding().aembed_query(
            text=query
        )
        # shortcut return if query isn't needed.
        if k == 0:
            return (query_embedding, [])
        sort = self.document_codec.encode_vector_sort(vector=query_embedding)

    return await self._asimilarity_find_with_embedding_by_sort(
        sort=sort, k=k, filter=filter
    )
def _similarity_find_with_embedding_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Run an ANN search for a ready-made sort clause, fetching embeddings.

    Returns:
        (query_embedding, List of (Document, embedding) most similar to the query).

    Raises:
        AstraDBVectorStoreError: if no sort vector comes back with the query.
    """
    query_vector, query_hits = self.run_query(
        n=k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    # hits unpack as (document, id, embedding, similarity); the cast only
    # informs the type checker that document/embedding are not None here
    pairs = [
        cast("tuple[Document, list[float]]", (document, doc_embedding))
        for document, _id, doc_embedding, _similarity in query_hits
    ]
    return (query_vector, pairs)


async def _asimilarity_find_with_embedding_by_sort(
    self,
    sort: dict[str, Any],
    k: int = 4,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> tuple[list[float], list[tuple[Document, list[float]]]]:
    """Run an ANN search for a ready-made sort clause, fetching embeddings (async).

    Returns:
        (query_embedding, List of (Document, embedding) most similar to the query).

    Raises:
        AstraDBVectorStoreError: if no sort vector comes back with the query.
    """
    query_vector, query_hits = await self.arun_query(
        n=k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    # hits unpack as (document, id, embedding, similarity)
    pairs = [
        cast("tuple[Document, list[float]]", (document, doc_embedding))
        async for document, _id, doc_embedding, _similarity in query_hits
    ]
    return (query_vector, pairs)


def _run_mmr_find_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    fetch_k: int,
    lambda_mult: float,
    filter: dict[str, Any] | None,  # noqa: A002
) -> list[Document]:
    """Prefetch `fetch_k` hits for a sort clause and pick `k` of them by MMR.

    Raises:
        AstraDBVectorStoreError: if no sort vector comes back with the query
            (checked after the hits have been collected).
    """
    query_vector, query_hits = self.run_query(
        n=fetch_k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    # this is list[tuple[Document, list[float]]]:
    candidates = cast(
        "list[tuple[Document, list[float]]]",
        [
            (document, doc_embedding)
            for document, _id, doc_embedding, _similarity in query_hits
        ],
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    return self._get_mmr_hits(
        embedding=query_vector,
        k=k,
        lambda_mult=lambda_mult,
        prefetch_hit_pairs=candidates,
    )


async def _arun_mmr_find_by_sort(
    self,
    sort: dict[str, Any],
    k: int,
    fetch_k: int,
    lambda_mult: float,
    filter: dict[str, Any] | None,  # noqa: A002
) -> list[Document]:
    """Prefetch `fetch_k` hits for a sort clause and pick `k` of them by MMR (async).

    Raises:
        AstraDBVectorStoreError: if no sort vector comes back with the query
            (checked after the hits have been collected).
    """
    query_vector, query_hits = await self.arun_query(
        n=fetch_k,
        filter=filter,
        sort=sort,
        include_sort_vector=True,
        include_embeddings=True,
    )
    # this is list[tuple[Document, list[float]]]:
    candidates = cast(
        "list[tuple[Document, list[float]]]",
        [
            (document, doc_embedding)
            async for document, _id, doc_embedding, _similarity in query_hits
        ],
    )
    if query_vector is None:
        msg = "Unable to retrieve the server-side embedding of the query."
        raise AstraDBVectorStoreError(msg)
    return self._get_mmr_hits(
        embedding=query_vector,
        k=k,
        lambda_mult=lambda_mult,
        prefetch_hit_pairs=candidates,
    )


def _get_mmr_hits(
    self,
    embedding: list[float],
    k: int,
    lambda_mult: float,
    prefetch_hit_pairs: list[tuple[Document, list[float]]],
) -> list[Document]:
    """Select documents among prefetched (Document, embedding) pairs via MMR.

    The selected documents are returned in their prefetch order, i.e. the
    order of `prefetch_hit_pairs`, not in the order the MMR routine lists
    the chosen indices.
    """
    candidate_embeddings = [pair[1] for pair in prefetch_hit_pairs]
    chosen_positions = maximal_marginal_relevance(
        np.array(embedding, dtype=np.float32),
        candidate_embeddings,
        k=k,
        lambda_mult=lambda_mult,
    )
    return [
        pair[0]
        for position, pair in enumerate(prefetch_hit_pairs)
        if position in chosen_positions
    ]
@override
def max_marginal_relevance_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Select documents for an embedding vector by maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    vector_sort = self.document_codec.encode_vector_sort(embedding)
    return self._run_mmr_find_by_sort(
        sort=vector_sort,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
@override
async def amax_marginal_relevance_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Select documents for an embedding vector by maximal marginal relevance (async).

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    vector_sort = self.document_codec.encode_vector_sort(embedding)
    return await self._arun_mmr_find_by_sort(
        sort=vector_sort,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
@override
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Select documents for a query string by maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents. On a hybrid-enabled store a
    UserWarning is emitted and a regular vector ANN search is run.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments; forwarded to
            `max_marginal_relevance_search_by_vector` on the client-side
            embedding path.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    if self.hybrid_search:
        fallback_notice = (
            "Method `max_marginal_relevance_search` was called on a vector "
            "store equipped with Hybrid capabilities. Since this method cannot "
            "make use of Hybrid search, the vector store will fall back to "
            "regular vector ANN similarity search."
        )
        warnings.warn(fallback_notice, UserWarning, stacklevel=2)

    if not self.document_codec.server_side_embeddings:
        query_vector = self._get_safe_embedding().embed_query(query)
        return self.max_marginal_relevance_search_by_vector(
            query_vector,
            k,
            fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )

    # server-side embeddings: no query vector is available client-side, so
    # this case goes straight to the "_by_sort" method with a vectorize sort
    # instead of using the by-vector path
    return self._run_mmr_find_by_sort(
        sort=self.document_codec.encode_vectorize_sort(query),
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
@override
async def amax_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Select documents for a query string by maximal marginal relevance (async).

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents. On a hybrid-enabled store a
    UserWarning is emitted and a regular vector ANN search is run.

    Args:
        query: Query to look up documents similar to.
        k: Number of Documents to return.
        fetch_k: Number of Documents to fetch to pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments; forwarded to
            `amax_marginal_relevance_search_by_vector` on the client-side
            embedding path.

    Returns:
        The list of Documents selected by maximal marginal relevance.
    """
    if self.hybrid_search:
        fallback_notice = (
            "Method `amax_marginal_relevance_search` was called on a vector "
            "store equipped with Hybrid capabilities. Since this method cannot "
            "make use of Hybrid search, the vector store will fall back to "
            "regular vector ANN similarity search."
        )
        warnings.warn(fallback_notice, UserWarning, stacklevel=2)

    if not self.document_codec.server_side_embeddings:
        query_vector = await self._get_safe_embedding().aembed_query(query)
        return await self.amax_marginal_relevance_search_by_vector(
            query_vector,
            k,
            fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )

    # server-side embeddings: no query vector is available client-side, so
    # this case goes straight to the "_by_sort" method with a vectorize sort
    # instead of using the by-vector path
    return await self._arun_mmr_find_by_sort(
        sort=self.document_codec.encode_vectorize_sort(query),
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
@classmethod
def _from_kwargs(
    cls: type[AstraDBVectorStore],
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Build a store from the keyword arguments the constructor accepts.

    The accepted names are the positional-or-keyword and keyword-only
    parameters of ``AstraDBVectorStore.__init__`` (minus ``self``).
    Any other key found in ``kwargs`` is reported through a single
    ``UserWarning`` and then dropped.

    Args:
        **kwargs: candidate constructor arguments.

    Returns:
        an instance of ``cls`` created from the accepted arguments only.
    """
    init_spec = inspect.getfullargspec(AstraDBVectorStore.__init__)
    accepted_keys = {*init_spec.args, *init_spec.kwonlyargs} - {"self"}

    # A dict keys view supports set difference; empty kwargs yield an empty set.
    rejected_keys = kwargs.keys() - accepted_keys
    if rejected_keys:
        # stacklevel=3 attributes the warning two frames above this method
        # (the caller of the public factory when invoked through `from_texts`).
        warnings.warn(
            (
                "Method 'from_texts/afrom_texts' of AstraDBVectorStore "
                "vector store invoked with unsupported arguments "
                f"({', '.join(sorted(rejected_keys))}), "
                "which will be ignored."
            ),
            UserWarning,
            stacklevel=3,
        )

    accepted_kwargs = {
        key: value for key, value in kwargs.items() if key in accepted_keys
    }
    return cls(**accepted_kwargs)
@classmethod
@override
def from_texts(
    cls: type[AstraDBVectorStore],
    texts: list[str],
    embedding: Embeddings | None = None,
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from raw texts.

    Args:
        texts: the texts to insert.
        embedding: the embedding function to use in the store.
        metadatas: metadata dicts for the texts.
        ids: ids to associate to the texts.
        **kwargs: you can pass any argument that you would
            to :meth:`~add_texts` and/or to the
            ``AstraDBVectorStore`` constructor (see these methods for
            details). These arguments will be
            routed to the respective methods as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    _add_texts_inspection = inspect.getfullargspec(AstraDBVectorStore.add_texts)
    # Names that belong to `add_texts`: both its positional-or-keyword and its
    # keyword-only parameters, minus the ones this method already passes
    # explicitly (and `self`).
    _method_args = (
        set(_add_texts_inspection.args) | set(_add_texts_inspection.kwonlyargs)
    ) - {"self", "texts", "metadatas", "ids"}

    # Everything that is not an `add_texts` parameter is offered to the
    # constructor (which warns about, and drops, what it does not know).
    _init_kwargs = {k: v for k, v in kwargs.items() if k not in _method_args}
    _method_kwargs = {k: v for k, v in kwargs.items() if k in _method_args}

    astra_db_store = AstraDBVectorStore._from_kwargs(
        embedding=embedding,
        **_init_kwargs,
    )
    astra_db_store.add_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
        **_method_kwargs,
    )
    return astra_db_store
@classmethod
@override
async def afrom_texts(
    cls: type[AstraDBVectorStore],
    texts: list[str],
    embedding: Embeddings | None = None,
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from raw texts, asynchronously.

    Args:
        texts: the texts to insert.
        embedding: embedding function to use.
        metadatas: metadata dicts for the texts.
        ids: ids to associate to the texts.
        **kwargs: you can pass any argument that you would
            to :meth:`~aadd_texts` and/or to the ``AstraDBVectorStore``
            constructor (see these methods for details). These arguments
            will be routed to the respective methods as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    _aadd_texts_inspection = inspect.getfullargspec(AstraDBVectorStore.aadd_texts)
    # Names that belong to `aadd_texts`: both its positional-or-keyword and
    # its keyword-only parameters, minus the ones this method already passes
    # explicitly (and `self`).
    _method_args = (
        set(_aadd_texts_inspection.args) | set(_aadd_texts_inspection.kwonlyargs)
    ) - {"self", "texts", "metadatas", "ids"}

    # Everything that is not an `aadd_texts` parameter is offered to the
    # constructor (which warns about, and drops, what it does not know).
    _init_kwargs = {k: v for k, v in kwargs.items() if k not in _method_args}
    _method_kwargs = {k: v for k, v in kwargs.items() if k in _method_args}

    astra_db_store = AstraDBVectorStore._from_kwargs(
        embedding=embedding,
        **_init_kwargs,
    )
    await astra_db_store.aadd_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
        **_method_kwargs,
    )
    return astra_db_store
@classmethod
@override
def from_documents(
    cls: type[AstraDBVectorStore],
    documents: list[Document],
    embedding: Embeddings | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from a document list.

    Utility method that defers to :meth:`from_texts` (see that one).

    Ids are taken from the ``ids`` keyword argument if supplied (deprecated,
    triggers a ``DeprecationWarning``); otherwise from the documents'
    ``.id`` attributes, or left as ``None`` when no document has an id.

    Args:
        documents: a list of `Document` objects for insertion in the store.
        embedding: the embedding function to use in the store.
        **kwargs: you can pass any argument that you would
            to :meth:`~add_texts` and/or to the
            ``AstraDBVectorStore`` constructor (see these methods for
            details). These arguments will be
            routed to the respective methods as they are.

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    page_contents = [document.page_content for document in documents]
    metadata_dicts = [document.metadata for document in documents]

    if "ids" in kwargs:
        warnings.warn(
            (
                "Parameter `ids` to AstraDBVectorStore's `from_documents` "
                "method is deprecated. Please set the supplied documents' "
                "`.id` attribute instead. The id attribute of Document "
                "is ignored as long as the `ids` parameter is passed."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        # Remove `ids` from kwargs so it is not passed twice below.
        resolved_ids = kwargs.pop("ids")
    else:
        document_ids = [document.id for document in documents]
        every_id_missing = all(doc_id is None for doc_id in document_ids)
        resolved_ids = None if every_id_missing else document_ids

    return cls.from_texts(
        page_contents,
        embedding=embedding,
        metadatas=metadata_dicts,
        ids=resolved_ids,
        **kwargs,
    )
@classmethod
@override
async def afrom_documents(
    cls: type[AstraDBVectorStore],
    documents: list[Document],
    embedding: Embeddings | None = None,
    **kwargs: Any,
) -> AstraDBVectorStore:
    """Create an Astra DB vectorstore from a document list, asynchronously.

    Utility method that defers to :meth:`afrom_texts` (see that one).

    Ids are taken from the ``ids`` keyword argument if supplied (deprecated,
    triggers a ``DeprecationWarning``); otherwise from the documents'
    ``.id`` attributes, or left as ``None`` when no document has an id.

    Args:
        documents: a list of `Document` objects for insertion in the store.
        embedding: the embedding function to use in the store.
        **kwargs: see :meth:`afrom_texts`; these are forwarded to it as
            they are (after removing ``ids``, if present).

    Returns:
        an ``AstraDBVectorStore`` vectorstore.
    """
    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]

    if "ids" in kwargs:
        # The message names this (async) method, so that users can locate
        # the offending call.
        warnings.warn(
            (
                "Parameter `ids` to AstraDBVectorStore's `afrom_documents` "
                "method is deprecated. Please set the supplied documents' "
                "`.id` attribute instead. The id attribute of Document "
                "is ignored as long as the `ids` parameter is passed."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        # Remove `ids` from kwargs so it is not passed twice below.
        ids = kwargs.pop("ids")
    else:
        _ids = [doc.id for doc in documents]
        # Pass ids along only if at least one document carries one.
        ids = _ids if any(the_id is not None for the_id in _ids) else None

    return await cls.afrom_texts(
        texts,
        embedding=embedding,
        metadatas=metadatas,
        ids=ids,
        **kwargs,
    )