Source code for langchain_postgres.v2.async_vectorstore
# TODO: Remove below import when minimum supported Python version is 3.10
from __future__ import annotations

import copy
import json
import uuid
from typing import Any, Callable, Iterable, Optional, Sequence

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore, utils
from sqlalchemy import RowMapping, text
from sqlalchemy.ext.asyncio import AsyncEngine

from .engine import PGEngine
from .indexes import (
    DEFAULT_DISTANCE_STRATEGY,
    DEFAULT_INDEX_NAME_SUFFIX,
    BaseIndex,
    DistanceStrategy,
    ExactNearestNeighbor,
    QueryOptions,
)

# Comparison operators that translate 1:1 into a native SQL operator.
COMPARISONS_TO_NATIVE = {
    "$eq": "=",
    "$ne": "!=",
    "$lt": "<",
    "$lte": "<=",
    "$gt": ">",
    "$gte": ">=",
}

# Operators that need bespoke SQL generation rather than a direct translation.
SPECIAL_CASED_OPERATORS = {
    "$in",
    "$nin",
    "$between",
    "$exists",
}

# Text pattern-matching operators.
TEXT_OPERATORS = {
    "$like",
    "$ilike",
}

# Boolean combinators accepted at the top level of a filter dict.
LOGICAL_OPERATORS = {"$and", "$or", "$not"}

# The full set of operators accepted by the filter-translation helpers.
SUPPORTED_OPERATORS = (
    set(COMPARISONS_TO_NATIVE)
    | SPECIAL_CASED_OPERATORS
    | TEXT_OPERATORS
    | LOGICAL_OPERATORS
)
class AsyncPGVectorStore(VectorStore):
    """Postgres Vector Store class.

    Async-only implementation backed by a SQLAlchemy ``AsyncEngine``.
    Instances must be obtained via the ``create`` factory; direct
    construction is blocked by the private ``__create_key`` sentinel.
    """

    # Sentinel object; only the ``create`` classmethod holds a reference,
    # so ``__init__`` can reject direct calls.
    __create_key = object()
    def __init__(
        self,
        key: object,
        engine: AsyncEngine,
        embedding_service: Embeddings,
        table_name: str,
        *,
        schema_name: str = "public",
        content_column: str = "content",
        embedding_column: str = "embedding",
        metadata_columns: Optional[list[str]] = None,
        id_column: str = "langchain_id",
        metadata_json_column: Optional[str] = "langchain_metadata",
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        index_query_options: Optional[QueryOptions] = None,
    ):
        """AsyncPGVectorStore constructor.

        Args:
            key (object): Prevent direct constructor usage.
            engine (AsyncEngine): Connection pool engine for managing connections to postgres database.
            embedding_service (Embeddings): Text embedding model to use.
            table_name (str): Name of the existing table or the table to be created.
            schema_name (str, optional): Name of the database schema. Defaults to "public".
            content_column (str): Column that represent a Document's page_content. Defaults to "content".
            embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding".
            metadata_columns (list[str]): Column(s) that represent a document's metadata.
            id_column (str): Column that represents the Document's id. Defaults to "langchain_id".
            metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata".
            distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE.
            k (int): Number of Documents to return from search. Defaults to 4.
            fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
            index_query_options (QueryOptions): Index query option.

        Raises:
            Exception: If called directly by user (i.e. without the private create key).
        """
        # Reject any caller that did not come through create()/create_sync().
        if key != AsyncPGVectorStore.__create_key:
            raise Exception(
                "Only create class through 'create' or 'create_sync' methods!"
            )
        self.engine = engine
        self.embedding_service = embedding_service
        self.table_name = table_name
        self.schema_name = schema_name
        self.content_column = content_column
        self.embedding_column = embedding_column
        # Normalize the mutable default: never share a list between instances.
        self.metadata_columns = metadata_columns if metadata_columns is not None else []
        self.id_column = id_column
        self.metadata_json_column = metadata_json_column
        self.distance_strategy = distance_strategy
        self.k = k
        self.fetch_k = fetch_k
        self.lambda_mult = lambda_mult
        self.index_query_options = index_query_options
[docs]@classmethodasyncdefcreate(cls:type[AsyncPGVectorStore],engine:PGEngine,embedding_service:Embeddings,table_name:str,*,schema_name:str="public",content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[str]]=None,ignore_metadata_columns:Optional[list[str]]=None,id_column:str="langchain_id",metadata_json_column:Optional[str]="langchain_metadata",distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,index_query_options:Optional[QueryOptions]=None,)->AsyncPGVectorStore:"""Create an AsyncPGVectorStore instance. Args: engine (PGEngine): Connection pool engine for managing connections to postgres database. embedding_service (Embeddings): Text embedding model to use. table_name (str): Name of an existing table. schema_name (str, optional): Name of the database schema. Defaults to "public". content_column (str): Column that represent a Document's page_content. Defaults to "content". embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". metadata_columns (list[str]): Column(s) that represent a document's metadata. ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. id_column (str): Column that represents the Document's id. Defaults to "langchain_id". metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. k (int): Number of Documents to return from search. Defaults to 4. fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. 
index_query_options (QueryOptions): Index query option. Returns: AsyncPGVectorStore """ifmetadata_columnsisNone:metadata_columns=[]ifmetadata_columnsandignore_metadata_columns:raiseValueError("Can not use both metadata_columns and ignore_metadata_columns.")# Get field type informationstmt="SELECT column_name, data_type FROM information_schema.columns WHERE table_name = :table_name AND table_schema = :schema_name"asyncwithengine._pool.connect()asconn:result=awaitconn.execute(text(stmt),{"table_name":table_name,"schema_name":schema_name},)result_map=result.mappings()results=result_map.fetchall()columns={}forfieldinresults:columns[field["column_name"]]=field["data_type"]# Check columnsifid_columnnotincolumns:raiseValueError(f"Id column, {id_column}, does not exist.")ifcontent_columnnotincolumns:raiseValueError(f"Content column, {content_column}, does not exist.")content_type=columns[content_column]ifcontent_type!="text"and"char"notincontent_type:raiseValueError(f"Content column, {content_column}, is type, {content_type}. 
It must be a type of character string.")ifembedding_columnnotincolumns:raiseValueError(f"Embedding column, {embedding_column}, does not exist.")ifcolumns[embedding_column]!="USER-DEFINED":raiseValueError(f"Embedding column, {embedding_column}, is not type Vector.")metadata_json_column=(Noneifmetadata_json_columnnotincolumnselsemetadata_json_column)# If using metadata_columns check to make sure column existsforcolumninmetadata_columns:ifcolumnnotincolumns:raiseValueError(f"Metadata column, {column}, does not exist.")# If using ignore_metadata_columns, filter out known columns and set known metadata columnsall_columns=columnsifignore_metadata_columns:forcolumninignore_metadata_columns:delall_columns[column]delall_columns[id_column]delall_columns[content_column]delall_columns[embedding_column]metadata_columns=[kforkinall_columns.keys()]returncls(cls.__create_key,engine._pool,embedding_service,table_name,schema_name=schema_name,content_column=content_column,embedding_column=embedding_column,metadata_columns=metadata_columns,id_column=id_column,metadata_json_column=metadata_json_column,distance_strategy=distance_strategy,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,index_query_options=index_query_options,)
    async def aadd_embeddings(
        self,
        texts: Iterable[str],
        embeddings: list[list[float]],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list] = None,
        **kwargs: Any,
    ) -> list[str]:
        """Add data along with embeddings to the table.

        Builds one parameterized INSERT ... ON CONFLICT DO UPDATE (upsert)
        statement per row and executes each in its own transaction.

        Raises:
            :class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
        """
        if not ids:
            ids = [str(uuid.uuid4()) for _ in texts]
        else:
            # This is done to fill in any missing ids
            ids = [id if id is not None else str(uuid.uuid4()) for id in ids]
        if not metadatas:
            metadatas = [{} for _ in texts]
        # Check for inline embedding capability (embedding computed by SQL).
        inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None)
        can_inline_embed = callable(inline_embed_func)
        # Insert embeddings — one statement per (id, content, embedding, metadata).
        for id, content, embedding, metadata in zip(ids, texts, embeddings, metadatas):
            metadata_col_names = (
                ", " + ", ".join(f'"{col}"' for col in self.metadata_columns)
                if len(self.metadata_columns) > 0
                else ""
            )
            insert_stmt = f'INSERT INTO "{self.schema_name}"."{self.table_name}"("{self.id_column}", "{self.content_column}", "{self.embedding_column}"{metadata_col_names}'
            values = {
                "id": id,
                "content": content,
                # pgvector accepts the Python-list repr as a vector literal.
                "embedding": str([float(dimension) for dimension in embedding]),
            }
            values_stmt = "VALUES (:id, :content, :embedding"
            # If no embedding was supplied, let the service embed inline in SQL.
            if not embedding and can_inline_embed:
                values_stmt = f"VALUES (:id, :content, {self.embedding_service.embed_query_inline(content)}"  # type: ignore
            # Add metadata: known columns become bound params; the rest ("extra")
            # is serialized into the JSON column below.
            extra = copy.deepcopy(metadata)
            for metadata_column in self.metadata_columns:
                if metadata_column in metadata:
                    values_stmt += f", :{metadata_column}"
                    values[metadata_column] = metadata[metadata_column]
                    del extra[metadata_column]
                else:
                    # Missing metadata key: write SQL NULL for that column.
                    values_stmt += ",null"
            # Add JSON column and/or close statement
            insert_stmt += (
                f""", "{self.metadata_json_column}")"""
                if self.metadata_json_column
                else ")"
            )
            if self.metadata_json_column:
                values_stmt += ", :extra)"
                values["extra"] = json.dumps(extra)
            else:
                values_stmt += ")"
            # Upsert on the id column: refresh content, embedding and metadata.
            upsert_stmt = f' ON CONFLICT ("{self.id_column}") DO UPDATE SET "{self.content_column}" = EXCLUDED."{self.content_column}", "{self.embedding_column}" = EXCLUDED."{self.embedding_column}"'
            if self.metadata_json_column:
                upsert_stmt += f', "{self.metadata_json_column}" = EXCLUDED."{self.metadata_json_column}"'
            for column in self.metadata_columns:
                upsert_stmt += f', "{column}" = EXCLUDED."{column}"'
            upsert_stmt += ";"
            query = insert_stmt + values_stmt + upsert_stmt
            # NOTE(review): one connection + commit per row; batching would cut
            # round-trips but would change transaction semantics — left as-is.
            async with self.engine.connect() as conn:
                await conn.execute(text(query), values)
                await conn.commit()
        return ids
[docs]asyncdefaadd_texts(self,texts:Iterable[str],metadatas:Optional[list[dict]]=None,ids:Optional[list]=None,**kwargs:Any,)->list[str]:"""Embed texts and add to the table. Raises: :class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`. """# Check for inline embedding queryinline_embed_func=getattr(self.embedding_service,"embed_query_inline",None)ifcallable(inline_embed_func):embeddings:list[list[float]]=[[]for_inlist(texts)]else:embeddings=awaitself.embedding_service.aembed_documents(list(texts))ids=awaitself.aadd_embeddings(texts,embeddings,metadatas=metadatas,ids=ids,**kwargs)returnids
[docs]asyncdefaadd_documents(self,documents:list[Document],ids:Optional[list]=None,**kwargs:Any,)->list[str]:"""Embed documents and add to the table. Raises: :class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`. """texts=[doc.page_contentfordocindocuments]metadatas=[doc.metadatafordocindocuments]ifnotids:ids=[doc.idfordocindocuments]ids=awaitself.aadd_texts(texts,metadatas=metadatas,ids=ids,**kwargs)returnids
[docs]asyncdefadelete(self,ids:Optional[list]=None,**kwargs:Any,)->Optional[bool]:"""Delete records from the table. Raises: :class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`. """ifnotids:returnFalseplaceholders=", ".join(f":id_{i}"foriinrange(len(ids)))param_dict={f"id_{i}":idfori,idinenumerate(ids)}query=f'DELETE FROM "{self.schema_name}"."{self.table_name}" WHERE {self.id_column} in ({placeholders})'asyncwithself.engine.connect()asconn:awaitconn.execute(text(query),param_dict)awaitconn.commit()returnTrue
[docs]@classmethodasyncdefafrom_texts(# type: ignore[override]cls:type[AsyncPGVectorStore],texts:list[str],embedding:Embeddings,engine:PGEngine,table_name:str,*,schema_name:str="public",metadatas:Optional[list[dict]]=None,ids:Optional[list]=None,content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[str]]=None,ignore_metadata_columns:Optional[list[str]]=None,id_column:str="langchain_id",metadata_json_column:str="langchain_metadata",distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,index_query_options:Optional[QueryOptions]=None,**kwargs:Any,)->AsyncPGVectorStore:"""Create an AsyncPGVectorStore instance from texts. Args: texts (list[str]): Texts to add to the vector store. embedding (Embeddings): Text embedding model to use. engine (PGEngine): Connection pool engine for managing connections to postgres database. table_name (str): Name of an existing table. metadatas (Optional[list[dict]]): List of metadatas to add to table records. ids: (Optional[list[str]]): List of IDs to add to table records. content_column (str): Column that represent a Document's page_content. Defaults to "content". embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". metadata_columns (list[str]): Column(s) that represent a document's metadata. ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. id_column (str): Column that represents the Document's id. Defaults to "langchain_id". metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. k (int): Number of Documents to return from search. Defaults to 4. 
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. index_query_options (QueryOptions): Index query option. Raises: :class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`. Returns: AsyncPGVectorStore """vs=awaitcls.create(engine,embedding,table_name,schema_name=schema_name,content_column=content_column,embedding_column=embedding_column,metadata_columns=metadata_columns,ignore_metadata_columns=ignore_metadata_columns,id_column=id_column,metadata_json_column=metadata_json_column,distance_strategy=distance_strategy,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,index_query_options=index_query_options,)awaitvs.aadd_texts(texts,metadatas=metadatas,ids=ids,**kwargs)returnvs
[docs]@classmethodasyncdefafrom_documents(# type: ignore[override]cls:type[AsyncPGVectorStore],documents:list[Document],embedding:Embeddings,engine:PGEngine,table_name:str,*,schema_name:str="public",ids:Optional[list]=None,content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[str]]=None,ignore_metadata_columns:Optional[list[str]]=None,id_column:str="langchain_id",metadata_json_column:str="langchain_metadata",distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,index_query_options:Optional[QueryOptions]=None,**kwargs:Any,)->AsyncPGVectorStore:"""Create an AsyncPGVectorStore instance from documents. Args: documents (list[Document]): Documents to add to the vector store. embedding (Embeddings): Text embedding model to use. engine (PGEngine): Connection pool engine for managing connections to postgres database. table_name (str): Name of an existing table. metadatas (Optional[list[dict]]): List of metadatas to add to table records. ids: (Optional[list[str]]): List of IDs to add to table records. content_column (str): Column that represent a Document's page_content. Defaults to "content". embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". metadata_columns (list[str]): Column(s) that represent a document's metadata. ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. id_column (str): Column that represents the Document's id. Defaults to "langchain_id". metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. k (int): Number of Documents to return from search. Defaults to 4. 
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. index_query_options (QueryOptions): Index query option. Raises: :class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`. Returns: AsyncPGVectorStore """vs=awaitcls.create(engine,embedding,table_name,schema_name=schema_name,content_column=content_column,embedding_column=embedding_column,metadata_columns=metadata_columns,ignore_metadata_columns=ignore_metadata_columns,id_column=id_column,metadata_json_column=metadata_json_column,distance_strategy=distance_strategy,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,index_query_options=index_query_options,)texts=[doc.page_contentfordocindocuments]metadatas=[doc.metadatafordocindocuments]awaitvs.aadd_texts(texts,metadatas=metadatas,ids=ids,**kwargs)returnvs
    async def __query_collection(
        self,
        embedding: list[float],
        *,
        k: Optional[int] = None,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> Sequence[RowMapping]:
        """Perform similarity search query on database.

        Returns the k nearest rows (plus a computed ``distance`` column),
        optionally restricted by a translated metadata ``filter``.
        """
        k = k if k else self.k
        operator = self.distance_strategy.operator
        search_function = self.distance_strategy.search_function
        columns = self.metadata_columns + [
            self.id_column,
            self.content_column,
            self.embedding_column,
        ]
        if self.metadata_json_column:
            columns.append(self.metadata_json_column)
        column_names = ", ".join(f'"{col}"' for col in columns)
        # Translate the LangChain filter dict into a parameterized WHERE clause.
        safe_filter = None
        filter_dict = None
        if filter and isinstance(filter, dict):
            safe_filter, filter_dict = self._create_filter_clause(filter)
        param_filter = f"WHERE {safe_filter}" if safe_filter else ""
        # If the service supports inline (in-SQL) embedding and no client-side
        # embedding was provided, embed the raw query text inside the statement.
        inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None)
        if not embedding and callable(inline_embed_func) and "query" in kwargs:
            query_embedding = self.embedding_service.embed_query_inline(kwargs["query"])  # type: ignore
            embedding_data_string = f"{query_embedding}"
        else:
            # pgvector accepts the Python-list repr as a vector literal,
            # bound via :query_embedding.
            query_embedding = f"{[float(dimension) for dimension in embedding]}"
            embedding_data_string = ":query_embedding"
        stmt = f"""SELECT {column_names}, {search_function}("{self.embedding_column}", {embedding_data_string}) as distance FROM "{self.schema_name}"."{self.table_name}" {param_filter} ORDER BY "{self.embedding_column}" {operator} {embedding_data_string} LIMIT :k;
        """
        param_dict = {"query_embedding": query_embedding, "k": k}
        if filter_dict:
            param_dict.update(filter_dict)
        if self.index_query_options:
            async with self.engine.connect() as conn:
                # Set each query option individually; SET LOCAL scopes the
                # option to the current transaction only.
                for query_option in self.index_query_options.to_parameter():
                    query_options_stmt = f"SET LOCAL {query_option};"
                    await conn.execute(text(query_options_stmt))
                result = await conn.execute(text(stmt), param_dict)
                result_map = result.mappings()
                results = result_map.fetchall()
        else:
            async with self.engine.connect() as conn:
                result = await conn.execute(text(stmt), param_dict)
                result_map = result.mappings()
                results = result_map.fetchall()
        return results
[docs]asyncdefasimilarity_search(self,query:str,k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:"""Return docs selected by similarity search on query."""inline_embed_func=getattr(self.embedding_service,"embed_query_inline",None)embedding=([]ifcallable(inline_embed_func)elseawaitself.embedding_service.aembed_query(text=query))kwargs["query"]=queryreturnawaitself.asimilarity_search_by_vector(embedding=embedding,k=k,filter=filter,**kwargs)
def_select_relevance_score_fn(self)->Callable[[float],float]:"""Select a relevance function based on distance strategy."""# Calculate distance strategy provided in# vectorstore constructorifself.distance_strategy==DistanceStrategy.COSINE_DISTANCE:returnself._cosine_relevance_score_fnifself.distance_strategy==DistanceStrategy.INNER_PRODUCT:returnself._max_inner_product_relevance_score_fnelifself.distance_strategy==DistanceStrategy.EUCLIDEAN:returnself._euclidean_relevance_score_fn
[docs]asyncdefasimilarity_search_with_score(self,query:str,k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[tuple[Document,float]]:"""Return docs and distance scores selected by similarity search on query."""inline_embed_func=getattr(self.embedding_service,"embed_query_inline",None)embedding=([]ifcallable(inline_embed_func)elseawaitself.embedding_service.aembed_query(text=query))kwargs["query"]=querydocs=awaitself.asimilarity_search_with_score_by_vector(embedding=embedding,k=k,filter=filter,**kwargs)returndocs
[docs]asyncdefasimilarity_search_by_vector(self,embedding:list[float],k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:"""Return docs selected by vector similarity search."""docs_and_scores=awaitself.asimilarity_search_with_score_by_vector(embedding=embedding,k=k,filter=filter,**kwargs)return[docfordoc,_indocs_and_scores]
[docs]asyncdefasimilarity_search_with_score_by_vector(self,embedding:list[float],k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[tuple[Document,float]]:"""Return docs and distance scores selected by vector similarity search."""results=awaitself.__query_collection(embedding=embedding,k=k,filter=filter,**kwargs)documents_with_scores=[]forrowinresults:metadata=(row[self.metadata_json_column]ifself.metadata_json_columnandrow[self.metadata_json_column]else{})forcolinself.metadata_columns:metadata[col]=row[col]documents_with_scores.append((Document(page_content=row[self.content_column],metadata=metadata,id=str(row[self.id_column]),),row["distance"],))returndocuments_with_scores
[docs]asyncdefamax_marginal_relevance_search(self,query:str,k:Optional[int]=None,fetch_k:Optional[int]=None,lambda_mult:Optional[float]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:"""Return docs selected using the maximal marginal relevance."""embedding=awaitself.embedding_service.aembed_query(text=query)returnawaitself.amax_marginal_relevance_search_by_vector(embedding=embedding,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,filter=filter,**kwargs,)
[docs]asyncdefamax_marginal_relevance_search_by_vector(self,embedding:list[float],k:Optional[int]=None,fetch_k:Optional[int]=None,lambda_mult:Optional[float]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:"""Return docs selected using the maximal marginal relevance."""docs_and_scores=(awaitself.amax_marginal_relevance_search_with_score_by_vector(embedding,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,filter=filter,**kwargs,))return[result[0]forresultindocs_and_scores]
[docs]asyncdefamax_marginal_relevance_search_with_score_by_vector(self,embedding:list[float],k:Optional[int]=None,fetch_k:Optional[int]=None,lambda_mult:Optional[float]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[tuple[Document,float]]:"""Return docs and distance scores selected using the maximal marginal relevance."""results=awaitself.__query_collection(embedding=embedding,k=fetch_k,filter=filter,**kwargs)k=kifkelseself.kfetch_k=fetch_kiffetch_kelseself.fetch_klambda_mult=lambda_multiflambda_multelseself.lambda_multembedding_list=[json.loads(row[self.embedding_column])forrowinresults]mmr_selected=utils.maximal_marginal_relevance(np.array(embedding,dtype=np.float32),embedding_list,k=k,lambda_mult=lambda_mult,)documents_with_scores=[]forrowinresults:metadata=(row[self.metadata_json_column]ifself.metadata_json_columnandrow[self.metadata_json_column]else{})forcolinself.metadata_columns:metadata[col]=row[col]documents_with_scores.append((Document(page_content=row[self.content_column],metadata=metadata,id=str(row[self.id_column]),),row["distance"],))return[rfori,rinenumerate(documents_with_scores)ifiinmmr_selected]
[docs]asyncdefaapply_vector_index(self,index:BaseIndex,name:Optional[str]=None,*,concurrently:bool=False,)->None:"""Create index in the vector store table."""ifisinstance(index,ExactNearestNeighbor):awaitself.adrop_vector_index()return# if extension name is mentioned, create the extensionifindex.extension_name:asyncwithself.engine.connect()asconn:awaitconn.execute(text(f"CREATE EXTENSION IF NOT EXISTS {index.extension_name}"))awaitconn.commit()function=index.get_index_function()filter=f"WHERE ({index.partial_indexes})"ifindex.partial_indexeselse""params="WITH "+index.index_options()ifnameisNone:ifindex.nameisNone:index.name=self.table_name+DEFAULT_INDEX_NAME_SUFFIXname=index.namestmt=f'CREATE INDEX {"CONCURRENTLY"ifconcurrentlyelse""} "{name}" ON "{self.schema_name}"."{self.table_name}" USING {index.index_type} ({self.embedding_column}{function}) {params}{filter};'ifconcurrently:asyncwithself.engine.connect()asconn:autocommit_conn=awaitconn.execution_options(isolation_level="AUTOCOMMIT")awaitautocommit_conn.execute(text(stmt))else:asyncwithself.engine.connect()asconn:awaitconn.execute(text(stmt))awaitconn.commit()
[docs]asyncdefareindex(self,index_name:Optional[str]=None)->None:"""Re-index the vector store table."""index_name=index_nameorself.table_name+DEFAULT_INDEX_NAME_SUFFIXquery=f'REINDEX INDEX "{index_name}";'asyncwithself.engine.connect()asconn:awaitconn.execute(text(query))awaitconn.commit()
[docs]asyncdefadrop_vector_index(self,index_name:Optional[str]=None,)->None:"""Drop the vector index."""index_name=index_nameorself.table_name+DEFAULT_INDEX_NAME_SUFFIXquery=f'DROP INDEX IF EXISTS "{index_name}";'asyncwithself.engine.connect()asconn:awaitconn.execute(text(query))awaitconn.commit()
[docs]asyncdefis_valid_index(self,index_name:Optional[str]=None,)->bool:"""Check if index exists in the table."""index_name=index_nameorself.table_name+DEFAULT_INDEX_NAME_SUFFIXquery=""" SELECT tablename, indexname FROM pg_indexes WHERE tablename = :table_name AND schemaname = :schema_name AND indexname = :index_name; """param_dict={"table_name":self.table_name,"schema_name":self.schema_name,"index_name":index_name,}asyncwithself.engine.connect()asconn:result=awaitconn.execute(text(query),param_dict)result_map=result.mappings()results=result_map.fetchall()returnbool(len(results)==1)
[docs]asyncdefaget_by_ids(self,ids:Sequence[str])->list[Document]:"""Get documents by ids."""columns=self.metadata_columns+[self.id_column,self.content_column,]ifself.metadata_json_column:columns.append(self.metadata_json_column)column_names=", ".join(f'"{col}"'forcolincolumns)placeholders=", ".join(f":id_{i}"foriinrange(len(ids)))param_dict={f"id_{i}":idfori,idinenumerate(ids)}query=f'SELECT {column_names} FROM "{self.schema_name}"."{self.table_name}" WHERE "{self.id_column}" IN ({placeholders});'asyncwithself.engine.connect()asconn:result=awaitconn.execute(text(query),param_dict)result_map=result.mappings()results=result_map.fetchall()documents=[]forrowinresults:metadata=(row[self.metadata_json_column]ifself.metadata_json_columnandrow[self.metadata_json_column]else{})forcolinself.metadata_columns:metadata[col]=row[col]documents.append((Document(page_content=row[self.content_column],metadata=metadata,id=str(row[self.id_column]),)))returndocuments
    def _handle_field_filter(
        self,
        *,
        field: str,
        value: Any,
    ) -> tuple[str, dict]:
        """Create a filter for a specific field.

        Args:
            field: name of field
            value: value to filter
                If provided as is then this will be an equality filter
                If provided as a dictionary then this will be a filter, the key
                will be the operator and the value will be the value to filter by

        Returns:
            sql where query as a string, plus the dict of bound parameters.
        """
        if not isinstance(field, str):
            raise ValueError(
                f"field should be a string but got: {type(field)} with value: {field}"
            )
        if field.startswith("$"):
            raise ValueError(
                f"Invalid filter condition. Expected a field but got an operator: "
                f"{field}"
            )
        # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters
        if not field.isidentifier():
            raise ValueError(f"Invalid field name: {field}. Expected a valid identifier.")
        if isinstance(value, dict):
            # This is a filter specification: {"$op": operand}
            if len(value) != 1:
                raise ValueError(
                    "Invalid filter condition. Expected a value which "
                    "is a dictionary with a single key that corresponds to an operator "
                    f"but got a dictionary with {len(value)} keys. The first few "
                    f"keys are: {list(value.keys())[:3]}"
                )
            operator, filter_value = list(value.items())[0]
            # Verify that that operator is an operator
            if operator not in SUPPORTED_OPERATORS:
                raise ValueError(
                    f"Invalid operator: {operator}. "
                    f"Expected one of {SUPPORTED_OPERATORS}"
                )
        else:
            # Then we assume an equality operator
            operator = "$eq"
            filter_value = value
        if operator in COMPARISONS_TO_NATIVE:
            # Then we implement an equality filter
            # native is trusted input
            native = COMPARISONS_TO_NATIVE[operator]
            # Random short suffix keeps param names unique when the same field
            # is compared more than once in a composite filter.
            id = str(uuid.uuid4()).split("-")[0]
            return f"{field} {native} :{field}_{id}", {f"{field}_{id}": filter_value}
        elif operator == "$between":
            # Use AND with two comparisons
            # NOTE(review): fixed param names ({field}_low/_high) could collide
            # if the same field has two $between clauses in one filter — verify.
            low, high = filter_value
            return f"({field} BETWEEN :{field}_low AND :{field}_high)", {
                f"{field}_low": low,
                f"{field}_high": high,
            }
        elif operator in {"$in", "$nin", "$like", "$ilike"}:
            # We'll do force coercion to text
            if operator in {"$in", "$nin"}:
                for val in filter_value:
                    if not isinstance(val, (str, int, float)):
                        raise NotImplementedError(
                            f"Unsupported type: {type(val)} for value: {val}"
                        )
                    if isinstance(val, bool):  # b/c bool is an instance of int
                        raise NotImplementedError(
                            f"Unsupported type: {type(val)} for value: {val}"
                        )
            if operator in {"$in"}:
                return f"{field} = ANY(:{field}_in)", {f"{field}_in": filter_value}
            elif operator in {"$nin"}:
                return f"{field} <> ALL (:{field}_nin)", {f"{field}_nin": filter_value}
            elif operator in {"$like"}:
                return f"({field} LIKE :{field}_like)", {f"{field}_like": filter_value}
            elif operator in {"$ilike"}:
                return f"({field} ILIKE :{field}_ilike)", {f"{field}_ilike": filter_value}
            else:
                raise NotImplementedError()
        elif operator == "$exists":
            if not isinstance(filter_value, bool):
                raise ValueError(
                    "Expected a boolean value for $exists "
                    f"operator, but got: {filter_value}"
                )
            else:
                if filter_value:
                    return f"({field} IS NOT NULL)", {}
                else:
                    return f"({field} IS NULL)", {}
        else:
            raise NotImplementedError()

    def _create_filter_clause(self, filters: Any) -> tuple[str, dict]:
        """Create LangChain filter representation to matching SQL where clauses

        Args:
            filters: Dictionary of filters to apply to the query.

        Returns:
            String containing the sql where query, plus its bound parameters.
        """
        if not isinstance(filters, dict):
            raise ValueError(
                f"Invalid type: Expected a dictionary but got type: {type(filters)}"
            )
        if len(filters) == 1:
            # The only operators allowed at the top level are $AND, $OR, and $NOT
            # First check if an operator or a field
            key, value = list(filters.items())[0]
            if key.startswith("$"):
                # Then it's an operator
                if key.lower() not in ["$and", "$or", "$not"]:
                    raise ValueError(
                        f"Invalid filter condition. Expected $and, $or or $not "
                        f"but got: {key}"
                    )
            else:
                # Then it's a field
                return self._handle_field_filter(field=key, value=filters[key])
            if key.lower() == "$and" or key.lower() == "$or":
                if not isinstance(value, list):
                    raise ValueError(
                        f"Expected a list, but got {type(value)} for value: {value}"
                    )
                op = key[1:].upper()  # Extract the operator
                # Recursively translate each sub-filter, then join with AND/OR.
                filter_clause = [self._create_filter_clause(el) for el in value]
                if len(filter_clause) > 1:
                    all_clauses = [clause[0] for clause in filter_clause]
                    params = {}
                    for clause in filter_clause:
                        params.update(clause[1])
                    return f"({f' {op} '.join(all_clauses)})", params
                elif len(filter_clause) == 1:
                    return filter_clause[0]
                else:
                    raise ValueError(
                        "Invalid filter condition. Expected a dictionary "
                        "but got an empty dictionary"
                    )
            elif key.lower() == "$not":
                # $not accepts either a list of sub-filters (each negated,
                # joined with AND) or a single sub-filter dict.
                if isinstance(value, list):
                    not_conditions = [self._create_filter_clause(item) for item in value]
                    all_clauses = [clause[0] for clause in not_conditions]
                    params = {}
                    for clause in not_conditions:
                        params.update(clause[1])
                    not_stmts = [f"NOT {condition}" for condition in all_clauses]
                    return f"({' AND '.join(not_stmts)})", params
                elif isinstance(value, dict):
                    not_, params = self._create_filter_clause(value)
                    return f"(NOT {not_})", params
                else:
                    raise ValueError(
                        f"Invalid filter condition. Expected a dictionary "
                        f"or a list but got: {type(value)}"
                    )
            else:
                raise ValueError(
                    f"Invalid filter condition. Expected $and, $or or $not "
                    f"but got: {key}"
                )
        elif len(filters) > 1:
            # Then all keys have to be fields (they cannot be operators)
            for key in filters.keys():
                if key.startswith("$"):
                    raise ValueError(
                        f"Invalid filter condition. Expected a field but got: {key}"
                    )
            # These should all be fields and combined using an $and operator
            and_ = [
                self._handle_field_filter(field=k, value=v) for k, v in filters.items()
            ]
            if len(and_) > 1:
                all_clauses = [clause[0] for clause in and_]
                params = {}
                for clause in and_:
                    params.update(clause[1])
                return f"({' AND '.join(all_clauses)})", params
            elif len(and_) == 1:
                return and_[0]
            else:
                raise ValueError(
                    "Invalid filter condition. Expected a dictionary "
                    "but got an empty dictionary"
                )
        else:
            # Empty filter dict: no WHERE clause, no parameters.
            return "", {}
[docs]defget_by_ids(self,ids:Sequence[str])->list[Document]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defadd_texts(self,texts:Iterable[str],metadatas:Optional[list[dict]]=None,ids:Optional[list]=None,**kwargs:Any,)->list[str]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defadd_documents(self,documents:list[Document],ids:Optional[list]=None,**kwargs:Any,)->list[str]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defdelete(self,ids:Optional[list]=None,**kwargs:Any,)->Optional[bool]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]@classmethoddeffrom_texts(# type: ignore[override]cls:type[AsyncPGVectorStore],texts:list[str],embedding:Embeddings,engine:PGEngine,table_name:str,metadatas:Optional[list[dict]]=None,ids:Optional[list]=None,content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[str]]=None,ignore_metadata_columns:Optional[list[str]]=None,id_column:str="langchain_id",metadata_json_column:str="langchain_metadata",**kwargs:Any,)->AsyncPGVectorStore:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]@classmethoddeffrom_documents(# type: ignore[override]cls:type[AsyncPGVectorStore],documents:list[Document],embedding:Embeddings,engine:PGEngine,table_name:str,ids:Optional[list]=None,content_column:str="content",embedding_column:str="embedding",metadata_columns:Optional[list[str]]=None,ignore_metadata_columns:Optional[list[str]]=None,id_column:str="langchain_id",metadata_json_column:str="langchain_metadata",**kwargs:Any,)->AsyncPGVectorStore:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defsimilarity_search(self,query:str,k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defsimilarity_search_with_score(self,query:str,k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[tuple[Document,float]]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defsimilarity_search_by_vector(self,embedding:list[float],k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defsimilarity_search_with_score_by_vector(self,embedding:list[float],k:Optional[int]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[tuple[Document,float]]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defmax_marginal_relevance_search(self,query:str,k:Optional[int]=None,fetch_k:Optional[int]=None,lambda_mult:Optional[float]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defmax_marginal_relevance_search_by_vector(self,embedding:list[float],k:Optional[int]=None,fetch_k:Optional[int]=None,lambda_mult:Optional[float]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[Document]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")
[docs]defmax_marginal_relevance_search_with_score_by_vector(self,embedding:list[float],k:Optional[int]=None,fetch_k:Optional[int]=None,lambda_mult:Optional[float]=None,filter:Optional[dict]=None,**kwargs:Any,)->list[tuple[Document,float]]:raiseNotImplementedError("Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead.")