"""Astra DB - based storages."""from__future__importannotationsimportasyncioimportbase64fromabcimportabstractmethodfromconcurrent.futuresimportThreadPoolExecutorfromtypingimport(TYPE_CHECKING,Any,AsyncIterator,Generic,Iterator,Sequence,TypeVar,)fromastrapy.exceptionsimportInsertManyExceptionfromlangchain_core.storesimportBaseStore,ByteStorefromtyping_extensionsimportoverridefromlangchain_astradb.utils.astradbimport(COMPONENT_NAME_BYTESTORE,COMPONENT_NAME_STORE,MAX_CONCURRENT_DOCUMENT_INSERTIONS,MAX_CONCURRENT_DOCUMENT_REPLACEMENTS,SetupMode,_AstraDBCollectionEnvironment,)ifTYPE_CHECKING:fromastrapy.authenticationimportTokenProviderfromastrapy.dbimportAstraDB,AsyncAstraDBfromastrapy.resultsimportUpdateResultV=TypeVar("V")
[docs]def__init__(self,*args:Any,**kwargs:Any)->None:"""Base class for the DataStax Astra DB data store."""if"requested_indexing_policy"inkwargs:msg="Do not pass 'requested_indexing_policy' to AstraDBBaseStore init"raiseValueError(msg)if"default_indexing_policy"inkwargs:msg="Do not pass 'default_indexing_policy' to AstraDBBaseStore init"raiseValueError(msg)kwargs["requested_indexing_policy"]={"allow":["_id"]}kwargs["default_indexing_policy"]={"allow":["_id"]}if"namespace"inkwargs:kwargs["keyspace"]=kwargs.pop("namespace")self.astra_env=_AstraDBCollectionEnvironment(*args,**kwargs,)self.collection=self.astra_env.collectionself.async_collection=self.astra_env.async_collection
[docs]@abstractmethoddefdecode_value(self,value:Any)->V|None:# noqa: ANN401"""Decodes value from Astra DB."""
[docs]@abstractmethoddefencode_value(self,value:V|None)->Any:# noqa: ANN401"""Encodes value for Astra DB."""
[docs]@overridedefmset(self,key_value_pairs:Sequence[tuple[str,V]])->None:self.astra_env.ensure_db_setup()documents_to_insert=[{"_id":k,"value":self.encode_value(v)}fork,vinkey_value_pairs]# perform an AstraPy insert_many, catching exceptions for overwriting docsids_to_replace:list[int]try:self.collection.insert_many(documents_to_insert,ordered=False,concurrency=MAX_CONCURRENT_DOCUMENT_INSERTIONS,)ids_to_replace=[]exceptInsertManyExceptionaserr:inserted_ids_set=set(err.partial_result.inserted_ids)ids_to_replace=[document["_id"]fordocumentindocuments_to_insertifdocument["_id"]notininserted_ids_set]# if necessary, replace docs for the non-inserted idsifids_to_replace:documents_to_replace=[documentfordocumentindocuments_to_insertifdocument["_id"]inids_to_replace]withThreadPoolExecutor(max_workers=MAX_CONCURRENT_DOCUMENT_REPLACEMENTS)asexecutor:def_replace_document(document:dict[str,Any])->UpdateResult:returnself.collection.replace_one({"_id":document["_id"]},document,)replace_results=list(executor.map(_replace_document,documents_to_replace,))replaced_count=sum(r_res.update_info["n"]forr_resinreplace_results)ifreplaced_count!=len(ids_to_replace):missing=len(ids_to_replace)-replaced_countmsg=("AstraDBBaseStore.mset could not insert all requested "f"documents ({missing} failed replace_one calls)")raiseValueError(msg)
[docs]@overrideasyncdefamset(self,key_value_pairs:Sequence[tuple[str,V]])->None:awaitself.astra_env.aensure_db_setup()documents_to_insert=[{"_id":k,"value":self.encode_value(v)}fork,vinkey_value_pairs]# perform an AstraPy insert_many, catching exceptions for overwriting docsids_to_replace:list[int]try:awaitself.async_collection.insert_many(documents_to_insert,ordered=False,)ids_to_replace=[]exceptInsertManyExceptionaserr:inserted_ids_set=set(err.partial_result.inserted_ids)ids_to_replace=[document["_id"]fordocumentindocuments_to_insertifdocument["_id"]notininserted_ids_set]# if necessary, replace docs for the non-inserted idsifids_to_replace:documents_to_replace=[documentfordocumentindocuments_to_insertifdocument["_id"]inids_to_replace]sem=asyncio.Semaphore(MAX_CONCURRENT_DOCUMENT_REPLACEMENTS)_async_collection=self.async_collectionasyncdef_replace_document(document:dict[str,Any])->UpdateResult:asyncwithsem:returnawait_async_collection.replace_one({"_id":document["_id"]},document,)tasks=[asyncio.create_task(_replace_document(document))fordocumentindocuments_to_replace]replace_results=awaitasyncio.gather(*tasks,return_exceptions=False)replaced_count=sum(r_res.update_info["n"]forr_resinreplace_results)ifreplaced_count!=len(ids_to_replace):missing=len(ids_to_replace)-replaced_countmsg=("AstraDBBaseStore.mset could not insert all requested "f"documents ({missing} failed replace_one calls)")raiseValueError(msg)
[docs]def__init__(self,collection_name:str,*,token:str|TokenProvider|None=None,api_endpoint:str|None=None,namespace:str|None=None,environment:str|None=None,pre_delete_collection:bool=False,setup_mode:SetupMode=SetupMode.SYNC,ext_callers:list[tuple[str|None,str|None]|str|None]|None=None,astra_db_client:AstraDB|None=None,async_astra_db_client:AsyncAstraDB|None=None,)->None:"""BaseStore implementation using DataStax AstraDB as the underlying store. The value type can be any type serializable by json.dumps. Can be used to store embeddings with the CacheBackedEmbeddings. Documents in the AstraDB collection will have the format .. code-block:: json { "_id": "<key>", "value": <value> } Args: collection_name: name of the Astra DB collection to create/use. token: API token for Astra DB usage, either in the form of a string or a subclass of `astrapy.authentication.TokenProvider`. If not provided, the environment variable ASTRA_DB_APPLICATION_TOKEN is inspected. api_endpoint: full URL to the API endpoint, such as `https://<DB-ID>-us-east1.apps.astra.datastax.com`. If not provided, the environment variable ASTRA_DB_API_ENDPOINT is inspected. namespace: namespace (aka keyspace) where the collection is created. If not provided, the environment variable ASTRA_DB_KEYSPACE is inspected. Defaults to the database's "default namespace". environment: a string specifying the environment of the target Data API. If omitted, defaults to "prod" (Astra DB production). Other values are in `astrapy.constants.Environment` enum class. setup_mode: mode used to create the Astra DB collection (SYNC, ASYNC or OFF). pre_delete_collection: whether to delete the collection before creating it. If False and the collection already exists, the collection will be used as is. ext_callers: one or more caller identities to identify Data API calls in the User-Agent header. This is a list of (name, version) pairs, or just strings if no version info is provided, which, if supplied, becomes the leading part of the User-Agent string in all API requests related to this component. astra_db_client: *DEPRECATED starting from version 0.3.5.* *Please use 'token', 'api_endpoint' and optionally 'environment'.* you can pass an already-created 'astrapy.db.AstraDB' instance (alternatively to 'token', 'api_endpoint' and 'environment'). async_astra_db_client: *DEPRECATED starting from version 0.3.5.* *Please use 'token', 'api_endpoint' and optionally 'environment'.* you can pass an already-created 'astrapy.db.AsyncAstraDB' instance (alternatively to 'token', 'api_endpoint' and 'environment'). """super().__init__(collection_name=collection_name,token=token,api_endpoint=api_endpoint,namespace=namespace,environment=environment,setup_mode=setup_mode,pre_delete_collection=pre_delete_collection,ext_callers=ext_callers,component_name=COMPONENT_NAME_STORE,astra_db_client=astra_db_client,async_astra_db_client=async_astra_db_client,)
[docs]def__init__(self,*,collection_name:str,token:str|TokenProvider|None=None,api_endpoint:str|None=None,namespace:str|None=None,environment:str|None=None,pre_delete_collection:bool=False,setup_mode:SetupMode=SetupMode.SYNC,ext_callers:list[tuple[str|None,str|None]|str|None]|None=None,astra_db_client:AstraDB|None=None,async_astra_db_client:AsyncAstraDB|None=None,)->None:"""ByteStore implementation using DataStax AstraDB as the underlying store. The bytes values are converted to base64 encoded strings Documents in the AstraDB collection will have the format .. code-block:: json { "_id": "<key>", "value": "<byte64 string value>" } Args: collection_name: name of the Astra DB collection to create/use. token: API token for Astra DB usage, either in the form of a string or a subclass of `astrapy.authentication.TokenProvider`. If not provided, the environment variable ASTRA_DB_APPLICATION_TOKEN is inspected. api_endpoint: full URL to the API endpoint, such as `https://<DB-ID>-us-east1.apps.astra.datastax.com`. If not provided, the environment variable ASTRA_DB_API_ENDPOINT is inspected. namespace: namespace (aka keyspace) where the collection is created. If not provided, the environment variable ASTRA_DB_KEYSPACE is inspected. Defaults to the database's "default namespace". environment: a string specifying the environment of the target Data API. If omitted, defaults to "prod" (Astra DB production). Other values are in `astrapy.constants.Environment` enum class. setup_mode: mode used to create the Astra DB collection (SYNC, ASYNC or OFF). pre_delete_collection: whether to delete the collection before creating it. If False and the collection already exists, the collection will be used as is. ext_callers: one or more caller identities to identify Data API calls in the User-Agent header. This is a list of (name, version) pairs, or just strings if no version info is provided, which, if supplied, becomes the leading part of the User-Agent string in all API requests related to this component. astra_db_client: *DEPRECATED starting from version 0.3.5.* *Please use 'token', 'api_endpoint' and optionally 'environment'.* you can pass an already-created 'astrapy.db.AstraDB' instance (alternatively to 'token', 'api_endpoint' and 'environment'). async_astra_db_client: *DEPRECATED starting from version 0.3.5.* *Please use 'token', 'api_endpoint' and optionally 'environment'.* you can pass an already-created 'astrapy.db.AsyncAstraDB' instance (alternatively to 'token', 'api_endpoint' and 'environment'). """super().__init__(collection_name=collection_name,token=token,api_endpoint=api_endpoint,namespace=namespace,environment=environment,setup_mode=setup_mode,pre_delete_collection=pre_delete_collection,ext_callers=ext_callers,component_name=COMPONENT_NAME_BYTESTORE,astra_db_client=astra_db_client,async_astra_db_client=async_astra_db_client,)