Source code for langchain_community.vectorstores.timescalevector
"""VectorStore wrapper around a Postgres-TimescaleVector database."""from__future__importannotationsimportenumimportloggingimportuuidfromdatetimeimporttimedeltafromtypingimport(TYPE_CHECKING,Any,Callable,Dict,Iterable,List,Optional,Tuple,Type,Union,)fromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.utilsimportget_from_dict_or_envfromlangchain_core.vectorstoresimportVectorStorefromlangchain_community.vectorstores.utilsimportDistanceStrategyifTYPE_CHECKING:fromtimescale_vectorimportPredicatesDEFAULT_DISTANCE_STRATEGY=DistanceStrategy.COSINEADA_TOKEN_COUNT=1536_LANGCHAIN_DEFAULT_COLLECTION_NAME="langchain_store"
class TimescaleVector(VectorStore):
    """Timescale Postgres vector store.

    To use, you should have the ``timescale_vector`` python package installed.

    Args:
        service_url: Service url on timescale cloud.
        embedding: Any embedding function implementing
            `langchain.embeddings.base.Embeddings` interface.
        collection_name: The name of the collection to use.
            (default: langchain_store) This will become the table name used
            for the collection.
        num_dimensions: Number of embedding dimensions handed to the
            underlying clients. (default: 1536)
        distance_strategy: The distance strategy to use. (default: COSINE)
        pre_delete_collection: If True, will delete the collection if it
            exists. (default: False). Useful for testing.
        logger: Logger to use. Defaults to ``logging.getLogger(__name__)``.
        relevance_score_fn: Optional function that overrides the relevance
            score function otherwise selected from ``distance_strategy``.
        time_partition_interval: Optional ``timedelta`` forwarded to the
            underlying clients as ``time_partition_interval``.
        **kwargs: Extra keyword arguments forwarded to both the sync and the
            async ``timescale_vector`` clients.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            SERVICE_URL = "postgres://tsdbadmin:<password>@<id>.tsdb.cloud.timescale.com:<port>/tsdb?sslmode=require"
            COLLECTION_NAME = "state_of_the_union_test"
            embeddings = OpenAIEmbeddings()
            vectorstore = TimescaleVector.from_documents(
                embedding=embeddings,
                documents=docs,
                collection_name=COLLECTION_NAME,
                service_url=SERVICE_URL,
            )
    """
def __init__(
    self,
    service_url: str,
    embedding: Embeddings,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    num_dimensions: int = ADA_TOKEN_COUNT,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    logger: Optional[logging.Logger] = None,
    relevance_score_fn: Optional[Callable[[float], float]] = None,
    time_partition_interval: Optional[timedelta] = None,
    **kwargs: Any,
) -> None:
    """Store the configuration and build the sync and async clients.

    Args:
        service_url: Connection url handed to the clients.
        embedding: Embedding implementation used for texts and queries.
        collection_name: Collection name handed to the clients.
        num_dimensions: Embedding dimensionality handed to the clients.
        distance_strategy: Distance strategy; its lower-cased ``value`` is
            handed to the clients.
        pre_delete_collection: When true, ``__post_init__`` empties the
            collection after creating the tables.
        logger: Logger to use; falls back to ``logging.getLogger(__name__)``.
        relevance_score_fn: Optional override for the relevance score
            function.
        time_partition_interval: Forwarded to the clients under the same
            keyword.
        **kwargs: Forwarded unchanged to both clients.

    Raises:
        ImportError: If the ``timescale_vector`` package is not installed.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    self.service_url = service_url
    self.embedding = embedding
    self.collection_name = collection_name
    self.num_dimensions = num_dimensions
    self._distance_strategy = distance_strategy
    self.pre_delete_collection = pre_delete_collection
    self.logger = logger or logging.getLogger(__name__)
    self.override_relevance_score_fn = relevance_score_fn
    self._time_partition_interval = time_partition_interval

    # Both clients are configured identically, so build the arguments once.
    positional = (
        self.service_url,
        self.collection_name,
        self.num_dimensions,
        self._distance_strategy.value.lower(),
    )
    self.sync_client = client.Sync(
        *positional,
        time_partition_interval=self._time_partition_interval,
        **kwargs,
    )
    self.async_client = client.Async(
        *positional,
        time_partition_interval=self._time_partition_interval,
        **kwargs,
    )
    self.__post_init__()
def __post_init__(self) -> None:
    """Create the tables and, if requested, empty the collection.

    ``delete_all`` is only called when ``pre_delete_collection`` is truthy,
    and always after ``create_tables``.
    """
    sync_client = self.sync_client
    sync_client.create_tables()
    if not self.pre_delete_collection:
        return
    sync_client.delete_all()


@property
def embeddings(self) -> Embeddings:
    """Return the embedding object this store was constructed with."""
    return self.embedding
def add_embeddings(
    self,
    texts: Iterable[str],
    embeddings: List[List[float]],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Add embeddings to the vectorstore.

    Args:
        texts: Iterable of strings to add to the vectorstore. One-shot
            iterators (e.g. generators) are supported.
        embeddings: List of list of embedding vectors.
        metadatas: List of metadatas associated with the texts. When empty
            or omitted, every text gets an empty dict.
        ids: Optional ids for the records. When omitted, a random UUID4
            string is generated per text.
        kwargs: vectorstore specific parameters

    Returns:
        The ids of the upserted records.
    """
    # ``texts`` is walked several times below (ids, metadatas, zip).
    # Materialize it once so a generator is not exhausted on the first pass.
    texts = list(texts)

    if ids is None:
        ids = [str(uuid.uuid4()) for _ in texts]

    if not metadatas:
        metadatas = [{} for _ in texts]

    records = list(zip(ids, metadatas, texts, embeddings))
    self.sync_client.upsert(records)

    return ids
async def aadd_embeddings(
    self,
    texts: Iterable[str],
    embeddings: List[List[float]],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Asynchronously add embeddings to the vectorstore.

    Args:
        texts: Iterable of strings to add to the vectorstore. One-shot
            iterators (e.g. generators) are supported.
        embeddings: List of list of embedding vectors.
        metadatas: List of metadatas associated with the texts. When empty
            or omitted, every text gets an empty dict.
        ids: Optional ids for the records. When omitted, a random UUID4
            string is generated per text.
        kwargs: vectorstore specific parameters

    Returns:
        The ids of the upserted records.
    """
    # ``texts`` is walked several times below (ids, metadatas, zip).
    # Materialize it once so a generator is not exhausted on the first pass.
    texts = list(texts)

    if ids is None:
        ids = [str(uuid.uuid4()) for _ in texts]

    if not metadatas:
        metadatas = [{} for _ in texts]

    records = list(zip(ids, metadatas, texts, embeddings))
    await self.async_client.upsert(records)

    return ids
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Run more texts through the embeddings and add to the vectorstore.

    Args:
        texts: Iterable of strings to add to the vectorstore. One-shot
            iterators (e.g. generators) are supported.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of ids to store the texts under.
        kwargs: vectorstore specific parameters

    Returns:
        List of ids from adding the texts into the vectorstore.
    """
    # Materialize once: the same texts are needed for embedding AND storage.
    # Passing an already-consumed generator on would store nothing.
    texts = list(texts)
    embeddings = self.embedding.embed_documents(texts)
    return self.add_embeddings(
        texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
    )
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Run more texts through the embeddings and add to the vectorstore.

    Embedding uses the synchronous ``embed_documents`` call; only the
    storage step is awaited.

    Args:
        texts: Iterable of strings to add to the vectorstore. One-shot
            iterators (e.g. generators) are supported.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of ids to store the texts under.
        kwargs: vectorstore specific parameters

    Returns:
        List of ids from adding the texts into the vectorstore.
    """
    # Materialize once: the same texts are needed for embedding AND storage.
    # Passing an already-consumed generator on would store nothing.
    texts = list(texts)
    embeddings = self.embedding.embed_documents(texts)
    return await self.aadd_embeddings(
        texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
    )
def _embed_query(self, query: str) -> Optional[List[float]]:
    """Embed ``query``, or return ``None`` when there is nothing to embed.

    A query that is ``None``, empty, or made up only of whitespace is not
    sent to the embedding model.
    """
    nothing_to_embed = query is None or query == "" or query.isspace()
    if nothing_to_embed:
        return None
    return self.embedding.embed_query(query)
def similarity_search(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Document]:
    """Embed ``query`` and return the most similar documents.

    Args:
        query (str): Query text to search for.
        k (int): Number of results to return. Defaults to 4.
        filter (Optional[Union[dict, list]]): Filter by metadata.
            Defaults to None.
        predicates (Optional[Predicates]): Predicates passed through to the
            vector search. Defaults to None.
        **kwargs: Passed through to ``similarity_search_by_vector``.

    Returns:
        List of Documents most similar to the query.
    """
    query_vector = self._embed_query(query)
    search_args = dict(
        embedding=query_vector, k=k, filter=filter, predicates=predicates
    )
    return self.similarity_search_by_vector(**search_args, **kwargs)
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Document]:
    """Embed ``query`` and asynchronously return the most similar documents.

    Args:
        query (str): Query text to search for.
        k (int): Number of results to return. Defaults to 4.
        filter (Optional[Union[dict, list]]): Filter by metadata.
            Defaults to None.
        predicates (Optional[Predicates]): Predicates passed through to the
            vector search. Defaults to None.
        **kwargs: Passed through to ``asimilarity_search_by_vector``.

    Returns:
        List of Documents most similar to the query.
    """
    query_vector = self._embed_query(query)
    search_args = dict(
        embedding=query_vector, k=k, filter=filter, predicates=predicates
    )
    return await self.asimilarity_search_by_vector(**search_args, **kwargs)
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Embed ``query`` and return the most similar documents with scores.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[dict, list]]): Filter by metadata.
            Defaults to None.
        predicates (Optional[Predicates]): Predicates passed through to the
            vector search. Defaults to None.
        **kwargs: Passed through to
            ``similarity_search_with_score_by_vector``.

    Returns:
        List of Documents most similar to the query and score for each
    """
    query_vector = self._embed_query(query)
    return self.similarity_search_with_score_by_vector(
        embedding=query_vector,
        k=k,
        filter=filter,
        predicates=predicates,
        **kwargs,
    )
async def asimilarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Embed ``query`` and asynchronously return documents with scores.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[dict, list]]): Filter by metadata.
            Defaults to None.
        predicates (Optional[Predicates]): Predicates passed through to the
            vector search. Defaults to None.
        **kwargs: Passed through to
            ``asimilarity_search_with_score_by_vector``.

    Returns:
        List of Documents most similar to the query and score for each
    """
    query_vector = self._embed_query(query)
    scored_docs = await self.asimilarity_search_with_score_by_vector(
        embedding=query_vector,
        k=k,
        filter=filter,
        predicates=predicates,
        **kwargs,
    )
    return scored_docs
def date_to_range_filter(self, **kwargs: Any) -> Any:
    """Build a ``client.UUIDTimeRange`` from time-related keyword arguments.

    Only the keys ``start_date``, ``end_date``, ``time_delta``,
    ``start_inclusive`` and ``end_inclusive`` are picked out of ``kwargs``;
    everything else is ignored.

    Returns:
        ``None`` when none of those keys is present, otherwise a
        ``client.UUIDTimeRange`` built from the selected arguments.

    Raises:
        ImportError: If a range is needed but ``timescale_vector`` is not
            installed.
    """
    range_keys = (
        "start_date",
        "end_date",
        "time_delta",
        "start_inclusive",
        "end_inclusive",
    )
    constructor_args = {name: kwargs[name] for name in range_keys if name in kwargs}
    if not constructor_args:
        return None

    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )
    return client.UUIDTimeRange(**constructor_args)
def similarity_search_with_score_by_vector(
    self,
    embedding: Optional[List[float]],
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Search by embedding vector and return documents with their distance.

    Args:
        embedding: Embedding to search with; may be ``None``.
        k: Maximum number of results, passed to the client as ``limit``.
        filter: Metadata filter passed to the client.
        predicates: Predicates passed to the client.
        **kwargs: Time-range arguments understood by
            ``date_to_range_filter``; other keys are ignored.

    Returns:
        One ``(Document, distance)`` tuple per search result, in the order
        the client returned them.

    Raises:
        ImportError: If the ``timescale_vector`` package is not installed.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    time_filter = self.date_to_range_filter(**kwargs)
    rows = self.sync_client.search(
        embedding,
        limit=k,
        filter=filter,
        predicates=predicates,
        uuid_time_filter=time_filter,
    )

    scored_docs = []
    for row in rows:
        document = Document(
            page_content=row[client.SEARCH_RESULT_CONTENTS_IDX],
            metadata=row[client.SEARCH_RESULT_METADATA_IDX],
        )
        scored_docs.append((document, row[client.SEARCH_RESULT_DISTANCE_IDX]))
    return scored_docs
async def asimilarity_search_with_score_by_vector(
    self,
    embedding: Optional[List[float]],
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Asynchronously search by vector and return documents with distance.

    Args:
        embedding: Embedding to search with; may be ``None``.
        k: Maximum number of results, passed to the client as ``limit``.
        filter: Metadata filter passed to the client.
        predicates: Predicates passed to the client.
        **kwargs: Time-range arguments understood by
            ``date_to_range_filter``; other keys are ignored.

    Returns:
        One ``(Document, distance)`` tuple per search result, in the order
        the client returned them.

    Raises:
        ImportError: If the ``timescale_vector`` package is not installed.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    time_filter = self.date_to_range_filter(**kwargs)
    rows = await self.async_client.search(
        embedding,
        limit=k,
        filter=filter,
        predicates=predicates,
        uuid_time_filter=time_filter,
    )

    scored_docs = []
    for row in rows:
        document = Document(
            page_content=row[client.SEARCH_RESULT_CONTENTS_IDX],
            metadata=row[client.SEARCH_RESULT_METADATA_IDX],
        )
        scored_docs.append((document, row[client.SEARCH_RESULT_DISTANCE_IDX]))
    return scored_docs
def similarity_search_by_vector(
    self,
    embedding: Optional[List[float]],
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs most similar to embedding vector, without scores.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[dict, list]]): Filter by metadata.
            Defaults to None.
        predicates (Optional[Predicates]): Predicates passed through to the
            scored search. Defaults to None.
        **kwargs: Passed through to
            ``similarity_search_with_score_by_vector``.

    Returns:
        List of Documents most similar to the query vector.
    """
    scored = self.similarity_search_with_score_by_vector(
        embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
    )
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
async def asimilarity_search_by_vector(
    self,
    embedding: Optional[List[float]],
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Document]:
    """Asynchronously return docs most similar to embedding vector.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[dict, list]]): Filter by metadata.
            Defaults to None.
        predicates (Optional[Predicates]): Predicates passed through to the
            scored search. Defaults to None.
        **kwargs: Passed through to
            ``asimilarity_search_with_score_by_vector``.

    Returns:
        List of Documents most similar to the query vector.
    """
    scored = await self.asimilarity_search_with_score_by_vector(
        embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
    )
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
@classmethod
def from_texts(
    cls: Type[TimescaleVector],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Return VectorStore initialized from texts and embeddings.

    The texts are embedded with ``embedding.embed_documents`` and the
    result is handed to the private ``__from`` constructor.

    Postgres connection string is required.
    Either pass it as a parameter
    or set the TIMESCALE_SERVICE_URL environment variable.
    """
    vectors = embedding.embed_documents(list(texts))
    build_options = dict(
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
    )
    return cls.__from(texts, vectors, embedding, **build_options, **kwargs)
@classmethod
async def afrom_texts(
    cls: Type[TimescaleVector],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Asynchronously return VectorStore initialized from texts.

    The texts are embedded with the synchronous
    ``embedding.embed_documents`` and the result is handed to the private
    ``__afrom`` constructor, which is awaited.

    Postgres connection string is required.
    Either pass it as a parameter
    or set the TIMESCALE_SERVICE_URL environment variable.
    """
    vectors = embedding.embed_documents(list(texts))
    build_options = dict(
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
    )
    return await cls.__afrom(texts, vectors, embedding, **build_options, **kwargs)
@classmethod
def from_embeddings(
    cls,
    text_embeddings: List[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Construct TimescaleVector wrapper from raw documents and
    pre-generated embeddings.

    Each item of ``text_embeddings`` supplies the text at index 0 and its
    embedding vector at index 1.

    Postgres connection string is required.
    Either pass it as a parameter
    or set the TIMESCALE_SERVICE_URL environment variable.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = list(zip(texts, text_embeddings))
            tvs = TimescaleVector.from_embeddings(text_embedding_pairs, embeddings)
    """
    texts = [pair[0] for pair in text_embeddings]
    vectors = [pair[1] for pair in text_embeddings]
    build_options = dict(
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
    )
    return cls.__from(texts, vectors, embedding, **build_options, **kwargs)
@classmethod
async def afrom_embeddings(
    cls,
    text_embeddings: List[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Asynchronously construct TimescaleVector wrapper from raw documents
    and pre-generated embeddings.

    Each item of ``text_embeddings`` supplies the text at index 0 and its
    embedding vector at index 1.

    Postgres connection string is required.
    Either pass it as a parameter
    or set the TIMESCALE_SERVICE_URL environment variable.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = list(zip(texts, text_embeddings))
            tvs = await TimescaleVector.afrom_embeddings(
                text_embedding_pairs, embeddings
            )
    """
    texts = [pair[0] for pair in text_embeddings]
    vectors = [pair[1] for pair in text_embeddings]
    build_options = dict(
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
    )
    return await cls.__afrom(texts, vectors, embedding, **build_options, **kwargs)
@classmethod
def from_existing_index(
    cls: Type[TimescaleVector],
    embedding: Embeddings,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Get instance of an existing TimescaleVector store.

    This method will return the instance of the store without inserting
    any new embeddings.

    Args:
        embedding: Embedding implementation for the store.
        collection_name: Name of the existing collection.
        distance_strategy: Distance strategy of the store.
        pre_delete_collection: Passed to the constructor unchanged.
        **kwargs: May contain ``service_url`` (otherwise the
            TIMESCALE_SERVICE_URL environment variable is used). All other
            entries, e.g. ``num_dimensions`` or ``time_partition_interval``,
            are forwarded to the constructor.

    Returns:
        The constructed store.
    """
    service_url = cls.get_service_url(kwargs)

    # Forward the remaining options instead of silently dropping them.
    # ``service_url`` is excluded because it is passed explicitly below.
    constructor_kwargs = {
        key: value for key, value in kwargs.items() if key != "service_url"
    }

    return cls(
        service_url=service_url,
        collection_name=collection_name,
        embedding=embedding,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **constructor_kwargs,
    )
@classmethod
def get_service_url(cls, kwargs: Dict[str, Any]) -> str:
    """Resolve the Postgres service url.

    The value is looked up with ``get_from_dict_or_env`` using the
    ``service_url`` key of ``kwargs`` and the TIMESCALE_SERVICE_URL
    environment variable.

    Args:
        kwargs: Keyword-argument dict that may contain ``service_url``.

    Returns:
        The resolved service url.

    Raises:
        ValueError: If the resolved value is empty.
    """
    service_url: str = get_from_dict_or_env(
        data=kwargs,
        key="service_url",
        env_key="TIMESCALE_SERVICE_URL",
    )

    if not service_url:
        # The adjacent literals are concatenated verbatim, so each piece
        # carries its own separator.
        raise ValueError(
            "Postgres connection string is required. "
            "Either pass it as a parameter "
            "or set the TIMESCALE_SERVICE_URL environment variable."
        )

    return service_url
@classmethod
def service_url_from_db_params(
    cls,
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
) -> str:
    """Return connection string from database parameters.

    ``user`` and ``password`` are percent-encoded so that reserved
    characters such as ``@``, ``:`` or ``/`` cannot corrupt the url. Pass
    the raw (not already percent-encoded) credentials.

    Args:
        host: Database host.
        port: Database port.
        database: Database name.
        user: Raw user name.
        password: Raw password.

    Returns:
        A ``postgresql://user:password@host:port/database`` url.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    # safe="" also encodes "/", which is a delimiter in the userinfo part.
    encoded_user = quote(user, safe="")
    encoded_password = quote(password, safe="")
    return f"postgresql://{encoded_user}:{encoded_password}@{host}:{port}/{database}"
def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """Pick the function that turns a raw distance into a relevance score.

    The 'correct' relevance function may differ depending on a few things,
    including:
    - the distance / similarity metric used by the VectorStore
    - the scale of your embeddings (OpenAI's are unit normed. Many others
      are not!)
    - embedding dimensionality
    - etc.

    Returns:
        The override given to the constructor when there is one, otherwise
        the function matching the configured distance strategy.

    Raises:
        ValueError: If there is no override and the distance strategy is
            not COSINE, EUCLIDEAN_DISTANCE or MAX_INNER_PRODUCT.
    """
    if self.override_relevance_score_fn is not None:
        return self.override_relevance_score_fn

    # Default strategy is to rely on distance strategy provided
    # in vectorstore constructor
    if self._distance_strategy == DistanceStrategy.COSINE:
        return self._cosine_relevance_score_fn
    if self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
        return self._euclidean_relevance_score_fn
    if self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
        return self._max_inner_product_relevance_score_fn

    raise ValueError(
        "No supported normalization function"
        f" for distance_strategy of {self._distance_strategy}."
        " Consider providing relevance_score_fn to TimescaleVector constructor."
    )
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
    """Delete records by id.

    Args:
        ids: List of ids to delete.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        Optional[bool]: True once the client's delete call has returned.

    Raises:
        ValueError: If ``ids`` is None.
    """
    if ids is not None:
        self.sync_client.delete_by_ids(ids)
        return True
    raise ValueError("No ids provided to delete.")
# TODO: should this be part of delete()?
def delete_by_metadata(
    self, filter: Union[Dict[str, str], List[Dict[str, str]]], **kwargs: Any
) -> Optional[bool]:
    """Delete records selected by a metadata filter.

    Args:
        filter: A metadata dict, or a list of such dicts, handed unchanged
            to the client's ``delete_by_metadata``.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        Optional[bool]: True once the client's delete call has returned.
    """
    sync_client = self.sync_client
    sync_client.delete_by_metadata(filter)
    return True
class IndexType(str, enum.Enum):
    """Enumerator for the supported Index types"""

    # The string values are what ``create_index`` compares against when
    # choosing which client index class to build.
    TIMESCALE_VECTOR = "tsv"
    PGVECTOR_IVFFLAT = "ivfflat"
    PGVECTOR_HNSW = "hnsw"


# Index type used by ``create_index`` when the caller does not pass one.
DEFAULT_INDEX_TYPE = IndexType.TIMESCALE_VECTOR
def create_index(
    self, index_type: Union[IndexType, str] = DEFAULT_INDEX_TYPE, **kwargs: Any
) -> None:
    """Create an embedding index of the requested type.

    Args:
        index_type: An ``IndexType`` member or its string value
            (``"tsv"``, ``"ivfflat"`` or ``"hnsw"``).
        **kwargs: Passed to the constructor of the selected client index
            class.

    Raises:
        ImportError: If the ``timescale_vector`` package is not installed.
        ValueError: If ``index_type`` is not one of the supported types
            (previously this was silently ignored and no index was created).
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    index_type = (
        index_type.value if isinstance(index_type, self.IndexType) else index_type
    )

    # Map each supported value to the NAME of its client index class. The
    # class is resolved lazily so only the requested one must exist.
    index_class_names = {
        self.IndexType.PGVECTOR_IVFFLAT.value: "IvfflatIndex",
        self.IndexType.PGVECTOR_HNSW.value: "HNSWIndex",
        self.IndexType.TIMESCALE_VECTOR.value: "TimescaleVectorIndex",
    }
    try:
        class_name = index_class_names[index_type]
    except (KeyError, TypeError):
        # TypeError covers unhashable values handed in as index_type.
        supported = ", ".join(repr(name) for name in index_class_names)
        raise ValueError(
            f"Unsupported index_type {index_type!r}. "
            f"Supported values are: {supported}."
        ) from None

    index_cls = getattr(client, class_name)
    self.sync_client.create_embedding_index(index_cls(**kwargs))