importcontextlibfrompathlibimportPathfromtypingimport(Any,AsyncGenerator,AsyncIterator,Dict,Generator,Iterator,List,Optional,Sequence,Tuple,Union,cast,)fromlangchain_core.storesimportBaseStorefromsqlalchemyimport(LargeBinary,Text,and_,create_engine,delete,select,)fromsqlalchemy.engine.baseimportEnginefromsqlalchemy.ext.asyncioimport(AsyncEngine,AsyncSession,create_async_engine,)fromsqlalchemy.ormimport(Mapped,Session,declarative_base,sessionmaker,)try:fromsqlalchemy.ext.asyncioimportasync_sessionmakerexceptImportError:# dummy for sqlalchemy < 2async_sessionmaker=type("async_sessionmaker",(type,),{})# type: ignoreBase=declarative_base()try:fromsqlalchemy.ormimportmapped_columnclassLangchainKeyValueStores(Base):# type: ignore[valid-type,misc]"""Table used to save values."""# ATTENTION:# Prior to modifying this table, please determine whether# we should create migrations for this table to make sure# users do not experience data loss.__tablename__="langchain_key_value_stores"namespace:Mapped[str]=mapped_column(primary_key=True,index=True,nullable=False)key:Mapped[str]=mapped_column(primary_key=True,index=True,nullable=False)value=mapped_column(LargeBinary,index=False,nullable=False)exceptImportError:# dummy for sqlalchemy < 2fromsqlalchemyimportColumn
[docs]classLangchainKeyValueStores(Base):# type: ignore[valid-type,misc,no-redef]"""Table used to save values."""# ATTENTION:# Prior to modifying this table, please determine whether# we should create migrations for this table to make sure# users do not experience data loss.__tablename__="langchain_key_value_stores"namespace=Column(Text(),primary_key=True,index=True,nullable=False)key=Column(Text(),primary_key=True,index=True,nullable=False)value=Column(LargeBinary,index=False,nullable=False)
# This is a fix of original SQLStore.# This can will be removed when a PR will be merged.
[docs]classSQLStore(BaseStore[str,bytes]):"""BaseStore interface that works on an SQL database. Examples: Create a SQLStore instance and perform operations on it: .. code-block:: python from langchain_community.storage import SQLStore # Instantiate the SQLStore with the root path sql_store = SQLStore(namespace="test", db_url="sqlite://:memory:") # Set values for keys sql_store.mset([("key1", b"value1"), ("key2", b"value2")]) # Get values for keys values = sql_store.mget(["key1", "key2"]) # Returns [b"value1", b"value2"] # Delete keys sql_store.mdelete(["key1"]) # Iterate over keys for key in sql_store.yield_keys(): print(key) """
[docs]def__init__(self,*,namespace:str,db_url:Optional[Union[str,Path]]=None,engine:Optional[Union[Engine,AsyncEngine]]=None,engine_kwargs:Optional[Dict[str,Any]]=None,async_mode:Optional[bool]=None,):ifdb_urlisNoneandengineisNone:raiseValueError("Must specify either db_url or engine")ifdb_urlisnotNoneandengineisnotNone:raiseValueError("Must specify either db_url or engine, not both")_engine:Union[Engine,AsyncEngine]ifdb_url:ifasync_modeisNone:async_mode=Falseifasync_mode:_engine=create_async_engine(url=str(db_url),**(engine_kwargsor{}),)else:_engine=create_engine(url=str(db_url),**(engine_kwargsor{}))elifengine:_engine=engineelse:raiseAssertionError("Something went wrong with configuration of engine.")_session_maker:Union[sessionmaker[Session],async_sessionmaker[AsyncSession]]ifisinstance(_engine,AsyncEngine):self.async_mode=True_session_maker=async_sessionmaker(bind=_engine)else:self.async_mode=False_session_maker=sessionmaker(bind=_engine)self.engine=_engineself.dialect=_engine.dialect.nameself.session_maker=_session_makerself.namespace=namespace
[docs]defcreate_schema(self)->None:Base.metadata.create_all(self.engine)# problem in sqlalchemy v1
# sqlalchemy.exc.CompileError: (in table 'langchain_key_value_stores',# column 'namespace'): Can't generate DDL for NullType(); did you forget# to specify a type on this Column?
@contextlib.contextmanagerdef_make_sync_session(self)->Generator[Session,None,None]:"""Make an async session."""ifself.async_mode:raiseValueError("Attempting to use a sync method in when async mode is turned on. ""Please use the corresponding async method instead.")withcast(Session,self.session_maker())assession:yieldcast(Session,session)@contextlib.asynccontextmanagerasyncdef_make_async_session(self)->AsyncGenerator[AsyncSession,None]:"""Make an async session."""ifnotself.async_mode:raiseValueError("Attempting to use an async method in when sync mode is turned on. ""Please use the corresponding async method instead.")asyncwithcast(AsyncSession,self.session_maker())assession:yieldcast(AsyncSession,session)