Source code for langchain_community.storage.cassandra
from__future__importannotationsimportasynciofromasyncioimportInvalidStateError,Taskfromtypingimport(TYPE_CHECKING,AsyncIterator,Iterator,List,Optional,Sequence,Tuple,)fromlangchain_core.storesimportByteStorefromlangchain_community.utilities.cassandraimportSetupMode,aexecute_cqlifTYPE_CHECKING:fromcassandra.clusterimportSessionfromcassandra.queryimportPreparedStatementCREATE_TABLE_CQL_TEMPLATE=""" CREATE TABLE IF NOT EXISTS {keyspace}.{table} (row_id TEXT, body_blob BLOB, PRIMARY KEY (row_id));"""SELECT_TABLE_CQL_TEMPLATE=("""SELECT row_id, body_blob FROM {keyspace}.{table} WHERE row_id IN ?;""")SELECT_ALL_TABLE_CQL_TEMPLATE="""SELECT row_id, body_blob FROM {keyspace}.{table};"""INSERT_TABLE_CQL_TEMPLATE=("""INSERT INTO {keyspace}.{table} (row_id, body_blob) VALUES (?, ?);""")DELETE_TABLE_CQL_TEMPLATE="""DELETE FROM {keyspace}.{table} WHERE row_id IN ?;"""
[docs]classCassandraByteStore(ByteStore):"""A ByteStore implementation using Cassandra as the backend. Parameters: table: The name of the table to use. session: A Cassandra session object. If not provided, it will be resolved from the cassio config. keyspace: The keyspace to use. If not provided, it will be resolved from the cassio config. setup_mode: The setup mode to use. Default is SYNC (SetupMode.SYNC). """
[docs]def__init__(self,table:str,*,session:Optional[Session]=None,keyspace:Optional[str]=None,setup_mode:SetupMode=SetupMode.SYNC,)->None:ifnotsessionornotkeyspace:try:fromcassio.configimportcheck_resolve_keyspace,check_resolve_sessionself.keyspace=keyspaceorcheck_resolve_keyspace(keyspace)self.session=sessionorcheck_resolve_session()except(ImportError,ModuleNotFoundError):raiseImportError("Could not import a recent cassio package.""Please install it with `pip install --upgrade cassio`.")else:self.keyspace=keyspaceself.session=sessionself.table=tableself.select_statement=Noneself.insert_statement=Noneself.delete_statement=Nonecreate_cql=CREATE_TABLE_CQL_TEMPLATE.format(keyspace=self.keyspace,table=self.table,)self.db_setup_task:Optional[Task[None]]=Noneifsetup_mode==SetupMode.ASYNC:self.db_setup_task=asyncio.create_task(aexecute_cql(self.session,create_cql))else:self.session.execute(create_cql)
[docs]defensure_db_setup(self)->None:"""Ensure that the DB setup is finished. If not, raise a ValueError."""ifself.db_setup_task:try:self.db_setup_task.result()exceptInvalidStateError:raiseValueError("Asynchronous setup of the DB not finished. ""NB: AstraDB components sync methods shouldn't be called from the ""event loop. Consider using their async equivalents.")
[docs]asyncdefaensure_db_setup(self)->None:"""Ensure that the DB setup is finished. If not, wait for it."""ifself.db_setup_task:awaitself.db_setup_task
[docs]defget_select_statement(self)->PreparedStatement:"""Get the prepared select statement for the table. If not available, prepare it. Returns: PreparedStatement: The prepared statement. """ifnotself.select_statement:self.select_statement=self.session.prepare(SELECT_TABLE_CQL_TEMPLATE.format(keyspace=self.keyspace,table=self.table))returnself.select_statement
[docs]defget_insert_statement(self)->PreparedStatement:"""Get the prepared insert statement for the table. If not available, prepare it. Returns: PreparedStatement: The prepared statement. """ifnotself.insert_statement:self.insert_statement=self.session.prepare(INSERT_TABLE_CQL_TEMPLATE.format(keyspace=self.keyspace,table=self.table))returnself.insert_statement
[docs]defget_delete_statement(self)->PreparedStatement:"""Get the prepared delete statement for the table. If not available, prepare it. Returns: PreparedStatement: The prepared statement. """ifnotself.delete_statement:self.delete_statement=self.session.prepare(DELETE_TABLE_CQL_TEMPLATE.format(keyspace=self.keyspace,table=self.table))returnself.delete_statement