[docs]classSingleStoreDBChatMessageHistory(BaseChatMessageHistory):"""Chat message history stored in a SingleStoreDB database."""
[docs]def__init__(self,session_id:str,*,table_name:str="message_store",id_field:str="id",session_id_field:str="session_id",message_field:str="message",pool_size:int=5,max_overflow:int=10,timeout:float=30,**kwargs:Any,):"""Initialize with necessary components. Args: table_name (str, optional): Specifies the name of the table in use. Defaults to "message_store". id_field (str, optional): Specifies the name of the id field in the table. Defaults to "id". session_id_field (str, optional): Specifies the name of the session_id field in the table. Defaults to "session_id". message_field (str, optional): Specifies the name of the message field in the table. Defaults to "message". Following arguments pertain to the connection pool: pool_size (int, optional): Determines the number of active connections in the pool. Defaults to 5. max_overflow (int, optional): Determines the maximum number of connections allowed beyond the pool_size. Defaults to 10. timeout (float, optional): Specifies the maximum wait time in seconds for establishing a connection. Defaults to 30. Following arguments pertain to the database connection: host (str, optional): Specifies the hostname, IP address, or URL for the database connection. The default scheme is "mysql". user (str, optional): Database username. password (str, optional): Database password. port (int, optional): Database port. Defaults to 3306 for non-HTTP connections, 80 for HTTP connections, and 443 for HTTPS connections. database (str, optional): Database name. Additional optional arguments provide further customization over the database connection: pure_python (bool, optional): Toggles the connector mode. If True, operates in pure Python mode. local_infile (bool, optional): Allows local file uploads. charset (str, optional): Specifies the character set for string values. ssl_key (str, optional): Specifies the path of the file containing the SSL key. ssl_cert (str, optional): Specifies the path of the file containing the SSL certificate. ssl_ca (str, optional): Specifies the path of the file containing the SSL certificate authority. ssl_cipher (str, optional): Sets the SSL cipher list. ssl_disabled (bool, optional): Disables SSL usage. ssl_verify_cert (bool, optional): Verifies the server's certificate. Automatically enabled if ``ssl_ca`` is specified. ssl_verify_identity (bool, optional): Verifies the server's identity. conv (dict[int, Callable], optional): A dictionary of data conversion functions. credential_type (str, optional): Specifies the type of authentication to use: auth.PASSWORD, auth.JWT, or auth.BROWSER_SSO. autocommit (bool, optional): Enables autocommits. results_type (str, optional): Determines the structure of the query results: tuples, namedtuples, dicts. results_format (str, optional): Deprecated. This option has been renamed to results_type. Examples: Basic Usage: .. code-block:: python from langchain_community.chat_message_histories import ( SingleStoreDBChatMessageHistory ) message_history = SingleStoreDBChatMessageHistory( session_id="my-session", host="https://user:password@127.0.0.1:3306/database" ) Advanced Usage: .. code-block:: python from langchain_community.chat_message_histories import ( SingleStoreDBChatMessageHistory ) message_history = SingleStoreDBChatMessageHistory( session_id="my-session", host="127.0.0.1", port=3306, user="user", password="password", database="db", table_name="my_custom_table", pool_size=10, timeout=60, ) Using environment variables: .. code-block:: python from langchain_community.chat_message_histories import ( SingleStoreDBChatMessageHistory ) os.environ['SINGLESTOREDB_URL'] = 'me:p455w0rd@s2-host.com/my_db' message_history = SingleStoreDBChatMessageHistory("my-session") """self.table_name=self._sanitize_input(table_name)self.session_id=self._sanitize_input(session_id)self.id_field=self._sanitize_input(id_field)self.session_id_field=self._sanitize_input(session_id_field)self.message_field=self._sanitize_input(message_field)# Pass the rest of the kwargs to the connection.self.connection_kwargs=kwargs# Add connection attributes to the connection kwargs.if"conn_attrs"notinself.connection_kwargs:self.connection_kwargs["conn_attrs"]=dict()self.connection_kwargs["conn_attrs"]["_connector_name"]="langchain python sdk"self.connection_kwargs["conn_attrs"]["_connector_version"]="2.1.0"# Create a connection pool.try:fromsqlalchemy.poolimportQueuePoolexceptImportError:raiseImportError("Could not import sqlalchemy.pool python package. ""Please install it with `pip install singlestoredb`.")self.connection_pool=QueuePool(self._get_connection,max_overflow=max_overflow,pool_size=pool_size,timeout=timeout,)self.table_created=False
def_sanitize_input(self,input_str:str)->str:# Remove characters that are not alphanumeric or underscoresreturnre.sub(r"[^a-zA-Z0-9_]","",input_str)def_get_connection(self)->Any:try:importsinglestoredbass2exceptImportError:raiseImportError("Could not import singlestoredb python package. ""Please install it with `pip install singlestoredb`.")returns2.connect(**self.connection_kwargs)def_create_table_if_not_exists(self)->None:"""Create table if it doesn't exist."""ifself.table_created:returnconn=self.connection_pool.connect()try:cur=conn.cursor()try:cur.execute("""CREATE TABLE IF NOT EXISTS {} ({} BIGINT PRIMARY KEY AUTO_INCREMENT, {} TEXT NOT NULL, {} JSON NOT NULL);""".format(self.table_name,self.id_field,self.session_id_field,self.message_field,),)self.table_created=Truefinally:cur.close()finally:conn.close()@propertydefmessages(self)->List[BaseMessage]:# type: ignore"""Retrieve the messages from SingleStoreDB"""self._create_table_if_not_exists()conn=self.connection_pool.connect()items=[]try:cur=conn.cursor()try:cur.execute("""SELECT {} FROM {} WHERE {} = %s""".format(self.message_field,self.table_name,self.session_id_field,),(self.session_id),)forrowincur.fetchall():items.append(row[0])finally:cur.close()finally:conn.close()messages=messages_from_dict(items)returnmessages
[docs]defadd_message(self,message:BaseMessage)->None:"""Append the message to the record in SingleStoreDB"""self._create_table_if_not_exists()conn=self.connection_pool.connect()try:cur=conn.cursor()try:cur.execute("""INSERT INTO {} ({}, {}) VALUES (%s, %s)""".format(self.table_name,self.session_id_field,self.message_field,),(self.session_id,json.dumps(message_to_dict(message))),)finally:cur.close()finally:conn.close()
[docs]defclear(self)->None:"""Clear session memory from SingleStoreDB"""self._create_table_if_not_exists()conn=self.connection_pool.connect()try:cur=conn.cursor()try:cur.execute("""DELETE FROM {} WHERE {} = %s""".format(self.table_name,self.session_id_field,),(self.session_id),)finally:cur.close()finally:conn.close()