from __future__ import annotations
import json
import uuid
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterator,
List,
Optional,
Sequence,
Tuple,
)
from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.load import dumpd, load
from langchain_core.vectorstores import VectorStore
from langchain_core.vectorstores.utils import _cosine_similarity as cosine_similarity
from langchain_core.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
from langchain_core.indexing import UpsertResponse
class InMemoryVectorStore(VectorStore):
    """In-memory vector store implementation.

    Uses a dictionary, and computes cosine similarity for search using numpy.

    Setup:
        Install ``langchain-core``.

        .. code-block:: bash

            pip install -U langchain-core

    Key init args — indexing params:
        embedding_function: Embeddings
            Embedding function to use.

    Instantiate:
        .. code-block:: python

            from langchain_core.vectorstores import InMemoryVectorStore
            from langchain_openai import OpenAIEmbeddings

            vector_store = InMemoryVectorStore(OpenAIEmbeddings())

    Add Documents:
        .. code-block:: python

            from langchain_core.documents import Document

            document_1 = Document(id="1", page_content="foo", metadata={"baz": "bar"})
            document_2 = Document(id="2", page_content="thud", metadata={"bar": "baz"})
            document_3 = Document(id="3", page_content="i will be deleted :(")

            documents = [document_1, document_2, document_3]
            vector_store.add_documents(documents=documents)

    Delete Documents:
        .. code-block:: python

            vector_store.delete(ids=["3"])

    Search:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud", k=1)
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * thud [{'bar': 'baz'}]

    Search with filter:
        .. code-block:: python

            def _filter_function(doc: Document) -> bool:
                return doc.metadata.get("bar") == "baz"

            results = vector_store.similarity_search(
                query="thud", k=1, filter=_filter_function
            )
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * thud [{'bar': 'baz'}]

    Search with score:
        .. code-block:: python

            results = vector_store.similarity_search_with_score(
                query="qux", k=1
            )
            for doc, score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * [SIM=0.832268] foo [{'baz': 'bar'}]

    Async:
        .. code-block:: python

            # add documents
            # await vector_store.aadd_documents(documents=documents)

            # delete documents
            # await vector_store.adelete(ids=["3"])

            # search
            # results = vector_store.asimilarity_search(query="thud", k=1)

            # search with score
            results = await vector_store.asimilarity_search_with_score(query="qux", k=1)
            for doc, score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: none

            * [SIM=0.832268] foo [{'baz': 'bar'}]

    Use as Retriever:
        .. code-block:: python

            retriever = vector_store.as_retriever(
                search_type="mmr",
                search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
            )
            retriever.invoke("thud")

        .. code-block:: none

            [Document(id='2', metadata={'bar': 'baz'}, page_content='thud')]

    """  # noqa: E501
    def __init__(self, embedding: Embeddings) -> None:
        """Initialize with the given embedding function.

        Args:
            embedding: embedding function to use.
        """
        # Maps document id -> record dict with keys
        # "id", "vector", "text", "metadata".
        # TODO: would be nice to change to
        # Dict[str, Document] at some point (will be a breaking change)
        self.store: Dict[str, Dict[str, Any]] = {}
        self.embedding = embedding
    @property
    def embeddings(self) -> Embeddings:
        """The embedding function used by this vector store."""
        return self.embedding
[docs] def delete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
if ids:
for _id in ids:
self.store.pop(_id, None)
    async def adelete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
        """Async delete: delegates to the synchronous :meth:`delete`.

        The store is purely in-memory, so no real async work is needed.
        """
        self.delete(ids)
[docs] def add_documents(
self,
documents: List[Document],
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Add documents to the store."""
texts = [doc.page_content for doc in documents]
vectors = self.embedding.embed_documents(texts)
if ids and len(ids) != len(texts):
raise ValueError(
f"ids must be the same length as texts. "
f"Got {len(ids)} ids and {len(texts)} texts."
)
id_iterator: Iterator[Optional[str]] = (
iter(ids) if ids else iter(doc.id for doc in documents)
)
ids_ = []
for doc, vector in zip(documents, vectors):
doc_id = next(id_iterator)
doc_id_ = doc_id if doc_id else str(uuid.uuid4())
ids_.append(doc_id_)
self.store[doc_id_] = {
"id": doc_id_,
"vector": vector,
"text": doc.page_content,
"metadata": doc.metadata,
}
return ids_
[docs] async def aadd_documents(
self, documents: List[Document], ids: Optional[List[str]] = None, **kwargs: Any
) -> List[str]:
"""Add documents to the store."""
texts = [doc.page_content for doc in documents]
vectors = await self.embedding.aembed_documents(texts)
if ids and len(ids) != len(texts):
raise ValueError(
f"ids must be the same length as texts. "
f"Got {len(ids)} ids and {len(texts)} texts."
)
id_iterator: Iterator[Optional[str]] = (
iter(ids) if ids else iter(doc.id for doc in documents)
)
ids_: List[str] = []
for doc, vector in zip(documents, vectors):
doc_id = next(id_iterator)
doc_id_ = doc_id if doc_id else str(uuid.uuid4())
ids_.append(doc_id_)
self.store[doc_id_] = {
"id": doc_id_,
"vector": vector,
"text": doc.page_content,
"metadata": doc.metadata,
}
return ids_
[docs] def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
"""Get documents by their ids.
Args:
ids: The ids of the documents to get.
Returns:
A list of Document objects.
"""
documents = []
for doc_id in ids:
doc = self.store.get(doc_id)
if doc:
documents.append(
Document(
id=doc["id"],
page_content=doc["text"],
metadata=doc["metadata"],
)
)
return documents
    @deprecated(
        alternative="VectorStore.add_documents",
        message=(
            "This was a beta API that was added in 0.2.11. "
            "It'll be removed in 0.3.0."
        ),
        since="0.2.29",
        removal="1.0",
    )
    def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse:
        """Insert or update the given documents (deprecated beta API).

        Items with an existing id overwrite the stored record; items without
        an id are stored under a fresh UUID4. Every item is reported as
        succeeded.
        """
        vectors = self.embedding.embed_documents([item.page_content for item in items])
        ids = []
        for item, vector in zip(items, vectors):
            doc_id = item.id if item.id else str(uuid.uuid4())
            ids.append(doc_id)
            self.store[doc_id] = {
                "id": doc_id,
                "vector": vector,
                "text": item.page_content,
                "metadata": item.metadata,
            }
        return {
            "succeeded": ids,
            "failed": [],
        }
    @deprecated(
        alternative="VectorStore.aadd_documents",
        message=(
            "This was a beta API that was added in 0.2.11. "
            "It'll be removed in 0.3.0."
        ),
        since="0.2.29",
        removal="1.0",
    )
    async def aupsert(
        self, items: Sequence[Document], /, **kwargs: Any
    ) -> UpsertResponse:
        """Async insert or update the given documents (deprecated beta API).

        Items with an existing id overwrite the stored record; items without
        an id are stored under a fresh UUID4. Every item is reported as
        succeeded.
        """
        vectors = await self.embedding.aembed_documents(
            [item.page_content for item in items]
        )
        ids = []
        for item, vector in zip(items, vectors):
            doc_id = item.id if item.id else str(uuid.uuid4())
            ids.append(doc_id)
            self.store[doc_id] = {
                "id": doc_id,
                "vector": vector,
                "text": item.page_content,
                "metadata": item.metadata,
            }
        return {
            "succeeded": ids,
            "failed": [],
        }
    async def aget_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        """Async get documents by their ids.

        Args:
            ids: The ids of the documents to get.

        Returns:
            A list of Document objects.
        """
        # Purely in-memory lookup, so delegating to the sync path is fine.
        return self.get_by_ids(ids)
def _similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Callable[[Document], bool]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float, List[float]]]:
result = []
for doc in self.store.values():
vector = doc["vector"]
similarity = float(cosine_similarity([embedding], [vector]).item(0))
result.append(
(
Document(
id=doc["id"], page_content=doc["text"], metadata=doc["metadata"]
),
similarity,
vector,
)
)
result.sort(key=lambda x: x[1], reverse=True)
if filter is not None:
result = [r for r in result if filter(r[0])]
return result[:k]
[docs] def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Callable[[Document], bool]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
return [
(doc, similarity)
for doc, similarity, _ in self._similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter, **kwargs
)
]
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
embedding = self.embedding.embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding,
k,
**kwargs,
)
return docs
[docs] async def asimilarity_search_with_score(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
embedding = await self.embedding.aembed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding,
k,
**kwargs,
)
return docs
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
**kwargs: Any,
) -> List[Document]:
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding,
k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
    async def asimilarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Async return the ``k`` documents most similar to ``embedding``.

        Delegates to the sync implementation since the store is in memory.
        """
        return self.similarity_search_by_vector(embedding, k, **kwargs)
[docs] def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
return [doc for doc, _ in self.similarity_search_with_score(query, k, **kwargs)]
[docs] async def asimilarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
return [
doc
for doc, _ in await self.asimilarity_search_with_score(query, k, **kwargs)
]
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
prefetch_hits = self._similarity_search_with_score_by_vector(
embedding=embedding,
k=fetch_k,
**kwargs,
)
try:
import numpy as np
except ImportError as e:
raise ImportError(
"numpy must be installed to use max_marginal_relevance_search "
"pip install numpy"
) from e
mmr_chosen_indices = maximal_marginal_relevance(
np.array(embedding, dtype=np.float32),
[vector for _, _, vector in prefetch_hits],
k=k,
lambda_mult=lambda_mult,
)
return [prefetch_hits[idx][0] for idx in mmr_chosen_indices]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
embedding_vector = self.embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding_vector,
k,
fetch_k,
lambda_mult=lambda_mult,
**kwargs,
)
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
embedding_vector = await self.embedding.aembed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding_vector,
k,
fetch_k,
lambda_mult=lambda_mult,
**kwargs,
)
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> InMemoryVectorStore:
store = cls(
embedding=embedding,
)
store.add_texts(texts=texts, metadatas=metadatas, **kwargs)
return store
[docs] @classmethod
async def afrom_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> InMemoryVectorStore:
store = cls(
embedding=embedding,
)
await store.aadd_texts(texts=texts, metadatas=metadatas, **kwargs)
return store
[docs] @classmethod
def load(
cls, path: str, embedding: Embeddings, **kwargs: Any
) -> InMemoryVectorStore:
"""Load a vector store from a file.
Args:
path: The path to load the vector store from.
embedding: The embedding to use.
kwargs: Additional arguments to pass to the constructor.
Returns:
A VectorStore object.
"""
_path: Path = Path(path)
with _path.open("r") as f:
store = load(json.load(f))
vectorstore = cls(embedding=embedding, **kwargs)
vectorstore.store = store
return vectorstore
[docs] def dump(self, path: str) -> None:
"""Dump the vector store to a file.
Args:
path: The path to dump the vector store to.
"""
_path: Path = Path(path)
_path.parent.mkdir(exist_ok=True, parents=True)
with _path.open("w") as f:
json.dump(dumpd(self.store), f, indent=2)