Source code for langchain_community.chat_message_histories.cassandra
"""Cassandra-based chat message history, based on cassIO."""from__future__importannotationsimportjsonimportuuidfromtypingimportTYPE_CHECKING,Any,Dict,Iterable,List,Optional,Sequencefromlangchain_community.utilities.cassandraimportSetupModeifTYPE_CHECKING:fromcassandra.clusterimportSessionfromcassio.table.table_typesimportRowTypefromlangchain_core.chat_historyimportBaseChatMessageHistoryfromlangchain_core.messagesimport(BaseMessage,message_to_dict,messages_from_dict,)DEFAULT_TABLE_NAME="message_store"DEFAULT_TTL_SECONDS=Nonedef_rows_to_messages(rows:Iterable[RowType])->List[BaseMessage]:message_blobs=[row["body_blob"]forrowinrows][::-1]items=[json.loads(message_blob)formessage_blobinmessage_blobs]messages=messages_from_dict(items)returnmessages
[docs]classCassandraChatMessageHistory(BaseChatMessageHistory):"""Chat message history that is backed by Cassandra."""
def __init__(
    self,
    session_id: str,
    session: Optional[Session] = None,
    keyspace: Optional[str] = None,
    table_name: str = DEFAULT_TABLE_NAME,
    ttl_seconds: Optional[int] = DEFAULT_TTL_SECONDS,
    *,
    setup_mode: SetupMode = SetupMode.SYNC,
) -> None:
    """Initialize a new instance of CassandraChatMessageHistory.

    Args:
        session_id: arbitrary key that is used to store the messages
            of a single chat session.
        session: Cassandra driver session.
            If not provided, it is resolved from cassio.
        keyspace: Cassandra key space.
            If not provided, it is resolved from cassio.
        table_name: name of the table to use.
        ttl_seconds: time-to-live (seconds) for automatic expiration
            of stored entries. None (default) for no expiration.
        setup_mode: mode used to create the Cassandra table
            (SYNC, ASYNC or OFF).

    Raises:
        ImportError: if the ``cassio`` package cannot be imported.
    """
    try:
        from cassio.table import ClusteredCassandraTable
    except ImportError as import_error:
        # ModuleNotFoundError is a subclass of ImportError, so one clause
        # covers both. Chain the original error so the underlying import
        # failure stays visible as the direct cause.
        raise ImportError(
            "Could not import cassio python package. "
            "Please install it with `pip install cassio`."
        ) from import_error

    self.session_id = session_id
    self.ttl_seconds = ttl_seconds

    # Only pass ``async_setup`` when asynchronous setup was requested.
    kwargs: Dict[str, Any] = {}
    if setup_mode == SetupMode.ASYNC:
        kwargs["async_setup"] = True

    self.table = ClusteredCassandraTable(
        session=session,
        keyspace=keyspace,
        table=table_name,
        ttl_seconds=ttl_seconds,
        # Partition key is the session id (TEXT); rows within a partition
        # are keyed by a TIMEUUID and stored in descending order.
        primary_key_type=["TEXT", "TIMEUUID"],
        ordering_in_partition="DESC",
        skip_provisioning=setup_mode == SetupMode.OFF,
        **kwargs,
    )
@property
def messages(self) -> List[BaseMessage]:  # type: ignore
    """Return all messages stored for this session.

    Reads the table partition identified by ``self.session_id`` and converts
    the rows with ``_rows_to_messages``.
    """
    # The latest are returned, in chronological order
    return _rows_to_messages(
        self.table.get_partition(partition_id=self.session_id)
    )
async def aget_messages(self) -> List[BaseMessage]:
    """Asynchronously return all messages stored for this session.

    Awaits the table's ``aget_partition`` for ``self.session_id`` and converts
    the resulting rows with ``_rows_to_messages``.
    """
    # The latest are returned, in chronological order
    partition_rows = await self.table.aget_partition(partition_id=self.session_id)
    return _rows_to_messages(partition_rows)
def add_message(self, message: BaseMessage) -> None:
    """Write a message to the table.

    The message is serialized with ``message_to_dict`` and stored as a JSON
    string in the ``body_blob`` column of the partition for
    ``self.session_id``, using ``self.ttl_seconds`` as its time-to-live.

    Args:
        message: A message to write.
    """
    # The row id must be a time-based (version 1) UUID: the table declares
    # its clustering column as TIMEUUID with DESC ordering, and readers rely
    # on that ordering to return messages chronologically. A random
    # uuid4() is not a time-based UUID and carries no ordering information.
    this_row_id = uuid.uuid1()
    self.table.put(
        partition_id=self.session_id,
        row_id=this_row_id,
        body_blob=json.dumps(message_to_dict(message)),
        ttl_seconds=self.ttl_seconds,
    )