class Yellowbrick(VectorStore):
    """Yellowbrick as a vector database.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import Yellowbrick
            from langchain_community.embeddings.openai import OpenAIEmbeddings
            ...
    """

    class IndexType(str, enum.Enum):
        """Enumerator for the supported Index types within Yellowbrick."""

        NONE = "none"
        LSH = "lsh"

    class IndexParams:
        """Parameters for configuring a Yellowbrick index."""

        def __init__(
            self,
            index_type: Optional["Yellowbrick.IndexType"] = None,
            params: Optional[Dict[str, Any]] = None,
        ):
            if index_type is None:
                index_type = Yellowbrick.IndexType.NONE
            self.index_type = index_type
            self.params = params or {}

        def get_param(self, key: str, default: Any = None) -> Any:
            return self.params.get(key, default)
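    # A minimal sketch of configuring an LSH index. The values below are
    # hypothetical; "num_hyperplanes" and "hamming_distance" are the keys
    # read back via get_param() in create_index() and
    # similarity_search_with_score_by_vector() further down:
    #
    #     index_params = Yellowbrick.IndexParams(
    #         index_type=Yellowbrick.IndexType.LSH,
    #         params={"num_hyperplanes": 128, "hamming_distance": 2},
    #     )
    #     index_params.get_param("num_hyperplanes", 128)  # -> 128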
    def __init__(
        self,
        embedding: Embeddings,
        connection_string: str,
        table: str,
        *,
        schema: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
        drop: bool = False,
    ) -> None:
        """Initialize with yellowbrick client.

        Args:
            embedding: Embedding operator
            connection_string: Format 'postgres://username:password@host:port/database'
            table: Table used to store / retrieve embeddings from
        """

        from psycopg2 import extras

        extras.register_uuid()

        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(__name__)
            self.logger.setLevel(logging.ERROR)
            handler = logging.StreamHandler()
            handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        if not isinstance(embedding, Embeddings):
            self.logger.error("embeddings input must be Embeddings object.")
            return

        self.LSH_INDEX_TABLE: str = "_lsh_index"
        self.LSH_HYPERPLANE_TABLE: str = "_lsh_hyperplane"
        self.CONTENT_TABLE: str = "_content"

        self.connection_string = connection_string
        self.connection = Yellowbrick.DatabaseConnection(
            connection_string, self.logger
        )
        atexit.register(self.connection.close_connection)

        self._schema = schema
        self._table = table
        self._embedding = embedding
        self._max_embedding_len = None

        self._check_database_utf8()

        with self.connection.get_cursor() as cursor:
            if drop:
                self.drop(table=self._table, schema=self._schema, cursor=cursor)
                self.drop(
                    table=self._table + self.CONTENT_TABLE,
                    schema=self._schema,
                    cursor=cursor,
                )
                self._drop_lsh_index_tables(cursor)

            self._create_schema(cursor)
            self._create_table(cursor)
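    # A minimal construction sketch, assuming a reachable Yellowbrick instance
    # and valid credentials (the connection string and table name below are
    # hypothetical):
    #
    #     from langchain_community.embeddings.openai import OpenAIEmbeddings
    #
    #     store = Yellowbrick(
    #         embedding=OpenAIEmbeddings(),
    #         connection_string="postgres://user:password@host:5432/db",
    #         table="langchain",
    #         drop=False,  # set True to recreate the tables from scratch
    #     )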
    class DatabaseConnection:
        _instance = None
        _connection_string: str
        _connection: Optional["PgConnection"] = None
        _logger: logging.Logger

        def __new__(
            cls, connection_string: str, logger: logging.Logger
        ) -> "Yellowbrick.DatabaseConnection":
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._connection_string = connection_string
                cls._instance._logger = logger
            return cls._instance

        def close_connection(self) -> None:
            if self._connection and not self._connection.closed:
                self._connection.close()
                self._connection = None

        def get_connection(self) -> "PgConnection":
            import psycopg2

            if not self._connection or self._connection.closed:
                self._connection = psycopg2.connect(self._connection_string)
                self._connection.autocommit = False

            return self._connection

        @contextmanager
        def get_managed_connection(self) -> Generator["PgConnection", None, None]:
            from psycopg2 import DatabaseError

            conn = self.get_connection()
            try:
                yield conn
            except DatabaseError as e:
                conn.rollback()
                self._logger.error(
                    "Database error occurred, rolling back transaction.",
                    exc_info=True,
                )
                raise RuntimeError("Database transaction failed.") from e
            else:
                conn.commit()

        @contextmanager
        def get_cursor(self) -> Generator["PgCursor", None, None]:
            with self.get_managed_connection() as conn:
                cursor = conn.cursor()
                try:
                    yield cursor
                finally:
                    cursor.close()

    def _create_schema(self, cursor: "PgCursor") -> None:
        """
        Helper function: create schema if not exists
        """
        from psycopg2 import sql

        if self._schema:
            cursor.execute(
                sql.SQL(
                    """
                    CREATE SCHEMA IF NOT EXISTS {s}
                    """
                ).format(
                    s=sql.Identifier(self._schema),
                )
            )

    def _create_table(self, cursor: "PgCursor") -> None:
        """
        Helper function: create table if not exists
        """
        from psycopg2 import sql

        schema_prefix = (self._schema,) if self._schema else ()
        t = sql.Identifier(*schema_prefix, self._table + self.CONTENT_TABLE)
        c = sql.Identifier(self._table + self.CONTENT_TABLE + "_pk_doc_id")
        cursor.execute(
            sql.SQL(
                """
                CREATE TABLE IF NOT EXISTS {t} (
                    doc_id UUID NOT NULL,
                    text VARCHAR(60000) NOT NULL,
                    metadata VARCHAR(1024) NOT NULL,
                    CONSTRAINT {c} PRIMARY KEY (doc_id))
                DISTRIBUTE ON (doc_id) SORT ON (doc_id)
                """
            ).format(
                t=t,
                c=c,
            )
        )

        schema_prefix = (self._schema,) if self._schema else ()
        t1 = sql.Identifier(*schema_prefix, self._table)
        t2 = sql.Identifier(*schema_prefix, self._table + self.CONTENT_TABLE)
        c1 = sql.Identifier(
            self._table + self.CONTENT_TABLE + "_pk_doc_id_embedding_id"
        )
        c2 = sql.Identifier(self._table + self.CONTENT_TABLE + "_fk_doc_id")
        cursor.execute(
            sql.SQL(
                """
                CREATE TABLE IF NOT EXISTS {t1} (
                    doc_id UUID NOT NULL,
                    embedding_id SMALLINT NOT NULL,
                    embedding FLOAT NOT NULL,
                    CONSTRAINT {c1} PRIMARY KEY (doc_id, embedding_id),
                    CONSTRAINT {c2} FOREIGN KEY (doc_id) REFERENCES {t2}(doc_id))
                DISTRIBUTE ON (doc_id) SORT ON (doc_id)
                """
            ).format(
                t1=t1,
                t2=t2,
                c1=c1,
                c2=c2,
            )
        )
    def drop(
        self,
        table: str,
        schema: Optional[str] = None,
        cursor: Optional["PgCursor"] = None,
    ) -> None:
        """
        Helper function: Drop data. If a cursor is provided, use it;
        otherwise, obtain a new cursor for the operation.
        """
        if cursor is None:
            with self.connection.get_cursor() as cursor:
                self._drop_table(cursor, table, schema=schema)
        else:
            self._drop_table(cursor, table, schema=schema)
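    # Usage sketch (hypothetical names); when no cursor is passed, a managed
    # cursor is opened and the drop is committed automatically:
    #
    #     store.drop(table="langchain_content", schema="public")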
    def _drop_table(
        self,
        cursor: "PgCursor",
        table: str,
        schema: Optional[str] = None,
    ) -> None:
        """
        Executes the drop table command using the given cursor.
        """
        from psycopg2 import sql

        if schema:
            table_name = sql.Identifier(schema, table)
        else:
            table_name = sql.Identifier(table)

        drop_table_query = sql.SQL(
            """
            DROP TABLE IF EXISTS {} CASCADE
            """
        ).format(table_name)
        cursor.execute(drop_table_query)

    def _check_database_utf8(self) -> bool:
        """
        Helper function: Test that the database is UTF-8 encoded
        """
        with self.connection.get_cursor() as cursor:
            query = """
                SELECT pg_encoding_to_char(encoding)
                FROM pg_database
                WHERE datname = current_database();
            """
            cursor.execute(query)
            encoding = cursor.fetchone()[0]

        if encoding.lower() == "utf8" or encoding.lower() == "utf-8":
            return True
        else:
            raise Exception("Database encoding is not UTF-8")
    def _copy_to_db(
        self,
        cursor: "PgCursor",
        content_io: StringIO,
        embeddings_io: StringIO,
    ) -> None:
        content_io.seek(0)
        embeddings_io.seek(0)

        from psycopg2 import sql

        schema_prefix = (self._schema,) if self._schema else ()
        table = sql.Identifier(*schema_prefix, self._table + self.CONTENT_TABLE)
        content_copy_query = sql.SQL(
            """
            COPY {table} (doc_id, text, metadata) FROM
            STDIN WITH (FORMAT CSV, DELIMITER E'\\t', QUOTE '\"')
            """
        ).format(table=table)
        cursor.copy_expert(content_copy_query, content_io)

        schema_prefix = (self._schema,) if self._schema else ()
        table = sql.Identifier(*schema_prefix, self._table)
        embeddings_copy_query = sql.SQL(
            """
            COPY {table} (doc_id, embedding_id, embedding) FROM
            STDIN WITH (FORMAT CSV, DELIMITER E'\\t', QUOTE '\"')
            """
        ).format(table=table)
        cursor.copy_expert(embeddings_copy_query, embeddings_io)
    @classmethod
    def from_texts(
        cls: Type[Yellowbrick],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        connection_string: str = "",
        table: str = "langchain",
        schema: str = "public",
        drop: bool = False,
        **kwargs: Any,
    ) -> Yellowbrick:
        """Add texts to the vectorstore index.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            connection_string: URI to Yellowbrick instance
            embedding: Embedding function
            table: table to store embeddings
            kwargs: vectorstore specific parameters
        """
        vss = cls(
            embedding=embedding,
            connection_string=connection_string,
            table=table,
            schema=schema,
            drop=drop,
        )
        vss.add_texts(texts=texts, metadatas=metadatas, **kwargs)
        return vss
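    # A minimal sketch, reusing the same hypothetical connection string as
    # above; from_texts() constructs the store and bulk-loads the texts in
    # one call:
    #
    #     store = Yellowbrick.from_texts(
    #         texts=["hello world", "goodbye world"],
    #         embedding=OpenAIEmbeddings(),
    #         connection_string="postgres://user:password@host:5432/db",
    #         table="langchain",
    #     )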
    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        **kwargs: Any,
    ) -> None:
        """Delete vectors by uuids.

        Args:
            ids: List of ids to delete, where each id is a uuid string.
        """
        from psycopg2 import sql

        if delete_all:
            where_sql = sql.SQL(
                """
                WHERE 1=1
                """
            )
        elif ids is not None:
            uuids = tuple(sql.Literal(id) for id in ids)
            ids_formatted = sql.SQL(", ").join(uuids)
            where_sql = sql.SQL(
                """
                WHERE doc_id IN ({ids})
                """
            ).format(
                ids=ids_formatted,
            )
        else:
            raise ValueError("Either ids or delete_all must be provided.")

        schema_prefix = (self._schema,) if self._schema else ()
        with self.connection.get_cursor() as cursor:
            table_identifier = sql.Identifier(
                *schema_prefix, self._table + self.CONTENT_TABLE
            )
            query = sql.SQL("DELETE FROM {table}{where_sql}").format(
                table=table_identifier, where_sql=where_sql
            )
            cursor.execute(query)

            table_identifier = sql.Identifier(*schema_prefix, self._table)
            query = sql.SQL("DELETE FROM {table}{where_sql}").format(
                table=table_identifier, where_sql=where_sql
            )
            cursor.execute(query)

            if self._table_exists(
                cursor, self._table + self.LSH_INDEX_TABLE, *schema_prefix
            ):
                table_identifier = sql.Identifier(
                    *schema_prefix, self._table + self.LSH_INDEX_TABLE
                )
                query = sql.SQL("DELETE FROM {table}{where_sql}").format(
                    table=table_identifier, where_sql=where_sql
                )
                cursor.execute(query)

        return None
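    # Usage sketch (the uuid below is hypothetical); exactly one of ids /
    # delete_all must be given, and rows are removed from the content,
    # embeddings, and (if present) LSH index tables:
    #
    #     store.delete(ids=["6ba7b810-9dad-11d1-80b4-00c04fd430c8"])
    #     store.delete(delete_all=True)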
    def _table_exists(
        self,
        cursor: "PgCursor",
        table_name: str,
        schema: str = "public",
    ) -> bool:
        """
        Checks if a table exists in the given schema
        """
        from psycopg2 import sql

        schema = sql.Literal(schema)
        table_name = sql.Literal(table_name)
        cursor.execute(
            sql.SQL(
                """
                SELECT COUNT(*)
                FROM sys.table t INNER JOIN sys.schema s ON t.schema_id = s.schema_id
                WHERE s.name = {schema} AND t.name = {table_name}
                """
            ).format(
                schema=schema,
                table_name=table_name,
            )
        )
        return cursor.fetchone()[0] > 0

    def _generate_vector_uuid(self, vector: List[float]) -> uuid.UUID:
        import hashlib

        vector_str = ",".join(map(str, vector))
        hash_object = hashlib.sha1(vector_str.encode())
        hash_digest = hash_object.digest()
        vector_uuid = uuid.UUID(bytes=hash_digest[:16])
        return vector_uuid
    def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Perform a similarity search in Yellowbrick with a query vector

        Args:
            embedding (List[float]): query embedding
            k (int, optional): Top K neighbors to retrieve. Defaults to 4.

            NOTE: Do not let end users supply this value directly; always be
                  mindful of SQL injection.

        Returns:
            List[Tuple[Document, float]]: List of Documents and scores
        """
        from psycopg2 import sql
        from psycopg2.extras import execute_values

        index_params = kwargs.get("index_params") or Yellowbrick.IndexParams()

        with self.connection.get_cursor() as cursor:
            tmp_embeddings_table = "tmp_" + self._table
            tmp_doc_id = self._generate_vector_uuid(embedding)
            create_table_query = sql.SQL(
                """
                CREATE TEMPORARY TABLE {} (
                    doc_id UUID,
                    embedding_id SMALLINT,
                    embedding FLOAT)
                ON COMMIT DROP
                DISTRIBUTE REPLICATE
                """
            ).format(sql.Identifier(tmp_embeddings_table))
            cursor.execute(create_table_query)
            data_input = [
                (str(tmp_doc_id), embedding_id, embedding_value)
                for embedding_id, embedding_value in enumerate(embedding)
            ]
            insert_query = sql.SQL(
                "INSERT INTO {} (doc_id, embedding_id, embedding) VALUES %s"
            ).format(sql.Identifier(tmp_embeddings_table))
            execute_values(cursor, insert_query, data_input)

            v1 = sql.Identifier(tmp_embeddings_table)
            schema_prefix = (self._schema,) if self._schema else ()
            v2 = sql.Identifier(*schema_prefix, self._table)
            v3 = sql.Identifier(*schema_prefix, self._table + self.CONTENT_TABLE)
            if index_params.index_type == Yellowbrick.IndexType.LSH:
                tmp_hash_table = self._table + "_tmp_hash"
                self._generate_tmp_lsh_hashes(
                    cursor,
                    tmp_embeddings_table,
                    tmp_hash_table,
                )

                schema_prefix = (self._schema,) if self._schema else ()
                lsh_index = sql.Identifier(
                    *schema_prefix, self._table + self.LSH_INDEX_TABLE
                )
                input_hash_table = sql.Identifier(tmp_hash_table)
                sql_query = sql.SQL(
                    """
                    WITH index_docs AS (
                        SELECT
                            t1.doc_id,
                            SUM(ABS(t1.hash-t2.hash)) as hamming_distance
                        FROM
                            {lsh_index} t1
                        INNER JOIN
                            {input_hash_table} t2
                        ON t1.hash_index = t2.hash_index
                        GROUP BY t1.doc_id
                        HAVING hamming_distance <= {hamming_distance}
                    )
                    SELECT
                        text,
                        metadata,
                        SUM(v1.embedding * v2.embedding) /
                        (SQRT(SUM(v1.embedding * v1.embedding)) *
                        SQRT(SUM(v2.embedding * v2.embedding))) AS score
                    FROM
                        {v1} v1
                    INNER JOIN
                        {v2} v2
                    ON v1.embedding_id = v2.embedding_id
                    INNER JOIN
                        {v3} v3
                    ON v2.doc_id = v3.doc_id
                    INNER JOIN
                        index_docs v4
                    ON v2.doc_id = v4.doc_id
                    GROUP BY v3.doc_id, v3.text, v3.metadata
                    ORDER BY score DESC
                    LIMIT %s
                    """
                ).format(
                    v1=v1,
                    v2=v2,
                    v3=v3,
                    lsh_index=lsh_index,
                    input_hash_table=input_hash_table,
                    hamming_distance=sql.Literal(
                        index_params.get_param("hamming_distance", 0)
                    ),
                )
                cursor.execute(
                    sql_query,
                    (k,),
                )
                results = cursor.fetchall()
            else:
                sql_query = sql.SQL(
                    """
                    SELECT
                        text,
                        metadata,
                        score
                    FROM
                        (SELECT
                            v2.doc_id doc_id,
                            SUM(v1.embedding * v2.embedding) /
                            (SQRT(SUM(v1.embedding * v1.embedding)) *
                            SQRT(SUM(v2.embedding * v2.embedding))) AS score
                        FROM
                            {v1} v1
                        INNER JOIN
                            {v2} v2
                        ON v1.embedding_id = v2.embedding_id
                        GROUP BY v2.doc_id
                        ORDER BY score DESC
                        LIMIT %s
                        ) v4
                    INNER JOIN
                        {v3} v3
                    ON v4.doc_id = v3.doc_id
                    ORDER BY score DESC
                    """
                ).format(
                    v1=v1,
                    v2=v2,
                    v3=v3,
                )
                cursor.execute(sql_query, (k,))
                results = cursor.fetchall()

        documents: List[Tuple[Document, float]] = []
        for result in results:
            metadata = json.loads(result[1]) or {}
            doc = Document(page_content=result[0], metadata=metadata)
            documents.append((doc, result[2]))

        return documents
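    # Usage sketch: pass IndexParams through kwargs to route the query through
    # the LSH index instead of the brute-force cosine scan (the query text and
    # hamming_distance value are hypothetical):
    #
    #     embedding = OpenAIEmbeddings().embed_query("some question")
    #     results = store.similarity_search_with_score_by_vector(
    #         embedding,
    #         k=4,
    #         index_params=Yellowbrick.IndexParams(
    #             Yellowbrick.IndexType.LSH, {"hamming_distance": 2}
    #         ),
    #     )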
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Perform a similarity search with Yellowbrick

        Args:
            query (str): query string
            k (int, optional): Top K neighbors to retrieve. Defaults to 4.

            NOTE: Do not let end users supply this value directly; always be
                  mindful of SQL injection.

        Returns:
            List[Document]: List of Documents
        """
        embedding = self._embedding.embed_query(query)
        documents = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )
        return [doc for doc, _ in documents]
    def similarity_search_with_score(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Perform a similarity search with Yellowbrick

        Args:
            query (str): query string
            k (int, optional): Top K neighbors to retrieve. Defaults to 4.

            NOTE: Do not let end users supply this value directly; always be
                  mindful of SQL injection.

        Returns:
            List[Tuple[Document, float]]: List of (Document, similarity score)
        """
        embedding = self._embedding.embed_query(query)
        documents = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )
        return documents
    def similarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Perform a similarity search with Yellowbrick by vector

        Args:
            embedding (List[float]): query embedding
            k (int, optional): Top K neighbors to retrieve. Defaults to 4.

            NOTE: Do not let end users supply this value directly; always be
                  mindful of SQL injection.

        Returns:
            List[Document]: List of documents
        """
        documents = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )
        return [doc for doc, _ in documents]
    def _update_lsh_hashes(
        self,
        cursor: "PgCursor",
        doc_id: Optional[uuid.UUID] = None,
    ) -> None:
        """Add hashes to LSH index"""
        from psycopg2 import sql

        schema_prefix = (self._schema,) if self._schema else ()
        lsh_hyperplane_table = sql.Identifier(
            *schema_prefix, self._table + self.LSH_HYPERPLANE_TABLE
        )
        lsh_index_table_id = sql.Identifier(
            *schema_prefix, self._table + self.LSH_INDEX_TABLE
        )
        embedding_table_id = sql.Identifier(*schema_prefix, self._table)
        query_prefix_id = sql.SQL("INSERT INTO {}").format(lsh_index_table_id)
        condition = (
            sql.SQL("WHERE e.doc_id = {doc_id}").format(
                doc_id=sql.Literal(str(doc_id))
            )
            if doc_id
            else sql.SQL("")
        )
        group_by = sql.SQL("GROUP BY 1, 2")

        input_query = sql.SQL(
            """
            {query_prefix}
            SELECT
                e.doc_id as doc_id,
                h.id as hash_index,
                CASE WHEN SUM(e.embedding * h.hyperplane) > 0 THEN 1 ELSE 0 END
                    as hash
            FROM {embedding_table} e
            INNER JOIN {hyperplanes} h ON e.embedding_id = h.hyperplane_id
            {condition}
            {group_by}
            """
        ).format(
            query_prefix=query_prefix_id,
            embedding_table=embedding_table_id,
            hyperplanes=lsh_hyperplane_table,
            condition=condition,
            group_by=group_by,
        )
        cursor.execute(input_query)

    def _generate_tmp_lsh_hashes(
        self, cursor: "PgCursor", tmp_embedding_table: str, tmp_hash_table: str
    ) -> None:
        """Generate temp LSH"""
        from psycopg2 import sql

        schema_prefix = (self._schema,) if self._schema else ()
        lsh_hyperplane_table = sql.Identifier(
            *schema_prefix, self._table + self.LSH_HYPERPLANE_TABLE
        )
        tmp_embedding_table_id = sql.Identifier(tmp_embedding_table)
        tmp_hash_table_id = sql.Identifier(tmp_hash_table)
        query_prefix = sql.SQL(
            "CREATE TEMPORARY TABLE {} ON COMMIT DROP AS"
        ).format(tmp_hash_table_id)
        group_by = sql.SQL("GROUP BY 1")

        input_query = sql.SQL(
            """
            {query_prefix}
            SELECT
                h.id as hash_index,
                CASE WHEN SUM(e.embedding * h.hyperplane) > 0 THEN 1 ELSE 0 END
                    as hash
            FROM {embedding_table} e
            INNER JOIN {hyperplanes} h ON e.embedding_id = h.hyperplane_id
            {group_by}
            DISTRIBUTE REPLICATE
            """
        ).format(
            query_prefix=query_prefix,
            embedding_table=tmp_embedding_table_id,
            hyperplanes=lsh_hyperplane_table,
            group_by=group_by,
        )
        cursor.execute(input_query)

    def _populate_hyperplanes(self, cursor: "PgCursor", num_hyperplanes: int) -> None:
        """Generate random hyperplanes and store in Yellowbrick"""
        from psycopg2 import sql

        schema_prefix = (self._schema,) if self._schema else ()
        hyperplanes_table = sql.Identifier(
            *schema_prefix, self._table + self.LSH_HYPERPLANE_TABLE
        )
        cursor.execute(
            sql.SQL("SELECT COUNT(*) FROM {t}").format(t=hyperplanes_table)
        )
        if cursor.fetchone()[0] > 0:
            return

        t = sql.Identifier(*schema_prefix, self._table)
        cursor.execute(sql.SQL("SELECT MAX(embedding_id) FROM {t}").format(t=t))
        num_dimensions = cursor.fetchone()[0]
        num_dimensions += 1

        insert_query = sql.SQL(
            """
            WITH parameters AS (
                SELECT {num_hyperplanes} AS num_hyperplanes,
                {dims_per_hyperplane} AS dims_per_hyperplane
            )
            INSERT INTO {hyperplanes_table} (id, hyperplane_id, hyperplane)
                SELECT id, hyperplane_id, (random() * 2 - 1) AS hyperplane
                FROM
                (SELECT range-1 id FROM sys.rowgenerator
                    WHERE range BETWEEN 1 AND
                    (SELECT num_hyperplanes FROM parameters) AND
                    worker_lid = 0 AND thread_id = 0) a,
                (SELECT range-1 hyperplane_id FROM sys.rowgenerator
                    WHERE range BETWEEN 1 AND
                    (SELECT dims_per_hyperplane FROM parameters) AND
                    worker_lid = 0 AND thread_id = 0) b
            """
        ).format(
            num_hyperplanes=sql.Literal(num_hyperplanes),
            dims_per_hyperplane=sql.Literal(num_dimensions),
            hyperplanes_table=hyperplanes_table,
        )
        cursor.execute(insert_query)

    def _create_lsh_index_tables(self, cursor: "PgCursor") -> None:
        """Create LSH index and hyperplane tables"""
tables"""frompsycopg2importsqlschema_prefix=(self._schema,)ifself._schemaelse()t1=sql.Identifier(*schema_prefix,self._table+self.LSH_INDEX_TABLE)t2=sql.Identifier(*schema_prefix,self._table+self.CONTENT_TABLE)c1=sql.Identifier(self._table+self.LSH_INDEX_TABLE+"_pk_doc_id")c2=sql.Identifier(self._table+self.LSH_INDEX_TABLE+"_fk_doc_id")cursor.execute(sql.SQL(""" CREATE TABLE IF NOT EXISTS {t1} ( doc_id UUID NOT NULL, hash_index SMALLINT NOT NULL, hash SMALLINT NOT NULL, CONSTRAINT {c1} PRIMARY KEY (doc_id, hash_index), CONSTRAINT {c2} FOREIGN KEY (doc_id) REFERENCES {t2}(doc_id)) DISTRIBUTE ON (doc_id) SORT ON (doc_id) """).format(t1=t1,t2=t2,c1=c1,c2=c2,))schema_prefix=(self._schema,)ifself._schemaelse()t=sql.Identifier(*schema_prefix,self._table+self.LSH_HYPERPLANE_TABLE)c=sql.Identifier(self._table+self.LSH_HYPERPLANE_TABLE+"_pk_id_hp_id")cursor.execute(sql.SQL(""" CREATE TABLE IF NOT EXISTS {t} ( id SMALLINT NOT NULL, hyperplane_id SMALLINT NOT NULL, hyperplane FLOAT NOT NULL, CONSTRAINT {c} PRIMARY KEY (id, hyperplane_id)) DISTRIBUTE REPLICATE SORT ON (id) """).format(t=t,c=c,))def_drop_lsh_index_tables(self,cursor:"PgCursor")->None:"""Drop LSH index tables"""self.drop(schema=self._schema,table=self._table+self.LSH_INDEX_TABLE,cursor=cursor)self.drop(schema=self._schema,table=self._table+self.LSH_HYPERPLANE_TABLE,cursor=cursor,)
    def create_index(self, index_params: Yellowbrick.IndexParams) -> None:
        """Create index from existing vectors"""
        if index_params.index_type == Yellowbrick.IndexType.LSH:
            with self.connection.get_cursor() as cursor:
                self._drop_lsh_index_tables(cursor)
                self._create_lsh_index_tables(cursor)
                self._populate_hyperplanes(
                    cursor, index_params.get_param("num_hyperplanes", 128)
                )
                self._update_lsh_hashes(cursor)
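    # Usage sketch: build (or rebuild) the LSH index over already-stored
    # vectors; the hyperplane count below is hypothetical, with 128 as the
    # default when the parameter is omitted:
    #
    #     store.create_index(
    #         Yellowbrick.IndexParams(
    #             Yellowbrick.IndexType.LSH, {"num_hyperplanes": 128}
    #         )
    #     )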
    def drop_index(self, index_params: Yellowbrick.IndexParams) -> None:
        """Drop an index"""
        if index_params.index_type == Yellowbrick.IndexType.LSH:
            with self.connection.get_cursor() as cursor:
                self._drop_lsh_index_tables(cursor)
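    # Usage sketch:
    #
    #     store.drop_index(Yellowbrick.IndexParams(Yellowbrick.IndexType.LSH))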
    def _update_index(
        self, index_params: Yellowbrick.IndexParams, doc_id: uuid.UUID
    ) -> None:
        """Update an index with a new or modified embedding in the embeddings
        table"""
        if index_params.index_type == Yellowbrick.IndexType.LSH:
            with self.connection.get_cursor() as cursor:
                self._update_lsh_hashes(cursor, doc_id)
    def migrate_schema_v1_to_v2(self) -> None:
        from psycopg2 import sql

        try:
            with self.connection.get_cursor() as cursor:
                schema_prefix = (self._schema,) if self._schema else ()
                embeddings = sql.Identifier(*schema_prefix, self._table)
                old_embeddings = sql.Identifier(*schema_prefix, self._table + "_v1")
                content = sql.Identifier(
                    *schema_prefix, self._table + self.CONTENT_TABLE
                )

                alter_table_query = sql.SQL("ALTER TABLE {t1} RENAME TO {t2}").format(
                    t1=embeddings,
                    t2=old_embeddings,
                )
                cursor.execute(alter_table_query)

                self._create_table(cursor)

                insert_query = sql.SQL(
                    """
                    INSERT INTO {t1} (doc_id, embedding_id, embedding)
                    SELECT id, embedding_id, embedding FROM {t2}
                    """
                ).format(
                    t1=embeddings,
                    t2=old_embeddings,
                )
                cursor.execute(insert_query)

                insert_content_query = sql.SQL(
                    """
                    INSERT INTO {t1} (doc_id, text, metadata)
                    SELECT DISTINCT id, text, metadata FROM {t2}
                    """
                ).format(t1=content, t2=old_embeddings)
                cursor.execute(insert_content_query)
        except Exception as e:
            raise RuntimeError(f"Failed to migrate schema: {e}") from e
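    # Migration sketch, assuming `store` was constructed against a table
    # created with the old v1 single-table layout; the v1 table is renamed
    # with a "_v1" suffix and its rows are split into the v2 embeddings and
    # content tables:
    #
    #     store.migrate_schema_v1_to_v2()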