class DistanceStrategy(str, enum.Enum):
    """Distance metrics available for comparing embedding vectors.

    Every member is also a ``str``, so it compares equal to its short
    metric name (for example ``DistanceStrategy.COSINE == "cosine"``).
    """

    # Euclidean (L2) distance.
    EUCLIDEAN = "l2"
    # Cosine distance.
    COSINE = "cosine"
    # Maximum inner product.
    MAX_INNER_PRODUCT = "inner"
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE

Base = declarative_base()  # type: Any

_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain"

# Cache for the (EmbeddingStore, CollectionStore) pair built by
# ``_get_embedding_collection_store``; ``None`` until the first call.
_classes: Any = None

# Filter operators that map one-to-one onto a native comparison operator.
COMPARISONS_TO_NATIVE = {
    "$eq": "==",
    "$ne": "!=",
    "$lt": "<",
    "$lte": "<=",
    "$gt": ">",
    "$gte": ">=",
}

# Operators that need dedicated handling rather than a plain comparison.
SPECIAL_CASED_OPERATORS = {
    "$in",
    "$nin",
    "$between",
    "$exists",
}

TEXT_OPERATORS = {
    "$like",
    "$ilike",
}

LOGICAL_OPERATORS = {"$and", "$or", "$not"}

# Every operator accepted inside a field filter.
SUPPORTED_OPERATORS = (
    set(COMPARISONS_TO_NATIVE)
    | TEXT_OPERATORS
    | LOGICAL_OPERATORS
    | SPECIAL_CASED_OPERATORS
)


def _get_embedding_collection_store(vector_dimension: Optional[int] = None) -> Any:
    """Build (once) and return the ``(EmbeddingStore, CollectionStore)`` ORM classes.

    The pair is stored in the module-level ``_classes`` on the first call. Later
    calls return that cached pair as-is, so a different ``vector_dimension``
    passed afterwards has no effect.

    Args:
        vector_dimension: Dimension handed to the pgvector ``Vector`` column type.

    Returns:
        Tuple ``(EmbeddingStore, CollectionStore)``.
    """
    global _classes
    if _classes is not None:
        return _classes

    from pgvector.sqlalchemy import Vector  # type: ignore

    class CollectionStore(Base):
        """Collection store."""

        __tablename__ = "langchain_pg_collection"

        uuid = sqlalchemy.Column(
            UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
        )
        name = sqlalchemy.Column(sqlalchemy.String, nullable=False, unique=True)
        cmetadata = sqlalchemy.Column(JSON)

        embeddings = relationship(
            "EmbeddingStore",
            back_populates="collection",
            passive_deletes=True,
        )

        @classmethod
        def get_by_name(
            cls, session: Session, name: str
        ) -> Optional["CollectionStore"]:
            """Return the first collection whose name equals ``name``, if any."""
            name_column = typing_cast(sqlalchemy.Column, cls.name)
            matching = session.query(cls).filter(name_column == name)
            return matching.first()

        @classmethod
        async def aget_by_name(
            cls, session: AsyncSession, name: str
        ) -> Optional["CollectionStore"]:
            """Async variant of ``get_by_name``."""
            name_column = typing_cast(sqlalchemy.Column, cls.name)
            stmt = select(CollectionStore).where(name_column == name)
            result = await session.execute(stmt)
            return result.scalars().first()

        @classmethod
        def get_or_create(
            cls,
            session: Session,
            name: str,
            cmetadata: Optional[dict] = None,
        ) -> Tuple["CollectionStore", bool]:
            """Fetch the collection called ``name``, creating it when missing.

            Returns:
                ``(collection, created)``; ``created`` is True only when a new
                collection was added and committed by this call.
            """  # noqa: E501
            existing = cls.get_by_name(session, name)
            if existing:
                return existing, False

            new_collection = cls(name=name, cmetadata=cmetadata)
            session.add(new_collection)
            session.commit()
            return new_collection, True

        @classmethod
        async def aget_or_create(
            cls,
            session: AsyncSession,
            name: str,
            cmetadata: Optional[dict] = None,
        ) -> Tuple["CollectionStore", bool]:
            """Async variant of ``get_or_create``.

            Returns:
                ``(collection, created)``; ``created`` is True only when a new
                collection was added and committed by this call.
            """  # noqa: E501
            existing = await cls.aget_by_name(session, name)
            if existing:
                return existing, False

            new_collection = cls(name=name, cmetadata=cmetadata)
            session.add(new_collection)
            await session.commit()
            return new_collection, True

    class EmbeddingStore(Base):
        """Embedding store."""

        __tablename__ = "langchain_pg_embedding"

        id = sqlalchemy.Column(
            sqlalchemy.String, nullable=True, primary_key=True, index=True, unique=True
        )

        # Rows are removed with their collection (ON DELETE CASCADE).
        collection_id = sqlalchemy.Column(
            UUID(as_uuid=True),
            sqlalchemy.ForeignKey(
                f"{CollectionStore.__tablename__}.uuid",
                ondelete="CASCADE",
            ),
        )
        collection = relationship(CollectionStore, back_populates="embeddings")

        embedding: Vector = sqlalchemy.Column(Vector(vector_dimension))
        document = sqlalchemy.Column(sqlalchemy.String, nullable=True)
        cmetadata = sqlalchemy.Column(JSONB, nullable=True)

        # GIN index over the JSONB metadata using the jsonb_path_ops operator class.
        __table_args__ = (
            sqlalchemy.Index(
                "ix_cmetadata_gin",
                "cmetadata",
                postgresql_using="gin",
                postgresql_ops={"cmetadata": "jsonb_path_ops"},
            ),
        )

    _classes = (EmbeddingStore, CollectionStore)

    return _classes


def _results_to_docs(docs_and_scores: Any) -> List[Document]:
    """Strip the scores from ``(document, score)`` pairs, keeping the documents."""
    documents: List[Document] = []
    for document, _score in docs_and_scores:
        documents.append(document)
    return documents


def _create_vector_extension(conn: Connection) -> None:
    """Run the advisory-lock + ``CREATE EXTENSION IF NOT EXISTS vector`` statement and commit."""
    statement = sqlalchemy.text(
        "SELECT pg_advisory_xact_lock(1573678846307946496);"
        "CREATE EXTENSION IF NOT EXISTS vector;"
    )
    conn.execute(statement)
    conn.commit()


DBConnection = Union[sqlalchemy.engine.Engine, str]
class PGVector(VectorStore):
    """Postgres vector store integration.

    Setup:
        Install ``langchain_postgres`` and run the docker container.

        .. code-block:: bash

            pip install -qU langchain-postgres
            docker run --name pgvector-container -e POSTGRES_USER=langchain -e POSTGRES_PASSWORD=langchain -e POSTGRES_DB=langchain -p 6024:5432 -d pgvector/pgvector:pg16

    Key init args — indexing params:
        collection_name: str
            Name of the collection.
        embeddings: Embeddings
            Embedding function to use.

    Key init args — client params:
        connection: Union[None, DBConnection, Engine, AsyncEngine, str]
            Connection string or engine.

    Instantiate:
        .. code-block:: python

            from langchain_postgres.vectorstores import PGVector
            from langchain_openai import OpenAIEmbeddings

            # See docker command above to launch a postgres instance with pgvector enabled.
            connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"  # Uses psycopg3!
            collection_name = "my_docs"

            vector_store = PGVector(
                embeddings=OpenAIEmbeddings(model="text-embedding-3-large"),
                collection_name=collection_name,
                connection=connection,
                use_jsonb=True,
            )

    Add Documents:
        .. code-block:: python

            from langchain_core.documents import Document

            document_1 = Document(page_content="foo", metadata={"baz": "bar"})
            document_2 = Document(page_content="thud", metadata={"bar": "baz"})
            document_3 = Document(page_content="i will be deleted :(")

            documents = [document_1, document_2, document_3]
            ids = ["1", "2", "3"]
            vector_store.add_documents(documents=documents, ids=ids)

    Delete Documents:
        .. code-block:: python

            vector_store.delete(ids=["3"])

    Search:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1)
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * thud [{'bar': 'baz'}]

    Search with filter:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * thud [{'bar': 'baz'}]

    Search with score:
        .. code-block:: python

            results = vector_store.similarity_search_with_score(query="qux",k=1)
            for doc, score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * [SIM=0.499243] foo [{'baz': 'bar'}]

    Async:
        .. code-block:: python

            # add documents
            # await vector_store.aadd_documents(documents=documents, ids=ids)

            # delete documents
            # await vector_store.adelete(ids=["3"])

            # search
            # results = await vector_store.asimilarity_search(query="thud",k=1)

            # search with score
            results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
            for doc,score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * [SIM=0.499243] foo [{'baz': 'bar'}]

    Use as Retriever:
        .. code-block:: python

            retriever = vector_store.as_retriever(
                search_type="mmr",
                search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
            )
            retriever.invoke("thud")

        .. code-block:: python

            [Document(metadata={'bar': 'baz'}, page_content='thud')]

    """  # noqa: E501
def __init__(
    self,
    embeddings: Embeddings,
    *,
    connection: Union[None, DBConnection, Engine, AsyncEngine, str] = None,
    embedding_length: Optional[int] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    collection_metadata: Optional[dict] = None,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    logger: Optional[logging.Logger] = None,
    relevance_score_fn: Optional[Callable[[float], float]] = None,
    engine_args: Optional[dict[str, Any]] = None,
    use_jsonb: bool = True,
    create_extension: bool = True,
    async_mode: bool = False,
) -> None:
    """Initialize the PGVector store.

    For an async version, use `PGVector.acreate()` instead.

    Args:
        connection: Postgres connection string or (async)engine.
        embeddings: Any embedding function implementing
            `langchain.embeddings.base.Embeddings` interface.
        embedding_length: The length of the embedding vector. (default: None)
            NOTE: This is not mandatory. Defining it will prevent vectors of
            any other size to be added to the embeddings table but, without it,
            the embeddings can't be indexed.
        collection_name: The name of the collection to use. (default: langchain)
            NOTE: This is not the name of the table, but the name of the collection.
            The tables will be created when initializing the store (if not exists)
            So, make sure the user has the right permissions to create tables.
        distance_strategy: The distance strategy to use. (default: COSINE)
        pre_delete_collection: If True, will delete the collection if it exists.
            (default: False). Useful for testing.
        engine_args: SQLAlchemy's create engine arguments. Only used when
            ``connection`` is a string.
        use_jsonb: Use JSONB instead of JSON for metadata. (default: True)
            Strongly discouraged from using JSON as it's not as efficient
            for querying.
            It's provided here for backwards compatibility with older versions,
            and will be removed in the future.
        create_extension: If True, will create the vector extension if it
            doesn't exist. Disabling creation is useful when using ReadOnly
            Databases.
        async_mode: Only consulted when ``connection`` is a string; an
            ``Engine`` forces sync mode and an ``AsyncEngine`` forces async mode.

    Raises:
        ValueError: If ``connection`` is neither a string, an ``Engine`` nor an
            ``AsyncEngine``.
        NotImplementedError: If ``use_jsonb`` is False.
    """
    self.async_mode = async_mode
    self.embedding_function = embeddings
    self._embedding_length = embedding_length
    self.collection_name = collection_name
    self.collection_metadata = collection_metadata
    self._distance_strategy = distance_strategy
    self.pre_delete_collection = pre_delete_collection
    self.logger = logger or logging.getLogger(__name__)
    self.override_relevance_score_fn = relevance_score_fn
    self._engine: Optional[Engine] = None
    self._async_engine: Optional[AsyncEngine] = None
    self._async_init = False

    if isinstance(connection, Engine):
        # A ready-made sync engine decides the mode on its own.
        self.async_mode = False
        self._engine = connection
    elif isinstance(connection, AsyncEngine):
        # Likewise for a ready-made async engine.
        self.async_mode = True
        self._async_engine = connection
    elif isinstance(connection, str):
        extra_engine_args = engine_args or {}
        if async_mode:
            self._async_engine = create_async_engine(connection, **extra_engine_args)
        else:
            self._engine = create_engine(url=connection, **extra_engine_args)
    else:
        raise ValueError(
            "connection should be a connection string or an instance of "
            "sqlalchemy.engine.Engine or sqlalchemy.ext.asyncio.engine.AsyncEngine"
        )

    self.session_maker: Union[scoped_session, async_sessionmaker]
    if self.async_mode:
        self.session_maker = async_sessionmaker(bind=self._async_engine)
    else:
        self.session_maker = scoped_session(sessionmaker(bind=self._engine))

    self.use_jsonb = use_jsonb
    self.create_extension = create_extension

    if not use_jsonb:
        # Replace with a deprecation warning.
        raise NotImplementedError("use_jsonb=False is no longer supported.")

    # In async mode, initialization is deferred to __apost_init__.
    if not self.async_mode:
        self.__post_init__()
def __post_init__(self) -> None:
    """Initialize the store synchronously.

    Optionally creates the vector extension, resolves the ORM classes, then
    creates the tables and the collection.
    """
    if self.create_extension:
        self.create_vector_extension()

    embedding_cls, collection_cls = _get_embedding_collection_store(
        self._embedding_length
    )
    self.CollectionStore = collection_cls
    self.EmbeddingStore = embedding_cls
    self.create_tables_if_not_exists()
    self.create_collection()


async def __apost_init__(self) -> None:
    """Async initialize the store (use lazy approach).

    Does nothing once ``self._async_init`` is set. The flag is raised before
    the awaited setup steps run, so it stays True even if one of them raises.
    """
    if self._async_init:  # Warning: possible race condition
        return
    self._async_init = True

    embedding_cls, collection_cls = _get_embedding_collection_store(
        self._embedding_length
    )
    self.CollectionStore = collection_cls
    self.EmbeddingStore = embedding_cls

    if self.create_extension:
        await self.acreate_vector_extension()

    await self.acreate_tables_if_not_exists()
    await self.acreate_collection()


@property
def embeddings(self) -> Embeddings:
    """The embedding function given at construction time."""
    return self.embedding_function
def create_vector_extension(self) -> None:
    """Create the vector extension using a connection from the sync engine.

    Raises:
        AssertionError: If no sync engine is configured.
        Exception: If connecting or creating the extension fails; the original
            error is chained as the cause.
    """
    assert self._engine, "engine not found"
    try:
        with self._engine.connect() as connection:
            _create_vector_extension(connection)
    except Exception as exc:
        message = f"Failed to create vector extension: {exc}"
        raise Exception(message) from exc
async def acreate_vector_extension(self) -> None:
    """Create the vector extension through the async engine.

    Raises:
        AssertionError: If no async engine is configured.
    """
    async_engine = self._async_engine
    assert async_engine, "_async_engine not found"
    async with async_engine.begin() as connection:
        # The helper is synchronous, so it is run through run_sync.
        await connection.run_sync(_create_vector_extension)
async def acreate_tables_if_not_exists(self) -> None:
    """Run ``Base.metadata.create_all`` through the async engine.

    Raises:
        AssertionError: If no async engine is configured.
    """
    async_engine = self._async_engine
    assert async_engine, "This method must be called with async_mode"
    async with async_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
async def adrop_tables(self) -> None:
    """Run ``Base.metadata.drop_all`` through the async engine.

    Raises:
        AssertionError: If no async engine is configured.
    """
    async_engine = self._async_engine
    assert async_engine, "This method must be called with async_mode"
    await self.__apost_init__()  # Lazy async init
    async with async_engine.begin() as connection:
        await connection.run_sync(Base.metadata.drop_all)
def _delete_collection(self, session: Session) -> None:
    """Mark this store's collection for deletion in ``session`` (no commit).

    Logs a warning and does nothing else when the collection is not found.
    """
    target = self.get_collection(session)
    if target:
        session.delete(target)
    else:
        self.logger.warning("Collection not found")


async def _adelete_collection(self, session: AsyncSession) -> None:
    """Async variant of ``_delete_collection`` (no commit)."""
    target = await self.aget_collection(session)
    if target:
        await session.delete(target)
    else:
        self.logger.warning("Collection not found")
def delete_collection(self) -> None:
    """Delete this store's collection and commit.

    Logs a warning, without committing, when the collection is not found.
    """
    with self._make_sync_session() as session:
        target = self.get_collection(session)
        if target:
            session.delete(target)
            session.commit()
        else:
            self.logger.warning("Collection not found")
async def adelete_collection(self) -> None:
    """Async variant of ``delete_collection``.

    Logs a warning, without committing, when the collection is not found.
    """
    await self.__apost_init__()  # Lazy async init
    async with self._make_async_session() as session:
        target = await self.aget_collection(session)
        if target:
            await session.delete(target)
            await session.commit()
        else:
            self.logger.warning("Collection not found")
def delete(
    self,
    ids: Optional[List[str]] = None,
    collection_only: bool = False,
    **kwargs: Any,
) -> None:
    """Delete vectors by ids or uuids.

    No delete statement is issued when ``ids`` is None.

    Args:
        ids: List of ids to delete.
        collection_only: Only delete ids in the collection. When the collection
            cannot be found a warning is logged and nothing is deleted.
    """
    with self._make_sync_session() as session:
        if ids is not None:
            self.logger.debug(
                "Trying to delete vectors by ids (represented by the model "
                "using the custom ids field)"
            )

            embedding_store = self.EmbeddingStore
            stmt = delete(embedding_store)

            if collection_only:
                collection = self.get_collection(session)
                if not collection:
                    self.logger.warning("Collection not found")
                    return

                stmt = stmt.where(embedding_store.collection_id == collection.uuid)

            session.execute(stmt.where(embedding_store.id.in_(ids)))
        session.commit()
async def adelete(
    self,
    ids: Optional[List[str]] = None,
    collection_only: bool = False,
    **kwargs: Any,
) -> None:
    """Async delete vectors by ids or uuids.

    No delete statement is issued when ``ids`` is None.

    Args:
        ids: List of ids to delete.
        collection_only: Only delete ids in the collection. When the collection
            cannot be found a warning is logged and nothing is deleted.
    """
    await self.__apost_init__()  # Lazy async init
    async with self._make_async_session() as session:
        if ids is not None:
            self.logger.debug(
                "Trying to delete vectors by ids (represented by the model "
                "using the custom ids field)"
            )

            embedding_store = self.EmbeddingStore
            stmt = delete(embedding_store)

            if collection_only:
                collection = await self.aget_collection(session)
                if not collection:
                    self.logger.warning("Collection not found")
                    return

                stmt = stmt.where(embedding_store.collection_id == collection.uuid)

            await session.execute(stmt.where(embedding_store.id.in_(ids)))
        await session.commit()
def get_collection(self, session: Session) -> Any:
    """Look up this store's collection by ``self.collection_name``.

    Returns:
        Whatever ``CollectionStore.get_by_name`` returns for that name.

    Raises:
        AssertionError: If an async engine is configured.
    """
    assert not self._async_engine, "This method must be called without async_mode"
    collection_store = self.CollectionStore
    return collection_store.get_by_name(session, self.collection_name)
async def aget_collection(self, session: AsyncSession) -> Any:
    """Async look-up of this store's collection by ``self.collection_name``.

    Returns:
        Whatever ``CollectionStore.aget_by_name`` returns for that name.

    Raises:
        AssertionError: If no async engine is configured.
    """
    assert self._async_engine, "This method must be called with async_mode"
    await self.__apost_init__()  # Lazy async init
    found = await self.CollectionStore.aget_by_name(session, self.collection_name)
    return found
def add_embeddings(
    self,
    texts: Sequence[str],
    embeddings: List[List[float]],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Add embeddings to the vectorstore.

    Rows are upserted: on an ``id`` conflict the stored embedding, document and
    metadata are replaced. Inputs are paired positionally with ``zip``, so
    pairing stops at the shortest of the input sequences.

    Args:
        texts: Iterable of strings to add to the vectorstore.
        embeddings: List of list of embedding vectors.
        metadatas: List of metadatas associated with the texts.
        ids: Optional list of ids for the documents.
            If not provided, will generate a new id for each document.
            ``None`` entries are replaced with generated ids.
        kwargs: vectorstore specific parameters

    Returns:
        The ids used for the rows; an empty list when there is nothing to insert.

    Raises:
        AssertionError: If an async engine is configured.
        ValueError: If the collection does not exist.
    """
    assert not self._async_engine, "This method must be called with sync_mode"
    if ids is None:
        ids_ = [str(uuid.uuid4()) for _ in texts]
    else:
        ids_ = [
            doc_id if doc_id is not None else str(uuid.uuid4()) for doc_id in ids
        ]

    if not metadatas:
        metadatas = [{} for _ in texts]

    with self._make_sync_session() as session:  # type: ignore[arg-type]
        collection = self.get_collection(session)
        if not collection:
            raise ValueError("Collection not found")
        data = [
            {
                "id": doc_id,
                "collection_id": collection.uuid,
                "embedding": embedding,
                "document": text,
                "cmetadata": metadata or {},
            }
            for text, metadata, embedding, doc_id in zip(
                texts, metadatas, embeddings, ids_
            )
        ]
        if not data:
            # An INSERT built from an empty values list carries no row values,
            # so there is nothing meaningful to send to the database.
            return []

        stmt = insert(self.EmbeddingStore).values(data)
        on_conflict_stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            # Conflict detection based on these columns
            set_={
                "embedding": stmt.excluded.embedding,
                "document": stmt.excluded.document,
                "cmetadata": stmt.excluded.cmetadata,
            },
        )
        session.execute(on_conflict_stmt)
        session.commit()

    return ids_
async def aadd_embeddings(
    self,
    texts: Sequence[str],
    embeddings: List[List[float]],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Async add embeddings to the vectorstore.

    Rows are upserted: on an ``id`` conflict the stored embedding, document and
    metadata are replaced. Inputs are paired positionally with ``zip``, so
    pairing stops at the shortest of the input sequences.

    Args:
        texts: Iterable of strings to add to the vectorstore.
        embeddings: List of list of embedding vectors.
        metadatas: List of metadatas associated with the texts.
        ids: Optional list of ids for the texts.
            If not provided, will generate a new id for each text.
            ``None`` entries are replaced with generated ids.
        kwargs: vectorstore specific parameters

    Returns:
        The ids used for the rows; an empty list when there is nothing to insert.

    Raises:
        ValueError: If the collection does not exist.
    """
    await self.__apost_init__()  # Lazy async init
    if ids is None:
        ids_ = [str(uuid.uuid4()) for _ in texts]
    else:
        ids_ = [
            doc_id if doc_id is not None else str(uuid.uuid4()) for doc_id in ids
        ]

    if not metadatas:
        metadatas = [{} for _ in texts]

    async with self._make_async_session() as session:  # type: ignore[arg-type]
        collection = await self.aget_collection(session)
        if not collection:
            raise ValueError("Collection not found")
        data = [
            {
                "id": doc_id,
                "collection_id": collection.uuid,
                "embedding": embedding,
                "document": text,
                "cmetadata": metadata or {},
            }
            for text, metadata, embedding, doc_id in zip(
                texts, metadatas, embeddings, ids_
            )
        ]
        if not data:
            # An INSERT built from an empty values list carries no row values,
            # so there is nothing meaningful to send to the database.
            return []

        stmt = insert(self.EmbeddingStore).values(data)
        on_conflict_stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            # Conflict detection based on these columns
            set_={
                "embedding": stmt.excluded.embedding,
                "document": stmt.excluded.document,
                "cmetadata": stmt.excluded.cmetadata,
            },
        )
        await session.execute(on_conflict_stmt)
        await session.commit()

    return ids_
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Run more texts through the embeddings and add to the vectorstore.

    Args:
        texts: Iterable of strings to add to the vectorstore.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of ids for the texts.
            If not provided, will generate a new id for each text.
        kwargs: vectorstore specific parameters, forwarded to ``add_embeddings``.

    Returns:
        List of ids from adding the texts into the vectorstore.

    Raises:
        AssertionError: If an async engine is configured.
    """
    assert not self._async_engine, "This method must be called without async_mode"
    # Materialize once: the texts are needed both for embedding and for storage.
    text_list = list(texts)
    vectors = self.embedding_function.embed_documents(text_list)
    # Empty metadatas / ids are passed on as None.
    metadata_list = list(metadatas) if metadatas else None
    id_list = list(ids) if ids else None
    return self.add_embeddings(
        texts=text_list,
        embeddings=list(vectors),
        metadatas=metadata_list,
        ids=id_list,
        **kwargs,
    )
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Run more texts through the embeddings and add to the vectorstore.

    Args:
        texts: Iterable of strings to add to the vectorstore.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of ids for the texts.
            If not provided, will generate a new id for each text.
        kwargs: vectorstore specific parameters, forwarded to ``aadd_embeddings``.

    Returns:
        List of ids from adding the texts into the vectorstore.
    """
    await self.__apost_init__()  # Lazy async init
    # Materialize once: the texts are needed both for embedding and for storage.
    text_list = list(texts)
    vectors = await self.embedding_function.aembed_documents(text_list)
    # Empty metadatas / ids are passed on as None.
    metadata_list = list(metadatas) if metadatas else None
    id_list = list(ids) if ids else None
    return await self.aadd_embeddings(
        texts=text_list,
        embeddings=list(vectors),
        metadatas=metadata_list,
        ids=id_list,
        **kwargs,
    )
def similarity_search(
    self,
    query: str,
    k: int = 4,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Document]:
    """Run similarity search with PGVector with distance.

    Args:
        query (str): Query text to search for.
        k (int): Number of results to return. Defaults to 4.
        filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
        kwargs: Accepted but not forwarded to the vector search.

    Returns:
        List of Documents most similar to the query.

    Raises:
        AssertionError: If an async engine is configured.
    """
    assert not self._async_engine, "This method must be called without async_mode"
    query_vector = self.embeddings.embed_query(query)
    return self.similarity_search_by_vector(
        embedding=query_vector,
        k=k,
        filter=filter,
    )
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Document]:
    """Run similarity search with PGVector with distance.

    Args:
        query (str): Query text to search for.
        k (int): Number of results to return. Defaults to 4.
        filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
        kwargs: Accepted but not forwarded to the vector search.

    Returns:
        List of Documents most similar to the query.
    """
    await self.__apost_init__()  # Lazy async init
    query_vector = await self.embeddings.aembed_query(query)
    return await self.asimilarity_search_by_vector(
        embedding=query_vector,
        k=k,
        filter=filter,
    )
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: Optional[dict] = None,
) -> List[Tuple[Document, float]]:
    """Return docs most similar to query.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

    Returns:
        List of Documents most similar to the query and score for each.

    Raises:
        AssertionError: If an async engine is configured.
    """
    assert not self._async_engine, "This method must be called without async_mode"
    query_vector = self.embeddings.embed_query(query)
    return self.similarity_search_with_score_by_vector(
        embedding=query_vector, k=k, filter=filter
    )
async def asimilarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: Optional[dict] = None,
) -> List[Tuple[Document, float]]:
    """Return docs most similar to query.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

    Returns:
        List of Documents most similar to the query and score for each.
    """
    await self.__apost_init__()  # Lazy async init
    query_vector = await self.embeddings.aembed_query(query)
    return await self.asimilarity_search_with_score_by_vector(
        embedding=query_vector, k=k, filter=filter
    )
@property
def distance_strategy(self) -> Any:
    """Distance function of the embedding column for the configured strategy.

    Returns:
        The ``l2_distance``, ``cosine_distance`` or ``max_inner_product``
        attribute of ``self.EmbeddingStore.embedding``.

    Raises:
        ValueError: If the configured strategy matches no ``DistanceStrategy``.
    """
    strategy = self._distance_strategy
    # Compared with ``==`` (not a dict lookup) so plain strings equal to a
    # member's value are matched as well.
    if strategy == DistanceStrategy.EUCLIDEAN:
        return self.EmbeddingStore.embedding.l2_distance
    if strategy == DistanceStrategy.COSINE:
        return self.EmbeddingStore.embedding.cosine_distance
    if strategy == DistanceStrategy.MAX_INNER_PRODUCT:
        return self.EmbeddingStore.embedding.max_inner_product

    supported = ", ".join([ds.value for ds in DistanceStrategy])
    raise ValueError(
        f"Got unexpected value for distance: {strategy}. "
        f"Should be one of {supported}."
    )
def similarity_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[dict] = None,
) -> List[Tuple[Document, float]]:
    """Query the collection with ``embedding`` and return documents with scores.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of results requested from the query.
        filter: Optional metadata filter.

    Returns:
        The query rows converted by ``_results_to_docs_and_scores``.

    Raises:
        AssertionError: If an async engine is configured.
    """
    assert not self._async_engine, "This method must be called without async_mode"
    rows = self.__query_collection(embedding=embedding, k=k, filter=filter)
    docs_and_scores = self._results_to_docs_and_scores(rows)
    return docs_and_scores
def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]:
    """Return docs and scores from results.

    Each result is expected to expose ``EmbeddingStore`` (with ``id``,
    ``document`` and ``cmetadata``) and ``distance``.
    """
    docs = [
        (
            Document(
                id=str(result.EmbeddingStore.id),
                page_content=result.EmbeddingStore.document,
                metadata=result.EmbeddingStore.cmetadata,
            ),
            result.distance if self.embeddings is not None else None,
        )
        for result in results
    ]
    return docs


def _handle_field_filter(
    self,
    field: str,
    value: Any,
) -> SQLColumnExpression:
    """Create a filter for a specific field.

    Args:
        field: name of field
        value: value to filter
            If provided as is then this will be an equality filter
            If provided as a dictionary then this will be a filter, the key
            will be the operator and the value will be the value to filter by

    Returns:
        sqlalchemy expression

    Raises:
        ValueError: If the field name or the filter specification is invalid.
        NotImplementedError: For unsupported ``$in``/``$nin`` element types or
            an operator with no handling branch.
    """
    if not isinstance(field, str):
        raise ValueError(
            f"field should be a string but got: {type(field)} with value: {field}"
        )

    if field.startswith("$"):
        raise ValueError(
            f"Invalid filter condition. Expected a field but got an operator: "
            f"{field}"
        )

    # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters
    if not field.isidentifier():
        raise ValueError(f"Invalid field name: {field}. Expected a valid identifier.")

    if isinstance(value, dict):
        # This is a filter specification
        if len(value) != 1:
            raise ValueError(
                "Invalid filter condition. Expected a value which "
                "is a dictionary with a single key that corresponds to an operator "
                f"but got a dictionary with {len(value)} keys. The first few "
                f"keys are: {list(value.keys())[:3]}"
            )
        operator, filter_value = list(value.items())[0]
        # Verify that that operator is an operator
        if operator not in SUPPORTED_OPERATORS:
            raise ValueError(
                f"Invalid operator: {operator}. "
                f"Expected one of {SUPPORTED_OPERATORS}"
            )
    else:  # Then we assume an equality operator
        operator = "$eq"
        filter_value = value

    if operator in COMPARISONS_TO_NATIVE:
        # Then we implement an equality filter
        # native is trusted input
        native = COMPARISONS_TO_NATIVE[operator]
        return func.jsonb_path_match(
            self.EmbeddingStore.cmetadata,
            cast(f"$.{field}{native} $value", JSONPATH),
            cast({"value": filter_value}, JSONB),
        )
    elif operator == "$between":
        # Use AND with two comparisons
        low, high = filter_value

        lower_bound = func.jsonb_path_match(
            self.EmbeddingStore.cmetadata,
            cast(f"$.{field} >= $value", JSONPATH),
            cast({"value": low}, JSONB),
        )
        upper_bound = func.jsonb_path_match(
            self.EmbeddingStore.cmetadata,
            cast(f"$.{field} <= $value", JSONPATH),
            cast({"value": high}, JSONB),
        )
        return sqlalchemy.and_(lower_bound, upper_bound)
    elif operator in {"$in", "$nin", "$like", "$ilike"}:
        # We'll do force coercion to text
        if operator in {"$in", "$nin"}:
            for val in filter_value:
                if not isinstance(val, (str, int, float)):
                    raise NotImplementedError(
                        f"Unsupported type: {type(val)} for value: {val}"
                    )

                if isinstance(val, bool):  # b/c bool is an instance of int
                    raise NotImplementedError(
                        f"Unsupported type: {type(val)} for value: {val}"
                    )

        queried_field = self.EmbeddingStore.cmetadata[field].astext

        if operator in {"$in"}:
            return queried_field.in_([str(val) for val in filter_value])
        elif operator in {"$nin"}:
            return ~queried_field.in_([str(val) for val in filter_value])
        elif operator in {"$like"}:
            return queried_field.like(filter_value)
        elif operator in {"$ilike"}:
            return queried_field.ilike(filter_value)
        else:
            raise NotImplementedError()
    elif operator == "$exists":
        if not isinstance(filter_value, bool):
            raise ValueError(
                "Expected a boolean value for $exists "
                f"operator, but got: {filter_value}"
            )
        condition = func.jsonb_exists(
            self.EmbeddingStore.cmetadata,
            field,
        )
        return condition if filter_value else ~condition
    else:
        raise NotImplementedError()


def _create_filter_clause_deprecated(self, key, value):  # type: ignore[no-untyped-def]
    """Deprecated functionality.

    This is for backwards compatibility with the JSON based schema for metadata.
    It uses incorrect operator syntax (operators are not prefixed with $).

    This implementation is not efficient, and has bugs associated with
    the way that it handles numeric filter clauses.

    Args:
        key: Metadata field name.
        value: Dictionary mapping an operator name (matched case-insensitively)
            to its operand.

    Returns:
        A SQLAlchemy clause, or None when no known operator is present.
    """
    IN, NIN, BETWEEN, GT, LT, NE = "in", "nin", "between", "gt", "lt", "ne"
    EQ, LIKE, CONTAINS, OR, AND = "eq", "like", "contains", "or", "and"

    value_case_insensitive = {k.lower(): v for k, v in value.items()}
    if IN in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.in_(
            value_case_insensitive[IN]
        )
    elif NIN in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.not_in(
            value_case_insensitive[NIN]
        )
    elif BETWEEN in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.between(
            str(value_case_insensitive[BETWEEN][0]),
            str(value_case_insensitive[BETWEEN][1]),
        )
    elif GT in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext > str(
            value_case_insensitive[GT]
        )
    elif LT in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext < str(
            value_case_insensitive[LT]
        )
    elif NE in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext != str(
            value_case_insensitive[NE]
        )
    elif EQ in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext == str(
            value_case_insensitive[EQ]
        )
    elif LIKE in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.like(
            value_case_insensitive[LIKE]
        )
    elif CONTAINS in map(str.lower, value):
        filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.contains(
            value_case_insensitive[CONTAINS]
        )
    elif OR in map(str.lower, value):
        # Recurse with the (key, value) signature of this deprecated helper;
        # ``_create_filter_clause`` takes a single ``filters`` argument.
        or_clauses = [
            self._create_filter_clause_deprecated(key, sub_value)
            for sub_value in value_case_insensitive[OR]
        ]
        filter_by_metadata = sqlalchemy.or_(*or_clauses)
    elif AND in map(str.lower, value):
        and_clauses = [
            self._create_filter_clause_deprecated(key, sub_value)
            for sub_value in value_case_insensitive[AND]
        ]
        filter_by_metadata = sqlalchemy.and_(*and_clauses)
    else:
        filter_by_metadata = None

    return filter_by_metadata


def _create_filter_clause_json_deprecated(
    self, filter: Any
) -> List[SQLColumnExpression]:
    """Convert filters from IR to SQL clauses.

    **DEPRECATED** This functionality will be deprecated in the future.

    It implements translation of filters for a schema that uses JSON
    for metadata rather than the JSONB field which is more efficient
    for querying.

    Returns:
        One clause per filter key. Dictionary values with no recognised operator
        contribute nothing; other values become a text equality test.
    """
    filter_clauses = []
    for key, value in filter.items():
        if isinstance(value, dict):
            filter_by_metadata = self._create_filter_clause_deprecated(key, value)

            if filter_by_metadata is not None:
                filter_clauses.append(filter_by_metadata)
        else:
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext == str(
                value
            )
            filter_clauses.append(filter_by_metadata)
    return filter_clauses


def _create_filter_clause(self, filters: Any) -> Any:
    """Convert LangChain IR filter representation to matching SQLAlchemy clauses.

    At the top level, we still don't know if we're working with a field
    or an operator for the keys. After we've determined that we can
    call the appropriate logic to handle filter creation.

    Args:
        filters: Dictionary of filters to apply to the query.

    Returns:
        SQLAlchemy clause to apply to the query.

    Raises:
        ValueError: If ``filters`` is not a dictionary, is empty, or mixes
            operators and fields incorrectly.
    """
    if isinstance(filters, dict):
        if len(filters) == 1:
            # The only operators allowed at the top level are $AND, $OR, and $NOT
            # First check if an operator or a field
            key, value = list(filters.items())[0]
            if key.startswith("$"):
                # Then it's an operator
                if key.lower() not in ["$and", "$or", "$not"]:
                    raise ValueError(
                        f"Invalid filter condition. Expected $and, $or or $not "
                        f"but got: {key}"
                    )
            else:
                # Then it's a field
                return self._handle_field_filter(key, filters[key])

            if key.lower() == "$and":
                if not isinstance(value, list):
                    raise ValueError(
                        f"Expected a list, but got {type(value)} for value: {value}"
                    )
                and_ = [self._create_filter_clause(el) for el in value]
                if len(and_) > 1:
                    return sqlalchemy.and_(*and_)
                elif len(and_) == 1:
                    return and_[0]
                else:
                    raise ValueError(
                        "Invalid filter condition. Expected a dictionary "
                        "but got an empty dictionary"
                    )
            elif key.lower() == "$or":
                if not isinstance(value, list):
                    raise ValueError(
                        f"Expected a list, but got {type(value)} for value: {value}"
                    )
                or_ = [self._create_filter_clause(el) for el in value]
                if len(or_) > 1:
                    return sqlalchemy.or_(*or_)
                elif len(or_) == 1:
                    return or_[0]
                else:
                    raise ValueError(
                        "Invalid filter condition. Expected a dictionary "
                        "but got an empty dictionary"
                    )
            elif key.lower() == "$not":
                if isinstance(value, list):
                    # A list under $not negates every element and ANDs them.
                    not_conditions = [
                        self._create_filter_clause(item) for item in value
                    ]
                    not_ = sqlalchemy.and_(
                        *[
                            sqlalchemy.not_(condition)
                            for condition in not_conditions
                        ]
                    )
                    return not_
                elif isinstance(value, dict):
                    not_ = self._create_filter_clause(value)
                    return sqlalchemy.not_(not_)
                else:
                    raise ValueError(
                        f"Invalid filter condition. Expected a dictionary "
                        f"or a list but got: {type(value)}"
                    )
            else:
                raise ValueError(
                    f"Invalid filter condition. Expected $and, $or or $not "
                    f"but got: {key}"
                )
        elif len(filters) > 1:
            # Then all keys have to be fields (they cannot be operators)
            for key in filters.keys():
                if key.startswith("$"):
                    raise ValueError(
                        f"Invalid filter condition. Expected a field but got: {key}"
                    )
            # These should all be fields and combined using an $and operator
            and_ = [self._handle_field_filter(k, v) for k, v in filters.items()]
            if len(and_) > 1:
                return sqlalchemy.and_(*and_)
            elif len(and_) == 1:
                return and_[0]
            else:
                raise ValueError(
                    "Invalid filter condition. Expected a dictionary "
                    "but got an empty dictionary"
                )
        else:
            raise ValueError("Got an empty dictionary for filters.")
    else:
        raise ValueError(
            f"Invalid type: Expected a dictionary but got type: {type(filters)}"
        )


def __query_collection(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[Dict[str, str]] = None,
) -> Sequence[Any]:
    """Query the collection.

    Returns up to ``k`` rows of ``(EmbeddingStore, distance)`` from this
    store's collection, ordered by ascending distance to ``embedding``.

    Raises:
        ValueError: If the collection does not exist.
    """
    with self._make_sync_session() as session:  # type: ignore[arg-type]
        collection = self.get_collection(session)
        if not collection:
            raise ValueError("Collection not found")

        filter_by = [self.EmbeddingStore.collection_id == collection.uuid]
        if filter:
            if self.use_jsonb:
                filter_clauses = self._create_filter_clause(filter)
                if filter_clauses is not None:
                    filter_by.append(filter_clauses)
            else:
                # Old way of doing things
                filter_clauses = self._create_filter_clause_json_deprecated(filter)
                filter_by.extend(filter_clauses)

        results: List[Any] = (
            session.query(
                self.EmbeddingStore,
                self.distance_strategy(embedding).label("distance"),
            )
            .filter(*filter_by)
            .order_by(sqlalchemy.asc("distance"))
            .join(
                self.CollectionStore,
                self.EmbeddingStore.collection_id == self.CollectionStore.uuid,
            )
            .limit(k)
            .all()
        )

    return results


async def __aquery_collection(
    self,
    session: AsyncSession,
    embedding: List[float],
    k: int = 4,
    filter: Optional[Dict[str, str]] = None,
) -> Sequence[Any]:
    """Query the collection.

    Async counterpart of ``__query_collection``. Note that the ``session``
    argument is rebound by the ``async with`` below, so the query runs on a
    session from ``_make_async_session`` rather than on the one passed in.

    Raises:
        ValueError: If the collection does not exist.
    """
    async with self._make_async_session() as session:  # type: ignore[arg-type]
        collection = await self.aget_collection(session)
        if not collection:
            raise ValueError("Collection not found")

        filter_by = [self.EmbeddingStore.collection_id == collection.uuid]
        if filter:
            if self.use_jsonb:
                filter_clauses = self._create_filter_clause(filter)
                if filter_clauses is not None:
                    filter_by.append(filter_clauses)
            else:
                # Old way of doing things
                filter_clauses = self._create_filter_clause_json_deprecated(filter)
                filter_by.extend(filter_clauses)

        stmt = (
            select(
                self.EmbeddingStore,
                self.distance_strategy(embedding).label("distance"),
            )
            .filter(*filter_by)
            .order_by(sqlalchemy.asc("distance"))
            .join(
                self.CollectionStore,
                self.EmbeddingStore.collection_id == self.CollectionStore.uuid,
            )
            .limit(k)
        )

        results: Sequence[Any] = (await session.execute(stmt)).all()

        return results
def similarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return the documents closest to an embedding vector.

    Args:
        embedding: Query vector to compare the stored embeddings against.
        k: Maximum number of documents to return. Defaults to 4.
        filter: Optional metadata filter. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The matching documents, with their scores dropped.

    Raises:
        AssertionError: If the store was set up with an async engine.
    """
    assert not self._async_engine, "This method must be called without async_mode"
    scored_hits = self.similarity_search_with_score_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # Only the documents are part of this method's contract; scores are discarded.
    return _results_to_docs(scored_hits)
async def asimilarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Document]:
    """Asynchronously return the documents closest to an embedding vector.

    Args:
        embedding: Query vector to compare the stored embeddings against.
        k: Maximum number of documents to return. Defaults to 4.
        filter: Optional metadata filter. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The matching documents, with their scores dropped.

    Raises:
        AssertionError: If the store was not set up with an async engine.
    """
    assert self._async_engine, "This method must be called with async_mode"
    await self.__apost_init__()  # Lazy async init
    scored_hits = await self.asimilarity_search_with_score_by_vector(
        embedding=embedding,
        k=k,
        filter=filter,
    )
    # Only the documents are part of this method's contract; scores are discarded.
    return _results_to_docs(scored_hits)
@classmethod
def from_texts(
    cls: Type[PGVector],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    *,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    use_jsonb: bool = True,
    **kwargs: Any,
) -> PGVector:
    """Embed ``texts`` and build a vector store from the result.

    Args:
        texts: Raw texts to embed and store.
        embedding: Embeddings object; its ``embed_documents`` produces the vectors.
        metadatas: Optional per-text metadata dictionaries.
        collection_name: Name of the collection to use.
        distance_strategy: Distance strategy to use.
        ids: Optional ids for the texts.
        pre_delete_collection: Forwarded unchanged to the private constructor.
        use_jsonb: Forwarded unchanged to the private constructor.
        **kwargs: Extra keyword arguments forwarded to the private constructor.

    Returns:
        The store produced by the private ``__from`` constructor.
    """
    # A copy of the texts is embedded; the original sequence is what gets stored.
    vectors = embedding.embed_documents(list(texts))

    return cls.__from(
        texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        use_jsonb=use_jsonb,
        **kwargs,
    )
@classmethod
async def afrom_texts(
    cls: Type[PGVector],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    *,
    use_jsonb: bool = True,
    **kwargs: Any,
) -> PGVector:
    """Asynchronously embed ``texts`` and build a vector store from the result.

    Args:
        texts: Raw texts to embed and store.
        embedding: Embeddings object; its ``aembed_documents`` produces the vectors.
        metadatas: Optional per-text metadata dictionaries.
        collection_name: Name of the collection to use.
        distance_strategy: Distance strategy to use.
        ids: Optional ids for the texts.
        pre_delete_collection: Forwarded unchanged to the private constructor.
        use_jsonb: Forwarded unchanged to the private constructor.
        **kwargs: Extra keyword arguments forwarded to the private constructor.

    Returns:
        The store produced by the private ``__afrom`` constructor.
    """
    # A copy of the texts is embedded; the original sequence is what gets stored.
    vectors = await embedding.aembed_documents(list(texts))

    return await cls.__afrom(
        texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        use_jsonb=use_jsonb,
        **kwargs,
    )
@classmethod
def from_embeddings(
    cls,
    text_embeddings: List[Tuple[str, List[float]]],
    embedding: Embeddings,
    *,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> PGVector:
    """Build a PGVector store from texts paired with precomputed embeddings.

    Args:
        text_embeddings: List of ``(text, embedding)`` tuples.
        embedding: Embeddings object.
        metadatas: Optional list of metadatas associated with the texts.
        collection_name: Name of the collection.
        distance_strategy: Distance strategy to use.
        ids: Optional list of ids for the documents. If not provided,
            a new id is generated for each document.
        pre_delete_collection: If True, will delete the collection if it
            exists. **Attention**: this deletes all the documents in the
            existing collection.
        **kwargs: Additional arguments forwarded to the private constructor.

    Returns:
        PGVector: PGVector instance.

    Example:
        .. code-block:: python

            from langchain_postgres.vectorstores import PGVector
            from langchain_openai.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = list(zip(texts, text_embeddings))
            vectorstore = PGVector.from_embeddings(text_embedding_pairs, embeddings)
    """
    # Split the pairs into two parallel lists: position 0 is the text,
    # position 1 is its vector.
    raw_texts = [pair[0] for pair in text_embeddings]
    vectors = [pair[1] for pair in text_embeddings]

    return cls.__from(
        raw_texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
@classmethod
async def afrom_embeddings(
    cls,
    text_embeddings: List[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> PGVector:
    """Asynchronously build a PGVector store from precomputed embeddings.

    Args:
        text_embeddings: List of ``(text, embedding)`` tuples.
        embedding: Embeddings object.
        metadatas: Optional list of metadatas associated with the texts.
        collection_name: Name of the collection.
        distance_strategy: Distance strategy to use.
        ids: Optional list of ids for the documents.
        pre_delete_collection: Forwarded unchanged to the private constructor.
        **kwargs: Additional arguments forwarded to the private constructor.

    Returns:
        PGVector: The store produced by the private ``__afrom`` constructor.

    Example:
        .. code-block:: python

            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = list(zip(texts, text_embeddings))
            vectorstore = await PGVector.afrom_embeddings(
                text_embedding_pairs, embeddings
            )
    """
    # Split the pairs into two parallel lists: position 0 is the text,
    # position 1 is its vector.
    raw_texts = [pair[0] for pair in text_embeddings]
    vectors = [pair[1] for pair in text_embeddings]

    return await cls.__afrom(
        raw_texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
@classmethod
def from_existing_index(
    cls: Type[PGVector],
    embedding: Embeddings,
    *,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    connection: Optional[DBConnection] = None,
    **kwargs: Any,
) -> PGVector:
    """Get an instance of an existing PGVector store.

    The store is only constructed; this method itself inserts no new
    embeddings.

    Args:
        embedding: Embeddings object handed to the store as ``embeddings``.
        collection_name: Name of the collection.
        distance_strategy: Distance strategy to use.
        pre_delete_collection: Forwarded unchanged to the constructor.
        connection: Database connection (engine or connection string).
        **kwargs: Additional constructor arguments.

    Returns:
        PGVector: The constructed store.
    """
    return cls(
        connection=connection,
        collection_name=collection_name,
        embeddings=embedding,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
@classmethod
async def afrom_existing_index(
    cls: Type[PGVector],
    embedding: Embeddings,
    *,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    connection: Optional[DBConnection] = None,
    **kwargs: Any,
) -> PGVector:
    """Get an async-mode instance of an existing PGVector store.

    The store is only constructed (with ``async_mode=True``); this method
    itself inserts no new embeddings.

    Args:
        embedding: Embeddings object handed to the store as ``embeddings``.
        collection_name: Name of the collection.
        distance_strategy: Distance strategy to use.
        pre_delete_collection: Forwarded unchanged to the constructor.
        connection: Database connection (engine or connection string).
        **kwargs: Additional constructor arguments. Must not contain
            ``async_mode``, which is always set to True here.

    Returns:
        PGVector: The constructed store, of the class this method was
        called on.
    """
    # Construct through ``cls`` rather than a hard-coded PGVector so that
    # subclasses get an instance of their own type, exactly as the sync
    # ``from_existing_index`` does.
    return cls(
        connection=connection,
        collection_name=collection_name,
        embeddings=embedding,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        async_mode=True,
        **kwargs,
    )
@classmethod
def get_connection_string(cls, kwargs: Dict[str, Any]) -> str:
    """Resolve the Postgres connection string.

    The value is looked up through ``get_from_dict_or_env`` using the
    ``"connection"`` key of ``kwargs`` and the
    ``PGVECTOR_CONNECTION_STRING`` environment variable.

    Args:
        kwargs: Keyword-argument dictionary that may carry ``"connection"``.

    Returns:
        The resolved connection string.

    Raises:
        ValueError: If the resolved value is empty.
    """
    connection_string: str = get_from_dict_or_env(
        data=kwargs,
        key="connection",
        env_key="PGVECTOR_CONNECTION_STRING",
    )

    if not connection_string:
        # Adjacent literals are concatenated verbatim, so each fragment
        # carries its own separator to keep the message readable.
        raise ValueError(
            "Postgres connection string is required. "
            "Either pass it as a parameter "
            "or set the PGVECTOR_CONNECTION_STRING environment variable."
        )

    return connection_string
@classmethod
def from_documents(
    cls: Type[PGVector],
    documents: List[Document],
    embedding: Embeddings,
    *,
    connection: Optional[DBConnection] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    use_jsonb: bool = True,
    **kwargs: Any,
) -> PGVector:
    """Build a vector store from documents by delegating to ``from_texts``.

    Args:
        documents: Documents whose ``page_content`` and ``metadata`` are stored.
        embedding: Embeddings object.
        connection: Database connection (engine or connection string).
        collection_name: Name of the collection.
        distance_strategy: Distance strategy to use.
        ids: Optional ids for the documents.
        pre_delete_collection: Forwarded unchanged to ``from_texts``.
        use_jsonb: Forwarded unchanged to ``from_texts``.
        **kwargs: Additional arguments forwarded to ``from_texts``.

    Returns:
        PGVector: The store returned by ``from_texts``.
    """
    page_texts = [doc.page_content for doc in documents]
    doc_metadatas = [doc.metadata for doc in documents]

    return cls.from_texts(
        texts=page_texts,
        pre_delete_collection=pre_delete_collection,
        embedding=embedding,
        distance_strategy=distance_strategy,
        metadatas=doc_metadatas,
        connection=connection,
        ids=ids,
        collection_name=collection_name,
        use_jsonb=use_jsonb,
        **kwargs,
    )
@classmethod
async def afrom_documents(
    cls: Type[PGVector],
    documents: List[Document],
    embedding: Embeddings,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    *,
    use_jsonb: bool = True,
    **kwargs: Any,
) -> PGVector:
    """Asynchronously build a vector store from documents.

    The connection string is resolved with ``get_connection_string`` (from
    ``kwargs`` or the PGVECTOR_CONNECTION_STRING environment variable) and
    written back into ``kwargs["connection"]`` before delegating to
    ``afrom_texts``.

    Args:
        documents: Documents whose ``page_content`` and ``metadata`` are stored.
        embedding: Embeddings object.
        collection_name: Name of the collection.
        distance_strategy: Distance strategy to use.
        ids: Optional ids for the documents.
        pre_delete_collection: Forwarded unchanged to ``afrom_texts``.
        use_jsonb: Forwarded unchanged to ``afrom_texts``.
        **kwargs: Additional arguments forwarded to ``afrom_texts``.

    Returns:
        PGVector: The store returned by ``afrom_texts``.
    """
    page_texts = [doc.page_content for doc in documents]
    doc_metadatas = [doc.metadata for doc in documents]

    # Normalise the connection so the downstream call always receives it
    # under the "connection" key.
    kwargs["connection"] = cls.get_connection_string(kwargs)

    return await cls.afrom_texts(
        texts=page_texts,
        pre_delete_collection=pre_delete_collection,
        embedding=embedding,
        distance_strategy=distance_strategy,
        metadatas=doc_metadatas,
        ids=ids,
        collection_name=collection_name,
        use_jsonb=use_jsonb,
        **kwargs,
    )
@classmethod
def connection_string_from_db_params(
    cls,
    driver: str,
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
) -> str:
    """Assemble a SQLAlchemy-style connection URL from database parameters.

    The parts are interpolated as given; no URL-escaping is applied here.

    Args:
        driver: Driver name; only ``"psycopg"`` is accepted.
        host: Database host.
        port: Database port.
        database: Database name.
        user: User name.
        password: Password.

    Returns:
        A string of the form
        ``postgresql+<driver>://<user>:<password>@<host>:<port>/<database>``.

    Raises:
        NotImplementedError: If ``driver`` is anything other than ``"psycopg"``.
    """
    if driver == "psycopg":
        return f"postgresql+{driver}://{user}:{password}@{host}:{port}/{database}"
    raise NotImplementedError("Only psycopg3 driver is supported")
def_select_relevance_score_fn(self)->Callable[[float],float]:""" The 'correct' relevance function may differ depending on a few things, including: - the distance / similarity metric used by the VectorStore - the scale of your embeddings (OpenAI's are unit normed. Many others are not!) - embedding dimensionality - etc. """ifself.override_relevance_score_fnisnotNone:returnself.override_relevance_score_fn# Default strategy is to rely on distance strategy provided# in vectorstore constructorifself._distance_strategy==DistanceStrategy.COSINE:returnself._cosine_relevance_score_fnelifself._distance_strategy==DistanceStrategy.EUCLIDEAN:returnself._euclidean_relevance_score_fnelifself._distance_strategy==DistanceStrategy.MAX_INNER_PRODUCT:returnself._max_inner_product_relevance_score_fnelse:raiseValueError("No supported normalization function"f" for distance_strategy of {self._distance_strategy}.""Consider providing relevance_score_fn to PGVector constructor.")
def max_marginal_relevance_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and scores selected by maximal marginal relevance (MMR).

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of ``(Document, score)`` tuples for the MMR-selected rows, in
        the order the rows came back from the query.

    Raises:
        AssertionError: If the store was set up with an async engine.
    """
    assert not self._async_engine, "This method must be called without async_mode"

    rows = self.__query_collection(embedding=embedding, k=fetch_k, filter=filter)
    stored_vectors = [row.EmbeddingStore.embedding for row in rows]

    mmr_selected = maximal_marginal_relevance(
        np.array(embedding, dtype=np.float32),
        stored_vectors,
        k=k,
        lambda_mult=lambda_mult,
    )

    scored_docs = self._results_to_docs_and_scores(rows)

    # Keep only the candidates whose position was picked by MMR.
    picked = []
    for position, scored_doc in enumerate(scored_docs):
        if position in mmr_selected:
            picked.append(scored_doc)
    return picked
async def amax_marginal_relevance_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Asynchronously return docs and scores selected by MMR.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of ``(Document, score)`` tuples for the MMR-selected rows, in
        the order the rows came back from the query.
    """
    await self.__apost_init__()  # Lazy async init

    async with self._make_async_session() as session:
        rows = await self.__aquery_collection(
            session=session,
            embedding=embedding,
            k=fetch_k,
            filter=filter,
        )
        stored_vectors = [row.EmbeddingStore.embedding for row in rows]

        mmr_selected = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            stored_vectors,
            k=k,
            lambda_mult=lambda_mult,
        )

        scored_docs = self._results_to_docs_and_scores(rows)

        # Keep only the candidates whose position was picked by MMR.
        picked = []
        for position, scored_doc in enumerate(scored_docs):
            if position in mmr_selected:
                picked.append(scored_doc)
        return picked
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents. The query text is embedded with
    ``self.embeddings`` and the search is delegated to
    ``max_marginal_relevance_search_by_vector``.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to the by-vector search.

    Returns:
        List of Documents selected by maximal marginal relevance.
    """
    query_vector = self.embeddings.embed_query(query)
    search_options = dict(
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return self.max_marginal_relevance_search_by_vector(
        query_vector, **search_options, **kwargs
    )
async def amax_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Asynchronously return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents. The query text is embedded with
    ``self.embeddings`` and the search is delegated to
    ``amax_marginal_relevance_search_by_vector``.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to the by-vector search.

    Returns:
        List of Documents selected by maximal marginal relevance.
    """
    await self.__apost_init__()  # Lazy async init
    query_vector = await self.embeddings.aembed_query(query)
    search_options = dict(
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return await self.amax_marginal_relevance_search_by_vector(
        query_vector, **search_options, **kwargs
    )
def max_marginal_relevance_search_with_score(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs selected using the maximal marginal relevance with score.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents. The query text is embedded with
    ``self.embeddings`` and the search is delegated to
    ``max_marginal_relevance_search_with_score_by_vector``.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to the by-vector search.

    Returns:
        List of ``(Document, score)`` tuples selected by maximal marginal
        relevance to the query.
    """
    query_vector = self.embeddings.embed_query(query)
    return self.max_marginal_relevance_search_with_score_by_vector(
        embedding=query_vector,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
        **kwargs,
    )
async def amax_marginal_relevance_search_with_score(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Asynchronously return docs selected by MMR, together with scores.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents. The query text is embedded with
    ``self.embeddings`` and the search is delegated to
    ``amax_marginal_relevance_search_with_score_by_vector``.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to the by-vector search.

    Returns:
        List of ``(Document, score)`` tuples selected by maximal marginal
        relevance to the query.
    """
    await self.__apost_init__()  # Lazy async init
    query_vector = await self.embeddings.aembed_query(query)
    return await self.amax_marginal_relevance_search_with_score_by_vector(
        embedding=query_vector,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
        **kwargs,
    )
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs selected by maximal marginal relevance to an embedding.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        embedding: Embedding vector to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to the with-score variant.

    Returns:
        List of Documents selected by maximal marginal relevance, with the
        scores dropped.
    """
    scored_hits = self.max_marginal_relevance_search_with_score_by_vector(
        embedding,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
        **kwargs,
    )
    # The with-score variant does the work; only the documents are returned.
    return _results_to_docs(scored_hits)
async def amax_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Asynchronously return docs selected by MMR to an embedding vector.

    Maximal marginal relevance optimizes for similarity to query AND
    diversity among selected documents.

    Args:
        embedding: Embedding vector to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to the with-score variant.

    Returns:
        List of Documents selected by maximal marginal relevance, with the
        scores dropped.
    """
    await self.__apost_init__()  # Lazy async init
    scored_hits = await self.amax_marginal_relevance_search_with_score_by_vector(
        embedding,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
        **kwargs,
    )
    # The with-score variant does the work; only the documents are returned.
    return _results_to_docs(scored_hits)
@contextlib.contextmanagerdef_make_sync_session(self)->Generator[Session,None,None]:"""Make an async session."""ifself.async_mode:raiseValueError("Attempting to use a sync method in when async mode is turned on. ""Please use the corresponding async method instead.")withself.session_maker()assession:yieldtyping_cast(Session,session)@contextlib.asynccontextmanagerasyncdef_make_async_session(self)->AsyncGenerator[AsyncSession,None]:"""Make an async session."""ifnotself.async_mode:raiseValueError("Attempting to use an async method in when sync mode is turned on. ""Please use the corresponding async method instead.")asyncwithself.session_maker()assession:yieldtyping_cast(AsyncSession,session)
def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
    """Get documents by ids.

    Only rows of this store's collection whose id is in ``ids`` are
    returned.

    Args:
        ids: Ids of the documents to fetch.

    Returns:
        One Document per matching row, carrying the row's id, text and
        metadata.
    """
    with self._make_sync_session() as session:
        collection = self.get_collection(session)
        # Restrict the lookup to this store's collection.
        in_collection = self.EmbeddingStore.collection_id == collection.uuid
        stmt = (
            select(self.EmbeddingStore)
            .where(self.EmbeddingStore.id.in_(ids))
            .filter(in_collection)
        )
        rows = session.execute(stmt).scalars().all()
        documents = [
            Document(
                id=row.id,
                page_content=row.document,
                metadata=row.cmetadata,
            )
            for row in rows
        ]
    return documents
async def aget_by_ids(self, ids: Sequence[str], /) -> List[Document]:
    """Asynchronously get documents by ids.

    Only rows of this store's collection whose id is in ``ids`` are
    returned.

    Args:
        ids: Ids of the documents to fetch.

    Returns:
        One Document per matching row, carrying the row's id (converted
        with ``str``), text and metadata.
    """
    async with self._make_async_session() as session:
        collection = await self.aget_collection(session)
        # Restrict the lookup to this store's collection.
        in_collection = self.EmbeddingStore.collection_id == collection.uuid
        stmt = (
            select(self.EmbeddingStore)
            .where(self.EmbeddingStore.id.in_(ids))
            .filter(in_collection)
        )
        rows: Sequence[Any] = (await session.execute(stmt)).scalars().all()
        documents = [
            Document(
                id=str(row.id),
                page_content=row.document,
                metadata=row.cmetadata,
            )
            for row in rows
        ]
    return documents