"""Utilities for AstraDB setup and management."""from__future__importannotationsimportasyncioimportinspectimportjsonimportloggingimportosimportwarningsfromasyncioimportInvalidStateError,TaskfromenumimportEnumfromimportlib.metadataimportversionfromtypingimportTYPE_CHECKING,Any,Awaitableimportlangchain_corefromastrapyimportAsyncDatabase,DataAPIClient,Databasefromastrapy.adminimportparse_api_endpointfromastrapy.api_optionsimportAPIOptions,SerdesOptions,TimeoutOptionsfromastrapy.authenticationimport(EmbeddingAPIKeyHeaderProvider,EmbeddingHeadersProvider,RerankingAPIKeyHeaderProvider,RerankingHeadersProvider,StaticTokenProvider,TokenProvider,)fromastrapy.constantsimportEnvironmentfromastrapy.exceptionsimportDataAPIException,DataAPIResponseExceptionfromastrapy.infoimport(CollectionDefinition,CollectionLexicalOptions,CollectionRerankOptions,RerankServiceOptions,)ifTYPE_CHECKING:fromastrapy.infoimportCollectionDescriptor,VectorServiceOptionsTOKEN_ENV_VAR="ASTRA_DB_APPLICATION_TOKEN"# noqa: S105API_ENDPOINT_ENV_VAR="ASTRA_DB_API_ENDPOINT"KEYSPACE_ENV_VAR="ASTRA_DB_KEYSPACE"# Caller-related constantsLC_CORE_CALLER_NAME="langchain"LC_CORE_CALLER_VERSION=getattr(langchain_core,"__version__",None)LC_CORE_CALLER=(LC_CORE_CALLER_NAME,LC_CORE_CALLER_VERSION)LC_ASTRADB_VERSION:str|Nonetry:LC_ASTRADB_VERSION=version("langchain_astradb")exceptTypeError:LC_ASTRADB_VERSION=None# component names for the 'callers' parameterCOMPONENT_NAME_CACHE="langchain_cache"COMPONENT_NAME_SEMANTICCACHE="langchain_semanticcache"COMPONENT_NAME_CHATMESSAGEHISTORY="langchain_chatmessagehistory"COMPONENT_NAME_LOADER="langchain_loader"COMPONENT_NAME_GRAPHVECTORSTORE="langchain_graphvectorstore"COMPONENT_NAME_STORE="langchain_store"COMPONENT_NAME_BYTESTORE="langchain_bytestore"COMPONENT_NAME_VECTORSTORE="langchain_vectorstore"# Default settings for API data operations (concurrency & similar):# Chunk size for many-document insertions (None meaning defer to astrapy):DEFAULT_DOCUMENT_CHUNK_SIZE=None# 
thread/coroutine count for bulk insertsMAX_CONCURRENT_DOCUMENT_INSERTIONS=20# Thread/coroutine count for one-doc-at-a-time overwritesMAX_CONCURRENT_DOCUMENT_REPLACEMENTS=20# Thread/coroutine count for one-doc-at-a-time deletes:MAX_CONCURRENT_DOCUMENT_DELETIONS=20# Hardcoded here for the time beingASTRA_DB_REQUEST_TIMEOUT_MS=30000# Amount of (max) number of documents for surveying a collectionSURVEY_NUMBER_OF_DOCUMENTS=15# Data API error code for 'collection exists and it's different'EXISTING_COLLECTION_ERROR_CODE="EXISTING_COLLECTION_DIFFERENT_SETTINGS"COLLECTION_DEFAULTS_MISMATCH_ERROR_MESSAGE=("Astra DB collection '{collection_name}' was ""found to be configured differently than requested ""by the vector store creation. This is resulting ""in a hard exception from the Data API (accessible as ""`<this-exception>.__cause__`). Please see ""https://github.com/langchain-ai/langchain-datastax""/blob/main/libs/astradb/README.md#collection-""defaults-mismatch for more context about this ""issue and possible mitigations.")logger=logging.getLogger()
class SetupMode(Enum):
    """Setup mode for the Astra DB collection."""

    SYNC = 1
    ASYNC = 2
    OFF = 3
class HybridSearchMode(Enum):
    """Hybrid Search mode for a Vector Store collection."""

    DEFAULT = 1
    ON = 2
    OFF = 3
def_unpack_indexing_policy(indexing_dict:dict[str,list[str]]|None,)->tuple[str|None,list[str]|None]:"""{} or None => (None, None); {"a": "b"} => ("a", "b"); multikey => error."""ifindexing_dict:iflen(indexing_dict)!=1:msg="Unexpected indexing policy provided: "f"{indexing_dict}"raiseValueError(msg)returnnext(iter(indexing_dict.items()))returnNone,Nonedef_api_exception_error_codes(exc:DataAPIException)->set[str|None]:ifisinstance(exc,DataAPIResponseException):return{ed.error_codeforedinexc.error_descriptors}returnset()def_survey_collection(collection_name:str,*,token:str|TokenProvider|None=None,api_endpoint:str|None=None,keyspace:str|None=None,environment:str|None=None,ext_callers:list[tuple[str|None,str|None]|str|None]|None=None,component_name:str|None=None,)->tuple[CollectionDescriptor|None,list[dict[str,Any]]]:"""Return the collection descriptor (if found) and a sample of documents."""_astra_db_env=_AstraDBEnvironment(token=token,api_endpoint=api_endpoint,keyspace=keyspace,environment=environment,ext_callers=ext_callers,component_name=component_name,)descriptors=[coll_dforcoll_din_astra_db_env.database.list_collections()ifcoll_d.name==collection_name]ifnotdescriptors:returnNone,[]descriptor=descriptors[0]# fetch some documentsdocument_ite=_astra_db_env.database.get_collection(collection_name).find(filter={},projection={"*":True},limit=SURVEY_NUMBER_OF_DOCUMENTS,)return(descriptor,list(document_ite))def_normalize_data_api_environment(arg_environment:str|None,api_endpoint:str,)->str:_environment:strifarg_environmentisnotNone:returnarg_environmentparsed_endpoint=parse_api_endpoint(api_endpoint)ifparsed_endpointisNone:logger.info("Detecting API environment '%s' from supplied endpoint",Environment.OTHER,)returnEnvironment.OTHERlogger.info("Detecting API environment '%s' from supplied endpoint",parsed_endpoint.environment,)returnparsed_endpoint.environment
class AstraDBError(Exception):
    """An exception during Astra DB- (Data API-) related operations.

    This exception represents any operational exception occurring while
    working with the generic set-up and/or provisioning of components
    backed by Astra DB (in particular, collection creation and inspection).
    """
class _AstraDBEnvironment:
    """Connection settings and Data API client objects for one database.

    The constructor normalizes token, API endpoint and keyspace (falling back
    to environment variables when they are not passed), determines the Data
    API environment, and creates the `DataAPIClient`, the `Database` and its
    `AsyncDatabase` counterpart.
    """

    def __init__(
        self,
        *,
        token: str | TokenProvider | None = None,
        api_endpoint: str | None = None,
        keyspace: str | None = None,
        environment: str | None = None,
        ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
        component_name: str | None = None,
    ) -> None:
        """Set up the environment.

        Args:
            token: a string or a `TokenProvider`. If None, the value of the
                `TOKEN_ENV_VAR` environment variable is used (possibly None).
            api_endpoint: the Data API endpoint. If None, the value of the
                `API_ENDPOINT_ENV_VAR` environment variable is used.
            keyspace: the keyspace. If None, the value of the
                `KEYSPACE_ENV_VAR` environment variable is used (possibly None).
            environment: the Data API environment. If None, it is detected
                from the API endpoint.
            ext_callers: additional callers, each either a
                (caller_name, caller_version) pair or a bare name, placed
                before the LangChain callers.
            component_name: name of the LangChain component, used as a caller
                name paired with the version of this package.

        Raises:
            ValueError: if no API endpoint is passed nor found in the
                environment variables.
        """
        self.token: TokenProvider
        self.api_endpoint: str | None
        self.keyspace: str | None
        self.environment: str | None
        self.data_api_client: DataAPIClient
        self.database: Database
        self.async_database: AsyncDatabase

        if token is None:
            logger.info(
                "Attempting to fetch token from environment variable '%s'",
                TOKEN_ENV_VAR,
            )
            self.token = StaticTokenProvider(os.getenv(TOKEN_ENV_VAR))
        elif isinstance(token, TokenProvider):
            self.token = token
        else:
            self.token = StaticTokenProvider(token)

        if api_endpoint is None:
            logger.info(
                "Attempting to fetch API endpoint from environment variable '%s'",
                API_ENDPOINT_ENV_VAR,
            )
            self.api_endpoint = os.getenv(API_ENDPOINT_ENV_VAR)
        else:
            self.api_endpoint = api_endpoint

        if keyspace is None:
            logger.info(
                "Attempting to fetch keyspace from environment variable '%s'",
                KEYSPACE_ENV_VAR,
            )
            self.keyspace = os.getenv(KEYSPACE_ENV_VAR)
        else:
            self.keyspace = keyspace

        # init parameters are normalized to self.{token, api_endpoint, keyspace}.
        # Proceed. Keyspace and token can be None (resp. on Astra DB and non-Astra)
        if self.api_endpoint is None:
            msg = (
                "API endpoint for Data API not provided. "
                "Either pass it explicitly to the object constructor "
                f"or set the {API_ENDPOINT_ENV_VAR} environment variable."
            )
            raise ValueError(msg)

        self.environment = _normalize_data_api_environment(
            environment,
            self.api_endpoint,
        )

        # prepare the "callers" list to create the clients.
        # The callers, passed to astrapy, are made of these Caller pairs in this order:
        # - zero, one or more are the "ext_callers" passed to this environment
        # - a single ("langchain", <version of langchain_core>)
        # - if such is provided, a (component_name, <version of langchain_astradb>)
        #   (note: if component_name is None, astrapy strips it out automatically)
        self.ext_callers = ext_callers
        self.component_name = component_name
        # bare names are promoted to (name, None) pairs; all-None pairs are dropped
        norm_ext_callers = [
            cpair
            for cpair in (
                _raw_caller if isinstance(_raw_caller, tuple) else (_raw_caller, None)
                for _raw_caller in (self.ext_callers or [])
            )
            if cpair[0] is not None or cpair[1] is not None
        ]
        self.full_callers = [
            *norm_ext_callers,
            LC_CORE_CALLER,
            (self.component_name, LC_ASTRADB_VERSION),
        ]

        # create the client (set to return plain lists for vectors)
        self.data_api_client = DataAPIClient(
            environment=self.environment,
            api_options=APIOptions(
                callers=self.full_callers,
                serdes_options=SerdesOptions(custom_datatypes_in_reading=False),
                timeout_options=TimeoutOptions(
                    request_timeout_ms=ASTRA_DB_REQUEST_TIMEOUT_MS
                ),
            ),
        )

        self.database = self.data_api_client.get_database(
            api_endpoint=self.api_endpoint,
            token=self.token,
            keyspace=self.keyspace,
        )
        self.async_database = self.database.to_async()


class _AstraDBCollectionEnvironment(_AstraDBEnvironment):
    """An `_AstraDBEnvironment` bound to a single collection.

    Besides the database-level objects, this holds the sync and async
    collection objects and, depending on `setup_mode`, takes care of creating
    the collection on the database (either within the constructor or as a
    scheduled asyncio task).
    """

    def __init__(
        self,
        collection_name: str,
        *,
        token: str | TokenProvider | None = None,
        api_endpoint: str | None = None,
        keyspace: str | None = None,
        environment: str | None = None,
        ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
        component_name: str | None = None,
        setup_mode: SetupMode = SetupMode.SYNC,
        pre_delete_collection: bool = False,
        embedding_dimension: int | Awaitable[int] | None = None,
        metric: str | None = None,
        requested_indexing_policy: dict[str, Any] | None = None,
        default_indexing_policy: dict[str, Any] | None = None,
        collection_vector_service_options: VectorServiceOptions | None = None,
        collection_embedding_api_key: str | EmbeddingHeadersProvider | None = None,
        collection_rerank: str
        | CollectionRerankOptions
        | RerankServiceOptions
        | None = None,
        collection_reranking_api_key: str | RerankingHeadersProvider | None = None,
        collection_lexical: str | dict[str, Any] | CollectionLexicalOptions | None = None,
    ) -> None:
        """Set up the environment and, unless turned off, the collection.

        The connection-related parameters (`token` through `component_name`)
        are passed to the `_AstraDBEnvironment` constructor.

        Args:
            collection_name: name of the collection to work with.
            setup_mode: with `SetupMode.SYNC` the collection is created within
                this constructor; with `SetupMode.ASYNC` the creation is
                scheduled through `asyncio.create_task`; with any other value
                no creation is attempted.
            pre_delete_collection: whether to drop the collection before
                creating it.
            embedding_dimension: vector dimension for the collection. An
                awaitable is accepted only with `SetupMode.ASYNC`.
            metric: vector metric for the collection.
            requested_indexing_policy: the 'indexing' setting for the
                collection, a dict with at most one entry.
            default_indexing_policy: the default value of the above, used
                when validating a preexisting collection.
            collection_vector_service_options: vector service setting for the
                collection.
            collection_embedding_api_key: a string or an
                `EmbeddingHeadersProvider` for the collection objects.
            collection_rerank: rerank setting for the collection.
            collection_reranking_api_key: a string or a
                `RerankingHeadersProvider` for the collection objects.
            collection_lexical: lexical setting for the collection.

        Raises:
            ValueError: if an awaitable `embedding_dimension` is passed with
                `SetupMode.SYNC`, or if a preexisting collection has an
                indexing policy mismatching the requested one.
            AstraDBError: if a preexisting collection differs from the
                requested settings for reasons other than indexing.
        """
        super().__init__(
            token=token,
            api_endpoint=api_endpoint,
            keyspace=keyspace,
            environment=environment,
            ext_callers=ext_callers,
            component_name=component_name,
        )
        self.collection_name = collection_name
        # plain strings (or None) are wrapped into the matching header provider
        self.collection_embedding_api_key = (
            collection_embedding_api_key
            if isinstance(collection_embedding_api_key, EmbeddingHeadersProvider)
            else EmbeddingAPIKeyHeaderProvider(collection_embedding_api_key)
        )
        self.collection_reranking_api_key = (
            collection_reranking_api_key
            if isinstance(collection_reranking_api_key, RerankingHeadersProvider)
            else RerankingAPIKeyHeaderProvider(collection_reranking_api_key)
        )
        self.collection = self.database.get_collection(
            name=self.collection_name,
            embedding_api_key=self.collection_embedding_api_key,
            reranking_api_key=self.collection_reranking_api_key,
        )
        self.async_collection = self.collection.to_async()
        self.collection_rerank = collection_rerank
        self.collection_lexical = collection_lexical
        self.embedding_dimension = embedding_dimension
        self.metric = metric
        self.requested_indexing_policy = requested_indexing_policy
        self.default_indexing_policy = default_indexing_policy
        self.collection_vector_service_options = collection_vector_service_options

        self.async_setup_db_task: Task | None = None
        if setup_mode == SetupMode.ASYNC:
            self.async_setup_db_task = asyncio.create_task(
                self._asetup_db(
                    pre_delete_collection=pre_delete_collection,
                    embedding_dimension=embedding_dimension,
                    metric=metric,
                    default_indexing_policy=default_indexing_policy,
                    requested_indexing_policy=requested_indexing_policy,
                    collection_vector_service_options=collection_vector_service_options,
                )
            )
        elif setup_mode == SetupMode.SYNC:
            # Validate the arguments BEFORE the (destructive) drop below, so
            # that an invalid call cannot delete an existing collection.
            if inspect.isawaitable(embedding_dimension):
                msg = (
                    "Cannot use an awaitable embedding_dimension with async_setup "
                    "set to False"
                )
                raise ValueError(msg)
            if pre_delete_collection:
                self.database.drop_collection(collection_name)
            try:
                _idx_mode, _idx_target = _unpack_indexing_policy(
                    requested_indexing_policy,
                )
                collection_definition = (
                    CollectionDefinition.builder()
                    .set_vector_dimension(embedding_dimension)  # type: ignore[arg-type]
                    .set_vector_metric(metric)
                    .set_indexing(
                        indexing_mode=_idx_mode,
                        indexing_target=_idx_target,
                    )
                    .set_vector_service(collection_vector_service_options)
                    .set_lexical(self.collection_lexical)
                    .set_rerank(self.collection_rerank)
                    .build()
                )
                self.database.create_collection(
                    name=collection_name,
                    definition=collection_definition,
                )
            except DataAPIException as data_api_exception:
                # possibly the collection is preexisting and may have legacy,
                # or custom, indexing settings: verify if it's that error,
                # and if so check for index mismatches - to raise the right error.
                data_api_error_codes = _api_exception_error_codes(data_api_exception)
                if EXISTING_COLLECTION_ERROR_CODE in data_api_error_codes:
                    collection_descriptors = list(self.database.list_collections())
                    try:
                        if not self._validate_indexing_policy(
                            collection_descriptors=collection_descriptors,
                            collection_name=self.collection_name,
                            requested_indexing_policy=requested_indexing_policy,
                            default_indexing_policy=default_indexing_policy,
                        ):
                            # mismatch is not due to indexing
                            msg = COLLECTION_DEFAULTS_MISMATCH_ERROR_MESSAGE.format(
                                collection_name=self.collection_name,
                            )
                            raise AstraDBError(msg) from data_api_exception
                    except ValueError as validation_error:
                        raise validation_error from data_api_exception
                else:
                    # any other Data API error is not handled here
                    raise

    def copy(
        self,
        *,
        token: str | TokenProvider | None = None,
        ext_callers: list[tuple[str | None, str | None] | str | None] | None = None,
        component_name: str | None = None,
        collection_embedding_api_key: str | EmbeddingHeadersProvider | None = None,
        collection_reranking_api_key: str | RerankingHeadersProvider | None = None,
    ) -> _AstraDBCollectionEnvironment:
        """Create a copy, possibly with changed attributes.

        This method creates a shallow copy of this environment. If a parameter
        is passed and differs from None, it will replace the corresponding value
        in the copy.

        The method allows changing only the parameters that ensure the copy is
        functional and does not trigger side-effects:
        for example, one cannot create a copy acting on a new collection.
        In those cases, one should create a new instance
        of ``_AstraDBCollectionEnvironment`` from scratch.

        Args:
            token: API token for Astra DB usage, either in the form of a string
                or a subclass of ``astrapy.authentication.TokenProvider``.
                In order to suppress token usage in the copy, explicitly pass
                ``astrapy.authentication.StaticTokenProvider(None)``.
            ext_callers: additional custom (caller_name, caller_version) pairs
                to attach to the User-Agent header when issuing Data API requests.
            component_name: a value for the LangChain component name to use when
                identifying the originator of the Data API requests.
            collection_embedding_api_key: the API Key to supply in each Data API
                request if necessary. This is necessary if using the Vectorize
                feature and a required secret is not stored with the database.
                In order to suppress the API Key in the copy, explicitly pass
                ``astrapy.authentication.EmbeddingAPIKeyHeaderProvider(None)``.
            collection_reranking_api_key: the API Key to supply in each Data API
                request if necessary. This is necessary if using the Rerank
                feature and a required secret is not stored with the database.
                In order to suppress the API Key in the copy, explicitly pass
                ``astrapy.authentication.RerankingAPIKeyHeaderProvider(None)``.

        Returns:
            a new ``_AstraDBCollectionEnvironment``, created with
            ``setup_mode=SetupMode.OFF`` (i.e. no collection setup is run).
        """
        return _AstraDBCollectionEnvironment(
            collection_name=self.collection_name,
            token=self.token if token is None else token,
            api_endpoint=self.api_endpoint,
            keyspace=self.keyspace,
            environment=self.environment,
            ext_callers=self.ext_callers if ext_callers is None else ext_callers,
            component_name=self.component_name
            if component_name is None
            else component_name,
            setup_mode=SetupMode.OFF,
            collection_embedding_api_key=self.collection_embedding_api_key
            if collection_embedding_api_key is None
            else collection_embedding_api_key,
            collection_rerank=self.collection_rerank,
            collection_reranking_api_key=self.collection_reranking_api_key
            if collection_reranking_api_key is None
            else collection_reranking_api_key,
            collection_lexical=self.collection_lexical,
            embedding_dimension=self.embedding_dimension,
            metric=self.metric,
            requested_indexing_policy=self.requested_indexing_policy,
            default_indexing_policy=self.default_indexing_policy,
            collection_vector_service_options=self.collection_vector_service_options,
        )

    async def _asetup_db(
        self,
        *,
        pre_delete_collection: bool,
        embedding_dimension: int | Awaitable[int] | None,
        metric: str | None,
        requested_indexing_policy: dict[str, Any] | None,
        default_indexing_policy: dict[str, Any] | None,
        collection_vector_service_options: VectorServiceOptions | None,
    ) -> None:
        """Create the collection on the database, asynchronously.

        This is the async counterpart of the setup performed by the
        constructor with `SetupMode.SYNC`; here `embedding_dimension` can be
        an awaitable, in which case it is awaited to get the dimension.

        Raises:
            ValueError: if a preexisting collection has an indexing policy
                mismatching the requested one.
            AstraDBError: if a preexisting collection differs from the
                requested settings for reasons other than indexing.
        """
        if pre_delete_collection:
            await self.async_database.drop_collection(self.collection_name)
        if inspect.isawaitable(embedding_dimension):
            dimension = await embedding_dimension
        else:
            dimension = embedding_dimension

        try:
            _idx_mode, _idx_target = _unpack_indexing_policy(requested_indexing_policy)
            collection_definition = (
                CollectionDefinition.builder()
                .set_vector_dimension(dimension)
                .set_vector_metric(metric)
                .set_indexing(
                    indexing_mode=_idx_mode,
                    indexing_target=_idx_target,
                )
                .set_vector_service(collection_vector_service_options)
                .set_lexical(self.collection_lexical)
                .set_rerank(self.collection_rerank)
                .build()
            )
            await self.async_database.create_collection(
                name=self.collection_name,
                definition=collection_definition,
            )
        except DataAPIException as data_api_exception:
            # possibly the collection is preexisting and may have legacy,
            # or custom, indexing settings: verify if it's that error,
            # and if so check for index mismatches - to raise the right error.
            data_api_error_codes = _api_exception_error_codes(data_api_exception)
            if EXISTING_COLLECTION_ERROR_CODE in data_api_error_codes:
                collection_descriptors = list(
                    await asyncio.to_thread(self.database.list_collections)
                )
                try:
                    if not self._validate_indexing_policy(
                        collection_descriptors=collection_descriptors,
                        collection_name=self.collection_name,
                        requested_indexing_policy=requested_indexing_policy,
                        default_indexing_policy=default_indexing_policy,
                    ):
                        # mismatch is not due to indexing
                        msg = COLLECTION_DEFAULTS_MISMATCH_ERROR_MESSAGE.format(
                            collection_name=self.collection_name,
                        )
                        raise AstraDBError(msg) from data_api_exception
                except ValueError as validation_error:
                    raise validation_error from data_api_exception
            else:
                # any other Data API error is not handled here
                raise

    @staticmethod
    def _validate_indexing_policy(
        collection_descriptors: list[CollectionDescriptor],
        collection_name: str,
        requested_indexing_policy: dict[str, Any] | None,
        default_indexing_policy: dict[str, Any] | None,
    ) -> bool:
        """Validate indexing policy.

        This is a validation helper, to be called when the collection-creation
        call has failed.

        Args:
            collection_descriptors: collection descriptors for the database.
            collection_name: the name of the collection whose attempted
                creation failed
            requested_indexing_policy: the 'indexing' part of the collection
                options, e.g. `{"deny": ["field1", "field2"]}`.
                Leave to its default of None if no options required.
            default_indexing_policy: an optional 'default value' for the
                above, used to issue just a gentle warning in the special
                case that no policy is detected on a preexisting collection
                on DB and the default is requested. This is to enable
                a warning-only transition to new code using indexing without
                disrupting usage of a legacy collection, i.e. one created
                before adopting the usage of indexing policies altogether.
                You cannot pass this one without requested_indexing_policy.

        This function may raise an error (indexing mismatches), issue a warning
        (about legacy collections), or do nothing.
        In any case, when the function returns, it returns either
        - True: the exception was handled here as part of the indexing
          management
        - False: the exception is unrelated to indexing and the caller
          has to reraise it.

        Raises:
            ValueError: if a default policy is given without a requested one,
                or if the policy found on the preexisting collection is
                incompatible with the requested one.
        """
        if requested_indexing_policy is None and default_indexing_policy is not None:
            msg = (
                "Cannot specify a default indexing policy "
                "when no indexing policy is requested for this collection "
                "(requested_indexing_policy is None, "
                "default_indexing_policy is not None)."
            )
            raise ValueError(msg)

        preexisting = [
            collection
            for collection in collection_descriptors
            if collection.name == collection_name
        ]

        if not preexisting:
            # foreign-origin for the original exception
            return False

        pre_collection = preexisting[0]
        # if it has no "indexing", it is a legacy collection
        pre_col_definition = pre_collection.definition
        if not pre_col_definition.indexing:
            # legacy collection on DB
            if requested_indexing_policy != default_indexing_policy:
                msg = (
                    f"Astra DB collection '{collection_name}' is "
                    "detected as having indexing turned on for all "
                    "fields (either created manually or by older "
                    "versions of this plugin). This is incompatible with "
                    "the requested indexing policy for this object. "
                    "Consider indexing anew on a fresh "
                    "collection with the requested indexing "
                    "policy, or alternatively leave the indexing "
                    "settings for this object to their defaults "
                    "to keep using this collection."
                )
                raise ValueError(msg)
            warnings.warn(
                (
                    f"Astra DB collection '{collection_name}' is "
                    "detected as having indexing turned on for all "
                    "fields (either created manually or by older "
                    "versions of this plugin). This implies stricter "
                    "limitations on the amount of text each string in a "
                    "document can store. Consider indexing anew on a "
                    "fresh collection to be able to store longer texts. "
                    "See https://github.com/langchain-ai/langchain-"
                    "datastax/blob/main/libs/astradb/README.md#"
                    "warnings-about-indexing for more details."
                ),
                UserWarning,
                stacklevel=2,
            )
            # the original exception, related to indexing, was handled here
            return True

        if pre_col_definition.indexing != requested_indexing_policy:
            # collection on DB has indexing settings, but different
            options_json = json.dumps(pre_col_definition.indexing)
            default_desc = (
                " (default setting)"
                if pre_col_definition.indexing == default_indexing_policy
                else ""
            )
            msg = (
                f"Astra DB collection '{collection_name}' is "
                "detected as having the following indexing policy: "
                f"{options_json}{default_desc}. This is incompatible "
                "with the requested indexing policy for this object. "
                "Consider indexing anew on a fresh "
                "collection with the requested indexing "
                "policy, or alternatively align the requested "
                "indexing settings to the collection to keep using it."
            )
            raise ValueError(msg)

        # the discrepancies have to do with options other than indexing
        return False

    def ensure_db_setup(self) -> None:
        """Check the outcome of the async setup task, if there is one.

        If a setup task exists, its result is retrieved, which re-raises any
        exception the task ended with. Nothing is done if there is no task.

        Raises:
            ValueError: if the setup task has not finished yet.
        """
        if self.async_setup_db_task:
            try:
                self.async_setup_db_task.result()
            except InvalidStateError as e:
                msg = (
                    "Asynchronous setup of the DB not finished. "
                    "NB: Astra DB components sync methods shouldn't be called from the "
                    "event loop. Consider using their async equivalents."
                )
                raise ValueError(msg) from e

    async def aensure_db_setup(self) -> None:
        """Wait for the async setup task, if there is one, to complete."""
        if self.async_setup_db_task:
            await self.async_setup_db_task