from __future__ import annotations
import logging
import uuid
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union, cast
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils.iter import batch_iterate
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import (
maximal_marginal_relevance,
)
if TYPE_CHECKING:
from upstash_vector import AsyncIndex, Index
from upstash_vector.types import InfoResult
logger = logging.getLogger(__name__)
[docs]class UpstashVectorStore(VectorStore):
"""Upstash Vector vector store
To use, the ``upstash-vector`` python package must be installed.
Also an Upstash Vector index is required. First create a new Upstash Vector index
and copy the `index_url` and `index_token` variables. Then either pass
them through the constructor or set the environment
variables `UPSTASH_VECTOR_REST_URL` and `UPSTASH_VECTOR_REST_TOKEN`.
Example:
.. code-block:: python
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import UpstashVectorStore
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = UpstashVectorStore(
embedding=embeddings,
index_url="...",
index_token="..."
)
# or
import os
os.environ["UPSTASH_VECTOR_REST_URL"] = "..."
os.environ["UPSTASH_VECTOR_REST_TOKEN"] = "..."
vectorstore = UpstashVectorStore(
embedding=embeddings
)
"""
[docs] def __init__(
self,
text_key: str = "text",
index: Optional[Index] = None,
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
embedding: Optional[Union[Embeddings, bool]] = None,
*,
namespace: str = "",
):
"""
Constructor for UpstashVectorStore.
If index or index_url and index_token are not provided, the constructor will
attempt to create an index using the environment variables
`UPSTASH_VECTOR_REST_URL`and `UPSTASH_VECTOR_REST_TOKEN`.
Args:
text_key: Key to store the text in metadata.
index: UpstashVector Index object.
async_index: UpstashVector AsyncIndex object, provide only if async
functions are needed
index_url: URL of the UpstashVector index.
index_token: Token of the UpstashVector index.
embedding: Embeddings object or a boolean. When false, no embedding
is applied. If true, Upstash embeddings are used. When Upstash
embeddings are used, text is sent directly to Upstash and
embedding is applied there instead of embedding in Langchain.
namespace: Namespace to use from the index.
Example:
.. code-block:: python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = UpstashVectorStore(
embedding=embeddings,
index_url="...",
index_token="...",
namespace="..."
)
# With an existing index
from upstash_vector import Index
index = Index(url="...", token="...")
vectorstore = UpstashVectorStore(
embedding=embeddings,
index=index,
namespace="..."
)
"""
try:
from upstash_vector import AsyncIndex, Index
except ImportError:
raise ImportError(
"Could not import upstash_vector python package. "
"Please install it with `pip install upstash_vector`."
)
if index:
if not isinstance(index, Index):
raise ValueError(
"Passed index object should be an "
"instance of upstash_vector.Index, "
f"got {type(index)}"
)
self._index = index
logger.info("Using the index passed as parameter")
if async_index:
if not isinstance(async_index, AsyncIndex):
raise ValueError(
"Passed index object should be an "
"instance of upstash_vector.AsyncIndex, "
f"got {type(async_index)}"
)
self._async_index = async_index
logger.info("Using the async index passed as parameter")
if index_url and index_token:
self._index = Index(url=index_url, token=index_token)
self._async_index = AsyncIndex(url=index_url, token=index_token)
logger.info("Created index from the index_url and index_token parameters")
elif not index and not async_index:
self._index = Index.from_env()
self._async_index = AsyncIndex.from_env()
logger.info("Created index using environment variables")
self._embeddings = embedding
self._text_key = text_key
self._namespace = namespace
@property
def embeddings(self) -> Optional[Union[Embeddings, bool]]: # type: ignore
"""Access the query embedding object if available."""
return self._embeddings
def _embed_documents(
self, texts: Iterable[str]
) -> Union[List[List[float]], List[str]]:
"""Embed strings using the embeddings object"""
if not self._embeddings:
raise ValueError(
"No embeddings object provided. "
"Pass an embeddings object to the constructor."
)
if isinstance(self._embeddings, Embeddings):
return self._embeddings.embed_documents(list(texts))
# using self._embeddings is True, Upstash embeddings will be used.
# returning list of text as List[str]
return list(texts)
def _embed_query(self, text: str) -> Union[List[float], str]:
"""Embed query text using the embeddings object."""
if not self._embeddings:
raise ValueError(
"No embeddings object provided. "
"Pass an embeddings object to the constructor."
)
if isinstance(self._embeddings, Embeddings):
return self._embeddings.embed_query(text)
# using self._embeddings is True, Upstash embeddings will be used.
# returning query as it is
return text
[docs] def add_documents(
self,
documents: List[Document],
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
Get the embeddings for the documents and add them to the vectorstore.
Documents are sent to the embeddings object
in batches of size `embedding_chunk_size`.
The embeddings are then upserted into the vectorstore
in batches of size `batch_size`.
Args:
documents: Iterable of Documents to add to the vectorstore.
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return self.add_texts(
texts,
metadatas=metadatas,
batch_size=batch_size,
ids=ids,
embedding_chunk_size=embedding_chunk_size,
namespace=namespace,
**kwargs,
)
[docs] async def aadd_documents(
self,
documents: Iterable[Document],
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
Get the embeddings for the documents and add them to the vectorstore.
Documents are sent to the embeddings object
in batches of size `embedding_chunk_size`.
The embeddings are then upserted into the vectorstore
in batches of size `batch_size`.
Args:
documents: Iterable of Documents to add to the vectorstore.
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return await self.aadd_texts(
texts,
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
namespace=namespace,
**kwargs,
)
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
Get the embeddings for the texts and add them to the vectorstore.
Texts are sent to the embeddings object
in batches of size `embedding_chunk_size`.
The embeddings are then upserted into the vectorstore
in batches of size `batch_size`.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
"""
if namespace is None:
namespace = self._namespace
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
# Copy metadatas to avoid modifying the original documents
if metadatas:
metadatas = [m.copy() for m in metadatas]
else:
metadatas = [{} for _ in texts]
# Add text to metadata
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
):
self._index.upsert(
vectors=batch, namespace=cast(str, namespace), **kwargs
)
return ids
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
Get the embeddings for the texts and add them to the vectorstore.
Texts are sent to the embeddings object
in batches of size `embedding_chunk_size`.
The embeddings are then upserted into the vectorstore
in batches of size `batch_size`.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
"""
if namespace is None:
namespace = self._namespace
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
# Copy metadatas to avoid modifying the original documents
if metadatas:
metadatas = [m.copy() for m in metadatas]
else:
metadatas = [{} for _ in texts]
# Add text to metadata
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
):
await self._async_index.upsert(
vectors=batch, namespace=cast(str, namespace), **kwargs
)
return ids
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Retrieve texts most similar to query and
convert the result to `Document` objects.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query and score for each
"""
return self.similarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, namespace=namespace, **kwargs
)
[docs] async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Retrieve texts most similar to query and
convert the result to `Document` objects.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query and score for each
"""
return await self.asimilarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, namespace=namespace, **kwargs
)
def _process_results(self, results: List) -> List[Tuple[Document, float]]:
docs = []
for res in results:
metadata = res.metadata
if metadata and self._text_key in metadata:
text = metadata.pop(self._text_key)
doc = Document(page_content=text, metadata=metadata)
docs.append((doc, res.score))
else:
logger.warning(
f"Found document with no `{self._text_key}` key. Skipping."
)
return docs
[docs] def similarity_search_by_vector_with_score(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return texts whose embedding is closest to the given embedding"""
filter = filter or ""
if namespace is None:
namespace = self._namespace
if isinstance(embedding, str):
results = self._index.query(
data=embedding,
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
else:
results = self._index.query(
vector=embedding,
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
return self._process_results(results)
[docs] async def asimilarity_search_by_vector_with_score(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return texts whose embedding is closest to the given embedding"""
filter = filter or ""
if namespace is None:
namespace = self._namespace
if isinstance(embedding, str):
results = await self._async_index.query(
data=embedding,
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
else:
results = await self._async_index.query(
vector=embedding,
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
return self._process_results(results)
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query and score for each
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = await self.asimilarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] def similarity_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents closest to the given embedding.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = self.similarity_search_by_vector_with_score(
embedding, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents closest to the given embedding.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = await self.asimilarity_search_by_vector_with_score(
embedding, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
Since Upstash always returns relevance scores, default implementation is used.
"""
return self.similarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
Since Upstash always returns relevance scores, default implementation is used.
"""
return await self.asimilarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
"""
if namespace is None:
namespace = self._namespace
assert isinstance(self.embeddings, Embeddings)
if isinstance(embedding, str):
results = self._index.query(
data=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
else:
results = self._index.query(
vector=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[item.vector for item in results],
k=k,
lambda_mult=lambda_mult,
)
selected = [results[i].metadata for i in mmr_selected]
return [
Document(page_content=metadata.pop((self._text_key)), metadata=metadata) # type: ignore
for metadata in selected
]
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
"""
if namespace is None:
namespace = self._namespace
assert isinstance(self.embeddings, Embeddings)
if isinstance(embedding, str):
results = await self._async_index.query(
data=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
else:
results = await self._async_index.query(
vector=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[item.vector for item in results],
k=k,
lambda_mult=lambda_mult,
)
selected = [results[i].metadata for i in mmr_selected]
return [
Document(page_content=metadata.pop((self._text_key)), metadata=metadata) # type: ignore
for metadata in selected
]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
"""
embedding = self._embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
namespace=namespace,
**kwargs,
)
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
"""
embedding = self._embed_query(query)
return await self.amax_marginal_relevance_search_by_vector(
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
namespace=namespace,
**kwargs,
)
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
embedding_chunk_size: int = 1000,
batch_size: int = 32,
text_key: str = "text",
index: Optional[Index] = None,
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
*,
namespace: str = "",
**kwargs: Any,
) -> UpstashVectorStore:
"""Create a new UpstashVectorStore from a list of texts.
Example:
.. code-block:: python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vector_store = UpstashVectorStore.from_texts(
texts,
embeddings,
)
"""
vector_store = cls(
embedding=embedding,
text_key=text_key,
index=index,
async_index=async_index,
index_url=index_url,
index_token=index_token,
namespace=namespace,
**kwargs,
)
vector_store.add_texts(
texts,
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
namespace=namespace,
)
return vector_store
[docs] @classmethod
async def afrom_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
embedding_chunk_size: int = 1000,
batch_size: int = 32,
text_key: str = "text",
index: Optional[Index] = None,
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
*,
namespace: str = "",
**kwargs: Any,
) -> UpstashVectorStore:
"""Create a new UpstashVectorStore from a list of texts.
Example:
.. code-block:: python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vector_store = UpstashVectorStore.from_texts(
texts,
embeddings,
)
"""
vector_store = cls(
embedding=embedding,
text_key=text_key,
index=index,
async_index=async_index,
namespace=namespace,
index_url=index_url,
index_token=index_token,
**kwargs,
)
await vector_store.aadd_texts(
texts,
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
namespace=namespace,
embedding_chunk_size=embedding_chunk_size,
)
return vector_store
[docs] def delete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
batch_size: Optional[int] = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> None:
"""Delete by vector IDs
Args:
ids: List of ids to delete.
delete_all: Delete all vectors in the index.
batch_size: Batch size to use when deleting the embeddings.
namespace: Namespace to use from the index.
Upstash supports at max 1000 deletions per request.
"""
if namespace is None:
namespace = self._namespace
if delete_all:
self._index.reset(namespace=namespace)
elif ids is not None:
for batch in batch_iterate(batch_size, ids):
self._index.delete(ids=batch, namespace=namespace)
else:
raise ValueError("Either ids or delete_all should be provided")
return None
[docs] async def adelete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
batch_size: Optional[int] = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> None:
"""Delete by vector IDs
Args:
ids: List of ids to delete.
delete_all: Delete all vectors in the index.
batch_size: Batch size to use when deleting the embeddings.
namespace: Namespace to use from the index.
Upstash supports at max 1000 deletions per request.
"""
if namespace is None:
namespace = self._namespace
if delete_all:
await self._async_index.reset(namespace=namespace)
elif ids is not None:
for batch in batch_iterate(batch_size, ids):
await self._async_index.delete(ids=batch, namespace=namespace)
else:
raise ValueError("Either ids or delete_all should be provided")
return None
[docs] def info(self) -> InfoResult:
"""Get statistics about the index.
Returns:
- total number of vectors
- total number of vectors waiting to be indexed
- total size of the index on disk in bytes
- dimension count for the index
- similarity function selected for the index
"""
return self._index.info()
[docs] async def ainfo(self) -> InfoResult:
"""Get statistics about the index.
Returns:
- total number of vectors
- total number of vectors waiting to be indexed
- total size of the index on disk in bytes
- dimension count for the index
- similarity function selected for the index
"""
return await self._async_index.info()