@dataclass
class Column:
    """Description of one table column.

    Attributes:
        name (str): Column name.
        data_type (str): Column data type, written as a string.
        nullable (bool): Whether the column accepts NULL. Default: True.
    """

    name: str
    data_type: str
    nullable: bool = True

    def __post_init__(self) -> None:
        """Reject non-string ``name`` or ``data_type`` values.

        Raises:
            ValueError: If ``name`` is not a string.
            ValueError: If ``data_type`` is not a string.
        """
        # Checked in declaration order so ``name`` is always reported first.
        checks = (
            (self.name, "Column name must be type string"),
            (self.data_type, "Column data_type must be type string"),
        )
        for value, message in checks:
            if isinstance(value, str):
                continue
            raise ValueError(message)
class PGEngine:
    """A class for managing connections to a Postgres database."""

    # Event loop shared at class level; from_connection_string creates it on
    # first use and reuses it afterwards.
    _default_loop: Optional[asyncio.AbstractEventLoop] = None
    # Daemon thread started by from_connection_string to run _default_loop forever.
    _default_thread: Optional[Thread] = None
    # Private sentinel that __init__ checks, so instances can only be built
    # through the factory classmethods.
    __create_key = object()
[docs]def__init__(self,key:object,pool:AsyncEngine,loop:Optional[asyncio.AbstractEventLoop],thread:Optional[Thread],)->None:"""PGEngine constructor. Args: key (object): Prevent direct constructor usage. pool (AsyncEngine): Async engine connection pool. loop (Optional[asyncio.AbstractEventLoop]): Async event loop used to create the engine. thread (Optional[Thread]): Thread used to create the engine async. Raises: Exception: If the constructor is called directly by the user. """ifkey!=PGEngine.__create_key:raiseException("Only create class through 'from_connection_string' or 'from_engine' methods!")self._pool=poolself._loop=loopself._thread=thread
[docs]@classmethoddeffrom_engine(cls:type[PGEngine],engine:AsyncEngine,loop:Optional[asyncio.AbstractEventLoop]=None,)->PGEngine:"""Create an PGEngine instance from an AsyncEngine."""returncls(cls.__create_key,engine,loop,None)
@classmethod
def from_connection_string(
    cls,
    url: str | URL,
    **kwargs: Any,
) -> PGEngine:
    """Create a PGEngine from a database URL.

    The first call creates an event loop and starts a daemon thread that runs
    it forever; later calls reuse that loop and thread.

    Args:
        url (str | URL): The URL used to connect to a database; passed to
            ``create_async_engine``.
        **kwargs (Any): Forwarded unchanged to ``create_async_engine``.

    Returns:
        PGEngine
    """
    # Running a loop in a background thread allows us to support
    # async methods from non-async environments
    if cls._default_loop is None:
        background_loop = asyncio.new_event_loop()
        cls._default_loop = background_loop
        worker = Thread(target=background_loop.run_forever, daemon=True)
        cls._default_thread = worker
        worker.start()

    async_engine = create_async_engine(url, **kwargs)
    return cls(
        cls.__create_key,
        async_engine,
        cls._default_loop,
        cls._default_thread,
    )
asyncdef_run_as_async(self,coro:Awaitable[T])->T:"""Run an async coroutine asynchronously"""# If a loop has not been provided, attempt to run in current threadifnotself._loop:returnawaitcoro# Otherwise, run in the background threadreturnawaitasyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro,self._loop))def_run_as_sync(self,coro:Awaitable[T])->T:"""Run an async coroutine synchronously"""ifnotself._loop:raiseException("Engine was initialized without a background loop and cannot call sync methods.")returnasyncio.run_coroutine_threadsafe(coro,self._loop).result()
[docs]asyncdefclose(self)->None:"""Dispose of connection pool"""awaitself._run_as_async(self._pool.dispose())
def_escape_postgres_identifier(self,name:str)->str:returnname.replace('"','""')def_validate_column_dict(self,col:ColumnDict)->None:ifnotisinstance(col.get("name"),str):raiseTypeError("The 'name' field must be a string.")ifnotisinstance(col.get("data_type"),str):raiseTypeError("The 'data_type' field must be a string.")ifnotisinstance(col.get("nullable"),bool):raiseTypeError("The 'nullable' field must be a boolean.")asyncdef_ainit_vectorstore_table(self,table_name:str,vector_size:int,*,schema_name:str="public",content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[Union[Column,ColumnDict]]]=None,metadata_json_column:str="langchain_metadata",id_column:Union[str,Column,ColumnDict]="langchain_id",overwrite_existing:bool=False,store_metadata:bool=True,)->None:""" Create a table for saving of vectors to be used with PGVectorStore. Args: table_name (str): The database table name. vector_size (int): Vector size for the embedding model to be used. schema_name (str): The schema name. Default: "public". content_column (str): Name of the column to store document content. Default: "page_content". embedding_column (str) : Name of the column to store vector embeddings. Default: "embedding". metadata_columns (Optional[list[Union[Column, ColumnDict]]]): A list of Columns to create for custom metadata. Default: None. Optional. metadata_json_column (str): The column to store extra metadata in JSON format. Default: "langchain_metadata". Optional. id_column (Union[str, Column, ColumnDict]) : Column to store ids. Default: "langchain_id" column name with data type UUID. Optional. overwrite_existing (bool): Whether to drop existing table. Default: False. store_metadata (bool): Whether to store metadata in the table. Default: True. Raises: :class:`DuplicateTableError <asyncpg.exceptions.DuplicateTableError>`: if table already exists. 
:class:`UndefinedObjectError <asyncpg.exceptions.UndefinedObjectError>`: if the data type of the id column is not a postgreSQL data type. """schema_name=self._escape_postgres_identifier(schema_name)table_name=self._escape_postgres_identifier(table_name)content_column=self._escape_postgres_identifier(content_column)embedding_column=self._escape_postgres_identifier(embedding_column)ifmetadata_columnsisNone:metadata_columns=[]else:forcolinmetadata_columns:ifisinstance(col,Column):col.name=self._escape_postgres_identifier(col.name)elifisinstance(col,dict):self._validate_column_dict(col)col["name"]=self._escape_postgres_identifier(col["name"])ifisinstance(id_column,str):id_column=self._escape_postgres_identifier(id_column)elifisinstance(id_column,Column):id_column.name=self._escape_postgres_identifier(id_column.name)else:self._validate_column_dict(id_column)id_column["name"]=self._escape_postgres_identifier(id_column["name"])asyncwithself._pool.connect()asconn:awaitconn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))awaitconn.commit()ifoverwrite_existing:asyncwithself._pool.connect()asconn:awaitconn.execute(text(f'DROP TABLE IF EXISTS "{schema_name}"."{table_name}"'))awaitconn.commit()ifisinstance(id_column,str):id_data_type="UUID"id_column_name=id_columnelifisinstance(id_column,Column):id_data_type=id_column.data_typeid_column_name=id_column.nameelse:id_data_type=id_column["data_type"]id_column_name=id_column["name"]query=f"""CREATE TABLE "{schema_name}"."{table_name}"( "{id_column_name}" {id_data_type} PRIMARY KEY, "{content_column}" TEXT NOT NULL, "{embedding_column}" vector({vector_size}) NOT NULL"""forcolumninmetadata_columns:ifisinstance(column,Column):nullable="NOT NULL"ifnotcolumn.nullableelse""query+=f',\n"{column.name}" {column.data_type}{nullable}'elifisinstance(column,dict):nullable="NOT NULL"ifnotcolumn["nullable"]else""query+=f',\n"{column["name"]}" {column["data_type"]}{nullable}'ifstore_metadata:query+=f""",\n"{metadata_json_column}" 
JSON"""query+="\n);"asyncwithself._pool.connect()asconn:awaitconn.execute(text(query))awaitconn.commit()
[docs]asyncdefainit_vectorstore_table(self,table_name:str,vector_size:int,*,schema_name:str="public",content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[Union[Column,ColumnDict]]]=None,metadata_json_column:str="langchain_metadata",id_column:Union[str,Column,ColumnDict]="langchain_id",overwrite_existing:bool=False,store_metadata:bool=True,)->None:""" Create a table for saving of vectors to be used with PGVectorStore. Args: table_name (str): The database table name. vector_size (int): Vector size for the embedding model to be used. schema_name (str): The schema name. Default: "public". content_column (str): Name of the column to store document content. Default: "page_content". embedding_column (str) : Name of the column to store vector embeddings. Default: "embedding". metadata_columns (Optional[list[Union[Column, ColumnDict]]]): A list of Columns to create for custom metadata. Default: None. Optional. metadata_json_column (str): The column to store extra metadata in JSON format. Default: "langchain_metadata". Optional. id_column (Union[str, Column, ColumnDict]) : Column to store ids. Default: "langchain_id" column name with data type UUID. Optional. overwrite_existing (bool): Whether to drop existing table. Default: False. store_metadata (bool): Whether to store metadata in the table. Default: True. """awaitself._run_as_async(self._ainit_vectorstore_table(table_name,vector_size,schema_name=schema_name,content_column=content_column,embedding_column=embedding_column,metadata_columns=metadata_columns,metadata_json_column=metadata_json_column,id_column=id_column,overwrite_existing=overwrite_existing,store_metadata=store_metadata,))
[docs]definit_vectorstore_table(self,table_name:str,vector_size:int,*,schema_name:str="public",content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[Union[Column,ColumnDict]]]=None,metadata_json_column:str="langchain_metadata",id_column:Union[str,Column,ColumnDict]="langchain_id",overwrite_existing:bool=False,store_metadata:bool=True,)->None:""" Create a table for saving of vectors to be used with PGVectorStore. Args: table_name (str): The database table name. vector_size (int): Vector size for the embedding model to be used. schema_name (str): The schema name. Default: "public". content_column (str): Name of the column to store document content. Default: "page_content". embedding_column (str) : Name of the column to store vector embeddings. Default: "embedding". metadata_columns (Optional[list[Union[Column, ColumnDict]]]): A list of Columns to create for custom metadata. Default: None. Optional. metadata_json_column (str): The column to store extra metadata in JSON format. Default: "langchain_metadata". Optional. id_column (Union[str, Column, ColumnDict]) : Column to store ids. Default: "langchain_id" column name with data type UUID. Optional. overwrite_existing (bool): Whether to drop existing table. Default: False. store_metadata (bool): Whether to store metadata in the table. Default: True. """self._run_as_sync(self._ainit_vectorstore_table(table_name,vector_size,schema_name=schema_name,content_column=content_column,embedding_column=embedding_column,metadata_columns=metadata_columns,metadata_json_column=metadata_json_column,id_column=id_column,overwrite_existing=overwrite_existing,store_metadata=store_metadata,))
asyncdef_adrop_table(self,table_name:str,*,schema_name:str="public",)->None:"""Drop the vector store table"""query=f'DROP TABLE "{schema_name}"."{table_name}";'asyncwithself._pool.connect()asconn:awaitconn.execute(text(query))awaitconn.commit()