# Source code for langchain_community.vectorstores.analyticdb
from__future__importannotationsimportloggingimportuuidfromtypingimportAny,Callable,Dict,Iterable,List,Optional,Sequence,Tuple,TypefromsqlalchemyimportREAL,Column,String,Table,create_engine,insert,textfromsqlalchemy.dialects.postgresqlimportARRAY,JSON,TEXTtry:fromsqlalchemy.ormimportdeclarative_baseexceptImportError:fromsqlalchemy.ext.declarativeimportdeclarative_basefromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.utilsimportget_from_dict_or_envfromlangchain_core.vectorstoresimportVectorStore_LANGCHAIN_DEFAULT_EMBEDDING_DIM=1536_LANGCHAIN_DEFAULT_COLLECTION_NAME="langchain_document"Base=declarative_base()# type: Any
class AnalyticDB(VectorStore):
    """`AnalyticDB` (distributed PostgreSQL) vector store.

    AnalyticDB is a distributed full postgresql syntax cloud-native database.

    - `connection_string` is a postgres connection string.
    - `embedding_function` any embedding function implementing
        `langchain.embeddings.base.Embeddings` interface.
    - `collection_name` is the name of the collection to use. (default: langchain)
        - NOTE: This is not the name of the table, but the name of the collection.
            The tables will be created when initializing the store (if not exists)
            So, make sure the user has the right permissions to create tables.
    - `pre_delete_collection` if True, will delete the collection if it exists.
        (default: False)
        - Useful for testing.
    """

    # NOTE(review): ``__init__`` and ``create_collection`` are not visible in
    # this chunk. They are presumed to set ``connection_string``,
    # ``embedding_function``, ``embedding_dimension``, ``collection_name`` and
    # ``logger`` before ``__post_init__`` runs -- confirm against the full file.

    def __post_init__(
        self,
        engine_args: Optional[dict] = None,
    ) -> None:
        """Initialize the store: build the engine and create the collection.

        Args:
            engine_args: Extra keyword arguments forwarded to
                ``sqlalchemy.create_engine``.
        """
        _engine_args = engine_args or {}
        if "pool_recycle" not in _engine_args:
            # Recycle pooled connections after an hour so idle connections are
            # not killed server-side between uses.
            _engine_args["pool_recycle"] = 3600
        self.engine = create_engine(self.connection_string, **_engine_args)
        self.create_collection()

    @property
    def embeddings(self) -> Embeddings:
        """Access the embedding object used by this store."""
        return self.embedding_function

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        # Search results are ordered by l2_distance, so the euclidean
        # relevance conversion is the appropriate one.
        return self._euclidean_relevance_score_fn

    def create_table_if_not_exists(self) -> None:
        """Create the collection table and its ANN index if missing."""
        # Define the dynamic table
        Table(
            self.collection_name,
            Base.metadata,
            Column("id", TEXT, primary_key=True, default=uuid.uuid4),
            Column("embedding", ARRAY(REAL)),
            Column("document", String, nullable=True),
            Column("metadata", JSON, nullable=True),
            extend_existing=True,
        )
        with self.engine.connect() as conn:
            with conn.begin():
                # Create the table
                Base.metadata.create_all(conn)

                # Check if the index exists. The index name is bound as a
                # parameter instead of interpolated into the SQL text.
                index_name = f"{self.collection_name}_embedding_idx"
                index_query = text(
                    "SELECT 1 FROM pg_indexes WHERE indexname = :index_name"
                )
                result = conn.execute(
                    index_query, {"index_name": index_name}
                ).scalar()

                # Create the index if it doesn't exist. Identifiers cannot be
                # bound parameters, so the table/index names are interpolated;
                # they come from the store's own configuration, not from
                # per-query user input.
                if not result:
                    index_statement = text(
                        f"""
                        CREATE INDEX {index_name}
                        ON {self.collection_name} USING ann(embedding)
                        WITH (
                            "dim" = {self.embedding_dimension},
                            "hnsw_m" = 100
                        );
                        """
                    )
                    conn.execute(index_statement)

    def delete_collection(self) -> None:
        """Drop the collection table if it exists."""
        self.logger.debug("Trying to delete collection")
        drop_statement = text(f"DROP TABLE IF EXISTS {self.collection_name};")
        with self.engine.connect() as conn:
            with conn.begin():
                conn.execute(drop_statement)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 500,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of row ids; generated with uuid4 when omitted.
            batch_size: Number of rows per batched INSERT statement.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # Materialize the iterable once up front: a generator input would
        # otherwise be exhausted by the first pass (building ids) and yield
        # nothing to embed or insert.
        texts = list(texts)
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]
        embeddings = self.embedding_function.embed_documents(texts)
        if not metadatas:
            metadatas = [{} for _ in texts]

        # Define the table schema
        chunks_table = Table(
            self.collection_name,
            Base.metadata,
            Column("id", TEXT, primary_key=True),
            Column("embedding", ARRAY(REAL)),
            Column("document", String, nullable=True),
            Column("metadata", JSON, nullable=True),
            extend_existing=True,
        )

        chunks_table_data = []
        with self.engine.connect() as conn:
            with conn.begin():
                for document, metadata, chunk_id, embedding in zip(
                    texts, metadatas, ids, embeddings
                ):
                    chunks_table_data.append(
                        {
                            "id": chunk_id,
                            "embedding": embedding,
                            "document": document,
                            "metadata": metadata,
                        }
                    )

                    # Execute the batch insert when the batch size is reached
                    if len(chunks_table_data) == batch_size:
                        conn.execute(
                            insert(chunks_table).values(chunks_table_data)
                        )
                        # Clear the chunks_table_data list for the next batch
                        chunks_table_data.clear()

                # Insert any remaining records that didn't make up a full batch
                if chunks_table_data:
                    conn.execute(insert(chunks_table).values(chunks_table_data))

        return ids

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Run similarity search with AnalyticDB with distance.

        Args:
            query (str): Query text to search for.
            k (int): Number of results to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None.

        Returns:
            List of Documents most similar to the query.
        """
        embedding = self.embedding_function.embed_query(text=query)
        return self.similarity_search_by_vector(
            embedding=embedding,
            k=k,
            filter=filter,
        )

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None.

        Returns:
            List of Documents most similar to the query and score for each
        """
        embedding = self.embedding_function.embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return docs

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return (Document, l2 distance) pairs closest to ``embedding``.

        Args:
            embedding: Query embedding vector.
            k: Number of results to return. Defaults to 4.
            filter: Optional metadata equality filter ``{key: value}``.

        Returns:
            List of (Document, distance) tuples ordered by ascending distance.
        """
        try:
            from sqlalchemy.engine import Row
        except ImportError:
            raise ImportError(
                "Could not import Row from sqlalchemy.engine. "
                "Please 'pip install sqlalchemy>=1.4'."
            )

        # Build the metadata filter. Filter VALUES are bound as parameters
        # instead of being repr-interpolated into the SQL text (injection
        # hazard). Keys are still interpolated as quoted literals; they are
        # assumed to be developer-controlled metadata field names -- TODO
        # confirm keys are never raw end-user input.
        filter_condition = ""
        params: Dict[str, Any] = {"embedding": embedding, "k": k}
        if filter is not None:
            conditions = []
            for i, (key, value) in enumerate(filter.items()):
                conditions.append(f"metadata->>{key!r} = :filter_value_{i}")
                params[f"filter_value_{i}"] = value
            filter_condition = f"WHERE {' AND '.join(conditions)}"

        # Define the base query
        sql_query = f"""
            SELECT *, l2_distance(embedding, :embedding) as distance
            FROM {self.collection_name}
            {filter_condition}
            ORDER BY embedding <-> :embedding
            LIMIT :k
        """

        # Execute the query and fetch the results
        with self.engine.connect() as conn:
            results: Sequence[Row] = conn.execute(
                text(sql_query), params
            ).fetchall()

        documents_with_scores = [
            (
                Document(
                    page_content=result.document,
                    metadata=result.metadata,
                ),
                result.distance if self.embedding_function is not None else None,
            )
            for result in results
        ]
        return documents_with_scores

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return [doc for doc, _ in docs_and_scores]

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.

        Returns:
            True on success, False when the delete failed.

        Raises:
            ValueError: If no ids are provided.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")

        # Define the table schema
        chunks_table = Table(
            self.collection_name,
            Base.metadata,
            Column("id", TEXT, primary_key=True),
            Column("embedding", ARRAY(REAL)),
            Column("document", String, nullable=True),
            Column("metadata", JSON, nullable=True),
            extend_existing=True,
        )

        try:
            with self.engine.connect() as conn:
                with conn.begin():
                    delete_condition = chunks_table.c.id.in_(ids)
                    conn.execute(chunks_table.delete().where(delete_condition))
                    return True
        except Exception as e:
            # Report through the store's logger (consistent with
            # delete_collection) rather than printing to stdout; the
            # best-effort False return is preserved.
            self.logger.error("Delete operation failed: %s", str(e))
            return False

    @classmethod
    def from_texts(
        cls: Type[AnalyticDB],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        embedding_dimension: int = _LANGCHAIN_DEFAULT_EMBEDDING_DIM,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> AnalyticDB:
        """Return VectorStore initialized from texts and embeddings.

        Postgres Connection string is required
        Either pass it as a parameter
        or set the PG_CONNECTION_STRING environment variable.
        """
        connection_string = cls.get_connection_string(kwargs)

        store = cls(
            connection_string=connection_string,
            collection_name=collection_name,
            embedding_function=embedding,
            embedding_dimension=embedding_dimension,
            pre_delete_collection=pre_delete_collection,
            engine_args=engine_args,
        )

        store.add_texts(texts=texts, metadatas=metadatas, ids=ids, **kwargs)
        return store

    @classmethod
    def get_connection_string(cls, kwargs: Dict[str, Any]) -> str:
        """Resolve the connection string from kwargs or the environment.

        Raises:
            ValueError: If no connection string can be resolved.
        """
        connection_string: str = get_from_dict_or_env(
            data=kwargs,
            key="connection_string",
            env_key="PG_CONNECTION_STRING",
        )

        if not connection_string:
            raise ValueError(
                "Postgres connection string is required. "
                "Either pass it as a parameter "
                "or set the PG_CONNECTION_STRING environment variable."
            )

        return connection_string

    @classmethod
    def from_documents(
        cls: Type[AnalyticDB],
        documents: List[Document],
        embedding: Embeddings,
        embedding_dimension: int = _LANGCHAIN_DEFAULT_EMBEDDING_DIM,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> AnalyticDB:
        """Return VectorStore initialized from documents and embeddings.

        Postgres Connection string is required
        Either pass it as a parameter
        or set the PG_CONNECTION_STRING environment variable.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        connection_string = cls.get_connection_string(kwargs)

        kwargs["connection_string"] = connection_string

        return cls.from_texts(
            texts=texts,
            pre_delete_collection=pre_delete_collection,
            embedding=embedding,
            embedding_dimension=embedding_dimension,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            engine_args=engine_args,
            **kwargs,
        )

    @classmethod
    def connection_string_from_db_params(
        cls,
        driver: str,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
    ) -> str:
        """Return connection string from database parameters."""
        return f"postgresql+{driver}://{user}:{password}@{host}:{port}/{database}"