# Source code for langchain_postgres.utils.pgvector_migrator
import asyncio
import json
import warnings
from typing import Any, AsyncIterator, Iterator, Optional, Sequence, TypeVar

from sqlalchemy import RowMapping, text
from sqlalchemy.exc import ProgrammingError, SQLAlchemyError

from ..v2.engine import PGEngine
from ..v2.vectorstores import PGVectorStore

# Table names used by the legacy PGVector storage layout.
COLLECTIONS_TABLE = "langchain_pg_collection"
EMBEDDINGS_TABLE = "langchain_pg_embedding"

T = TypeVar("T")


async def __aget_collection_uuid(
    engine: PGEngine,
    collection_name: str,
) -> str:
    """
    Get the collection uuid for a collection present in PGVector tables.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The name of the collection to get the uuid for.

    Returns:
        The uuid corresponding to the collection.

    Raises:
        ValueError: If no collection with the given name exists.
    """
    query = (
        f"SELECT name, uuid FROM {COLLECTIONS_TABLE} WHERE name = :collection_name"
    )
    async with engine._pool.connect() as conn:
        result = await conn.execute(
            text(query), parameters={"collection_name": collection_name}
        )
        result_map = result.mappings()
        result_fetch = result_map.fetchone()
    if result_fetch is None:
        raise ValueError(f"Collection, {collection_name} not found.")
    return result_fetch.uuid


async def __aextract_pgvector_collection(
    engine: PGEngine,
    collection_name: str,
    batch_size: int = 1000,
) -> AsyncIterator[Sequence[RowMapping]]:
    """
    Extract all data belonging to a PGVector collection.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The name of the collection to get the data for.
        batch_size (int): The batch size for collection extraction.
            Default: 1000. Optional.

    Yields:
        The data present in the collection, in batches of at most
        ``batch_size`` row mappings.

    Raises:
        ValueError: If the collection does not exist.
        ProgrammingError: If the extraction query fails.
    """
    # Pre-initialize so the SQLAlchemyError handler below can always
    # reference it; otherwise a failure before `uuid = await uuid_task`
    # (e.g. a connection error) would raise UnboundLocalError and mask
    # the real database error.
    uuid: Optional[str] = None
    try:
        # Resolve the collection uuid concurrently with acquiring a connection.
        uuid_task = asyncio.create_task(
            __aget_collection_uuid(engine, collection_name)
        )
        query = f"SELECT * FROM {EMBEDDINGS_TABLE} WHERE collection_id = :id"
        async with engine._pool.connect() as conn:
            uuid = await uuid_task
            result_proxy = await conn.execute(text(query), parameters={"id": uuid})
            while True:
                rows = result_proxy.fetchmany(size=batch_size)
                if not rows:
                    break
                yield [row._mapping for row in rows]
    except ValueError:
        # Raised by __aget_collection_uuid when the collection is missing.
        raise ValueError(f"Collection, {collection_name} does not exist.")
    except SQLAlchemyError as e:
        raise ProgrammingError(
            statement=f"Failed to extract data from collection '{collection_name}': {e}",
            params={"id": uuid},
            orig=e,
        ) from e


async def __concurrent_batch_insert(
    data_batches: AsyncIterator[Sequence[RowMapping]],
    vector_store: PGVectorStore,
    max_concurrency: int = 100,
) -> None:
    """
    Insert extracted row batches into ``vector_store`` concurrently.

    Schedules one ``aadd_embeddings`` task per batch, keeping at most
    ``max_concurrency`` tasks in flight at a time, and waits for all
    remaining tasks at the end.

    Args:
        data_batches (AsyncIterator[Sequence[RowMapping]]): Batches of rows
            extracted from the legacy embeddings table.
        vector_store (PGVectorStore): Destination vector store.
        max_concurrency (int): Maximum number of concurrent insert tasks.
            Default: 100. Optional.
    """
    pending: set[Any] = set()
    async for batch_data in data_batches:
        pending.add(
            asyncio.ensure_future(
                vector_store.aadd_embeddings(
                    texts=[data.document for data in batch_data],
                    # Legacy rows store the embedding as a JSON-encoded string.
                    embeddings=[json.loads(data.embedding) for data in batch_data],
                    metadatas=[data.cmetadata for data in batch_data],
                    ids=[data.id for data in batch_data],
                )
            )
        )
        if len(pending) >= max_concurrency:
            # Throttle: wait for at least one in-flight insert to finish.
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)


async def __amigrate_pgvector_collection(
    engine: PGEngine,
    collection_name: str,
    vector_store: PGVectorStore,
    delete_pg_collection: Optional[bool] = False,
    insert_batch_size: int = 1000,
) -> None:
    """
    Migrate all data present in a PGVector collection to use separate tables for
    each collection. The new data format is compatible with the PGVectorStore
    interface.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The collection to migrate.
        vector_store (PGVectorStore): The PGVectorStore object corresponding to the
            new collection table.
        delete_pg_collection (bool): An option to delete the original data upon
            migration. Default: False. Optional.
        insert_batch_size (int): Number of rows to insert at once in the table.
            Default: 1000.

    Raises:
        ValueError: If the destination table does not exist or the row counts
            do not match after migration.
    """
    destination_table = vector_store.get_table_name()

    # Get row count in PGVector collection
    uuid_task = asyncio.create_task(__aget_collection_uuid(engine, collection_name))
    query = (
        f"SELECT COUNT(*) FROM {EMBEDDINGS_TABLE} WHERE collection_id=:collection_id"
    )
    async with engine._pool.connect() as conn:
        uuid = await uuid_task
        result = await conn.execute(text(query), parameters={"collection_id": uuid})
        result_map = result.mappings()
        collection_data_len = result_map.fetchone()
    if collection_data_len is None:
        warnings.warn(f"Collection, {collection_name} contains no elements.")
        return

    # Extract data from the collection and batch insert into the new table
    data_batches = __aextract_pgvector_collection(
        engine, collection_name, batch_size=insert_batch_size
    )
    await __concurrent_batch_insert(data_batches, vector_store, max_concurrency=100)

    # Validate data migration by comparing row counts.
    query = f"SELECT COUNT(*) FROM {destination_table}"
    async with engine._pool.connect() as conn:
        result = await conn.execute(text(query))
        result_map = result.mappings()
        table_size = result_map.fetchone()
    if not table_size:
        raise ValueError(f"Table: {destination_table} does not exist.")

    if collection_data_len["count"] != table_size["count"]:
        raise ValueError(
            "All data not yet migrated.\n"
            f"Original row count: {collection_data_len['count']}\n"
            f"Collection table, {destination_table} row count: {table_size['count']}"
        )
    elif delete_pg_collection:
        # Delete PGVector data: embeddings first, then the collection record.
        query = f"DELETE FROM {EMBEDDINGS_TABLE} WHERE collection_id=:collection_id"
        async with engine._pool.connect() as conn:
            await conn.execute(text(query), parameters={"collection_id": uuid})
            await conn.commit()

        query = f"DELETE FROM {COLLECTIONS_TABLE} WHERE name=:collection_name"
        async with engine._pool.connect() as conn:
            await conn.execute(
                text(query), parameters={"collection_name": collection_name}
            )
            await conn.commit()
        print(f"Successfully deleted PGVector collection, {collection_name}")


async def __alist_pgvector_collection_names(
    engine: PGEngine,
) -> list[str]:
    """Lists all collection names present in PGVector table."""
    try:
        query = f"SELECT name from {COLLECTIONS_TABLE}"
        async with engine._pool.connect() as conn:
            result = await conn.execute(text(query))
            result_map = result.mappings()
            all_rows = result_map.fetchall()
        return [row["name"] for row in all_rows]
    except ProgrammingError as e:
        raise ValueError(
            "Please provide the correct collection table name: " + str(e)
        ) from e
async def aextract_pgvector_collection(
    engine: PGEngine,
    collection_name: str,
    batch_size: int = 1000,
) -> AsyncIterator[Sequence[RowMapping]]:
    """
    Extract all data belonging to a PGVector collection.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The name of the collection to get the data for.
        batch_size (int): The batch size for collection extraction.
            Default: 1000. Optional.

    Yields:
        The data present in the collection.
    """
    batches = __aextract_pgvector_collection(engine, collection_name, batch_size)
    exhausted = False
    while not exhausted:
        try:
            # Drive the private async generator one step at a time on the
            # engine's background loop.
            batch = await engine._run_as_async(batches.__anext__())
        except StopAsyncIteration:
            exhausted = True
        else:
            yield batch
async def alist_pgvector_collection_names(
    engine: PGEngine,
) -> list[str]:
    """Lists all collection names present in PGVector table."""
    # Delegate to the private coroutine, executed on the engine's loop.
    names_coro = __alist_pgvector_collection_names(engine)
    return await engine._run_as_async(names_coro)
async def amigrate_pgvector_collection(
    engine: PGEngine,
    collection_name: str,
    vector_store: PGVectorStore,
    delete_pg_collection: Optional[bool] = False,
    insert_batch_size: int = 1000,
) -> None:
    """
    Migrate all data present in a PGVector collection to use separate tables for
    each collection. The new data format is compatible with the PGVectorStore
    interface.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The collection to migrate.
        vector_store (PGVectorStore): The PGVectorStore object corresponding to the
            new collection table.
        delete_pg_collection (bool): An option to delete the original data upon
            migration. Default: False. Optional.
        insert_batch_size (int): Number of rows to insert at once in the table.
            Default: 1000.
    """
    # NOTE: a stale ``use_json_metadata`` parameter was previously documented
    # here but never existed in the signature; it has been removed.
    await engine._run_as_async(
        __amigrate_pgvector_collection(
            engine,
            collection_name,
            vector_store,
            delete_pg_collection,
            insert_batch_size,
        )
    )
def extract_pgvector_collection(
    engine: PGEngine,
    collection_name: str,
    batch_size: int = 1000,
) -> Iterator[Sequence[RowMapping]]:
    """
    Extract all data belonging to a PGVector collection.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The name of the collection to get the data for.
        batch_size (int): The batch size for collection extraction.
            Default: 1000. Optional.

    Yields:
        The data present in the collection.
    """
    batches = __aextract_pgvector_collection(engine, collection_name, batch_size)
    exhausted = False
    while not exhausted:
        try:
            # Synchronously drive the private async generator one step at a time.
            batch = engine._run_as_sync(batches.__anext__())
        except StopAsyncIteration:
            exhausted = True
        else:
            yield batch
def list_pgvector_collection_names(engine: PGEngine) -> list[str]:
    """Lists all collection names present in PGVector table."""
    # Delegate to the private coroutine, run synchronously on the engine's loop.
    names_coro = __alist_pgvector_collection_names(engine)
    return engine._run_as_sync(names_coro)
def migrate_pgvector_collection(
    engine: PGEngine,
    collection_name: str,
    vector_store: PGVectorStore,
    delete_pg_collection: Optional[bool] = False,
    insert_batch_size: int = 1000,
) -> None:
    """
    Migrate all data present in a PGVector collection to use separate tables for
    each collection. The new data format is compatible with the PGVectorStore
    interface.

    Args:
        engine (PGEngine): The PG engine corresponding to the Database.
        collection_name (str): The collection to migrate.
        vector_store (PGVectorStore): The PGVectorStore object corresponding to the
            new collection table.
        delete_pg_collection (bool): An option to delete the original data upon
            migration. Default: False. Optional.
        insert_batch_size (int): Number of rows to insert at once in the table.
            Default: 1000.
    """
    migration = __amigrate_pgvector_collection(
        engine,
        collection_name,
        vector_store,
        delete_pg_collection,
        insert_batch_size,
    )
    engine._run_as_sync(migration)