from __future__ import annotations
import logging
import operator
import os
import pickle
import uuid
import warnings
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
Sized,
Tuple,
Union,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore
from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import (
DistanceStrategy,
maximal_marginal_relevance,
)
logger = logging.getLogger(__name__)
[docs]
def dependable_faiss_import(no_avx2: Optional[bool] = None) -> Any:
"""
Import faiss if available, otherwise raise error.
If FAISS_NO_AVX2 environment variable is set, it will be considered
to load FAISS with no AVX2 optimization.
Args:
no_avx2: Load FAISS strictly with no AVX2 optimization
so that the vectorstore is portable and compatible with other devices.
"""
if no_avx2 is None and "FAISS_NO_AVX2" in os.environ:
no_avx2 = bool(os.getenv("FAISS_NO_AVX2"))
try:
if no_avx2:
from faiss import swigfaiss as faiss
else:
import faiss
except ImportError:
raise ImportError(
"Could not import faiss python package. "
"Please install it with `pip install faiss-gpu` (for CUDA supported GPU) "
"or `pip install faiss-cpu` (depending on Python version)."
)
return faiss
def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y):
raise ValueError(
f"{x_name} and {y_name} expected to be equal length but "
f"len({x_name})={len(x)} and len({y_name})={len(y)}"
)
return
[docs]
class FAISS(VectorStore):
"""FAISS vector store integration.
See [The FAISS Library](https://arxiv.org/pdf/2401.08281) paper.
Setup:
Install ``langchain_community`` and ``faiss-cpu`` python packages.
.. code-block:: bash
pip install -qU langchain_community faiss-cpu
Key init args — indexing params:
embedding_function: Embeddings
Embedding function to use.
Key init args — client params:
index: Any
FAISS index to use.
docstore: Docstore
Docstore to use.
index_to_docstore_id: Dict[int, str]
Mapping of index to docstore id.
Instantiate:
.. code-block:: python
import faiss
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_openai import OpenAIEmbeddings
index = faiss.IndexFlatL2(len(OpenAIEmbeddings().embed_query("hello world")))
vector_store = FAISS(
embedding_function=OpenAIEmbeddings(),
index=index,
docstore= InMemoryDocstore(),
index_to_docstore_id={}
)
Add Documents:
.. code-block:: python
from langchain_core.documents import Document
document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"bar": "baz"})
document_3 = Document(page_content="i will be deleted :(")
documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
Delete Documents:
.. code-block:: python
vector_store.delete(ids=["3"])
Search:
.. code-block:: python
results = vector_store.similarity_search(query="thud",k=1)
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* thud [{'bar': 'baz'}]
Search with filter:
.. code-block:: python
results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* thud [{'bar': 'baz'}]
Search with score:
.. code-block:: python
results = vector_store.similarity_search_with_score(query="qux",k=1)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* [SIM=0.335304] foo [{'baz': 'bar'}]
Async:
.. code-block:: python
# add documents
# await vector_store.aadd_documents(documents=documents, ids=ids)
# delete documents
# await vector_store.adelete(ids=["3"])
# search
# results = vector_store.asimilarity_search(query="thud",k=1)
# search with score
results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
for doc,score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* [SIM=0.335304] foo [{'baz': 'bar'}]
Use as Retriever:
.. code-block:: python
retriever = vector_store.as_retriever(
search_type="mmr",
search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
)
retriever.invoke("thud")
.. code-block:: python
[Document(metadata={'bar': 'baz'}, page_content='thud')]
""" # noqa: E501
[docs]
def __init__(
self,
embedding_function: Union[
Callable[[str], List[float]],
Embeddings,
],
index: Any,
docstore: Docstore,
index_to_docstore_id: Dict[int, str],
relevance_score_fn: Optional[Callable[[float], float]] = None,
normalize_L2: bool = False,
distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
):
"""Initialize with necessary components."""
if not isinstance(embedding_function, Embeddings):
logger.warning(
"`embedding_function` is expected to be an Embeddings object, support "
"for passing in a function will soon be removed."
)
self.embedding_function = embedding_function
self.index = index
self.docstore = docstore
self.index_to_docstore_id = index_to_docstore_id
self.distance_strategy = distance_strategy
self.override_relevance_score_fn = relevance_score_fn
self._normalize_L2 = normalize_L2
if (
self.distance_strategy != DistanceStrategy.EUCLIDEAN_DISTANCE
and self._normalize_L2
):
warnings.warn(
"Normalizing L2 is not applicable for "
f"metric type: {self.distance_strategy}"
)
@property
def embeddings(self) -> Optional[Embeddings]:
return (
self.embedding_function
if isinstance(self.embedding_function, Embeddings)
else None
)
def _embed_documents(self, texts: List[str]) -> List[List[float]]:
if isinstance(self.embedding_function, Embeddings):
return self.embedding_function.embed_documents(texts)
else:
return [self.embedding_function(text) for text in texts]
async def _aembed_documents(self, texts: List[str]) -> List[List[float]]:
if isinstance(self.embedding_function, Embeddings):
return await self.embedding_function.aembed_documents(texts)
else:
# return await asyncio.gather(
# [self.embedding_function(text) for text in texts]
# )
raise Exception(
"`embedding_function` is expected to be an Embeddings object, support "
"for passing in a function will soon be removed."
)
def _embed_query(self, text: str) -> List[float]:
if isinstance(self.embedding_function, Embeddings):
return self.embedding_function.embed_query(text)
else:
return self.embedding_function(text)
async def _aembed_query(self, text: str) -> List[float]:
if isinstance(self.embedding_function, Embeddings):
return await self.embedding_function.aembed_query(text)
else:
# return await self.embedding_function(text)
raise Exception(
"`embedding_function` is expected to be an Embeddings object, support "
"for passing in a function will soon be removed."
)
def __add(
self,
texts: Iterable[str],
embeddings: Iterable[List[float]],
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
) -> List[str]:
faiss = dependable_faiss_import()
if not isinstance(self.docstore, AddableMixin):
raise ValueError(
"If trying to add texts, the underlying docstore should support "
f"adding items, which {self.docstore} does not"
)
_len_check_if_sized(texts, metadatas, "texts", "metadatas")
ids = ids or [str(uuid.uuid4()) for _ in texts]
_len_check_if_sized(texts, ids, "texts", "ids")
_metadatas = metadatas or ({} for _ in texts)
documents = [
Document(id=id_, page_content=t, metadata=m)
for id_, t, m in zip(ids, texts, _metadatas)
]
_len_check_if_sized(documents, embeddings, "documents", "embeddings")
if ids and len(ids) != len(set(ids)):
raise ValueError("Duplicate ids found in the ids list.")
# Add to the index.
vector = np.array(embeddings, dtype=np.float32)
if self._normalize_L2:
faiss.normalize_L2(vector)
self.index.add(vector)
# Add information to docstore and index.
self.docstore.add({id_: doc for id_, doc in zip(ids, documents)})
starting_len = len(self.index_to_docstore_id)
index_to_id = {starting_len + j: id_ for j, id_ in enumerate(ids)}
self.index_to_docstore_id.update(index_to_id)
return ids
[docs]
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of unique IDs.
Returns:
List of ids from adding the texts into the vectorstore.
"""
texts = list(texts)
embeddings = self._embed_documents(texts)
return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
[docs]
async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore
asynchronously.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of unique IDs.
Returns:
List of ids from adding the texts into the vectorstore.
"""
texts = list(texts)
embeddings = await self._aembed_documents(texts)
return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
[docs]
def add_embeddings(
self,
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given texts and embeddings to the vectorstore.
Args:
text_embeddings: Iterable pairs of string and embedding to
add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of unique IDs.
Returns:
List of ids from adding the texts into the vectorstore.
"""
# Embed and create the documents.
texts, embeddings = zip(*text_embeddings)
return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
[docs]
def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query.
Args:
embedding: Embedding vector to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
Defaults to None. If a callable, it must take as input the
metadata dict of Document and return a bool.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
**kwargs: kwargs to be passed to similarity search. Can include:
score_threshold: Optional, a floating point value between 0 to 1 to
filter the resulting set of retrieved docs
Returns:
List of documents most similar to the query text and L2 distance
in float for each. Lower score represents more similarity.
"""
faiss = dependable_faiss_import()
vector = np.array([embedding], dtype=np.float32)
if self._normalize_L2:
faiss.normalize_L2(vector)
scores, indices = self.index.search(vector, k if filter is None else fetch_k)
docs = []
if filter is not None:
filter_func = self._create_filter_func(filter)
for j, i in enumerate(indices[0]):
if i == -1:
# This happens when not enough docs are returned.
continue
_id = self.index_to_docstore_id[i]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"Could not find document for id {_id}, got {doc}")
if filter is not None:
if filter_func(doc.metadata):
docs.append((doc, scores[0][j]))
else:
docs.append((doc, scores[0][j]))
score_threshold = kwargs.get("score_threshold")
if score_threshold is not None:
cmp = (
operator.ge
if self.distance_strategy
in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD)
else operator.le
)
docs = [
(doc, similarity)
for doc, similarity in docs
if cmp(similarity, score_threshold)
]
return docs[:k]
[docs]
async def asimilarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query asynchronously.
Args:
embedding: Embedding vector to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, Any]]): Filter by metadata.
Defaults to None. If a callable, it must take as input the
metadata dict of Document and return a bool.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
**kwargs: kwargs to be passed to similarity search. Can include:
score_threshold: Optional, a floating point value between 0 to 1 to
filter the resulting set of retrieved docs
Returns:
List of documents most similar to the query text and L2 distance
in float for each. Lower score represents more similarity.
"""
# This is a temporary workaround to make the similarity search asynchronous.
return await run_in_executor(
None,
self.similarity_search_with_score_by_vector,
embedding,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
[docs]
def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata.
Defaults to None. If a callable, it must take as input the
metadata dict of Document and return a bool.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
Returns:
List of documents most similar to the query text with
L2 distance in float. Lower score represents more similarity.
"""
embedding = self._embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return docs
[docs]
async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query asynchronously.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata.
Defaults to None. If a callable, it must take as input the
metadata dict of Document and return a bool.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
Returns:
List of documents most similar to the query text with
L2 distance in float. Lower score represents more similarity.
"""
embedding = await self._aembed_query(query)
docs = await self.asimilarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return docs
[docs]
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, Any]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to embedding vector.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata.
Defaults to None. If a callable, it must take as input the
metadata dict of Document and return a bool.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
Returns:
List of Documents most similar to the embedding.
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
[docs]
async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to embedding vector asynchronously.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata.
Defaults to None. If a callable, it must take as input the
metadata dict of Document and return a bool.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
Returns:
List of Documents most similar to the embedding.
"""
docs_and_scores = await self.asimilarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
[docs]
def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
Returns:
List of Documents most similar to the query.
"""
docs_and_scores = self.similarity_search_with_score(
query, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs]
async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query asynchronously.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
Defaults to 20.
Returns:
List of Documents most similar to the query.
"""
docs_and_scores = await self.asimilarity_search_with_score(
query, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs]
def max_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
*,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
"""Return docs and their similarity scores selected using the maximal marginal
relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch before filtering to
pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
Returns:
List of Documents and similarity scores selected by maximal marginal
relevance and score for each.
"""
scores, indices = self.index.search(
np.array([embedding], dtype=np.float32),
fetch_k if filter is None else fetch_k * 2,
)
if filter is not None:
filter_func = self._create_filter_func(filter)
filtered_indices = []
for i in indices[0]:
if i == -1:
# This happens when not enough docs are returned.
continue
_id = self.index_to_docstore_id[i]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"Could not find document for id {_id}, got {doc}")
if filter_func(doc.metadata):
filtered_indices.append(i)
indices = np.array([filtered_indices])
# -1 happens when not enough docs are returned.
embeddings = [self.index.reconstruct(int(i)) for i in indices[0] if i != -1]
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
embeddings,
k=k,
lambda_mult=lambda_mult,
)
docs_and_scores = []
for i in mmr_selected:
if indices[0][i] == -1:
# This happens when not enough docs are returned.
continue
_id = self.index_to_docstore_id[indices[0][i]]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"Could not find document for id {_id}, got {doc}")
docs_and_scores.append((doc, scores[0][i]))
return docs_and_scores
[docs]
async def amax_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
*,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
"""Return docs and their similarity scores selected using the maximal marginal
relevance asynchronously.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch before filtering to
pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
Returns:
List of Documents and similarity scores selected by maximal marginal
relevance and score for each.
"""
# This is a temporary workaround to make the similarity search asynchronous.
return await run_in_executor(
None,
self.max_marginal_relevance_search_with_score_by_vector,
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
)
[docs]
def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch before filtering to
pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
Returns:
List of Documents selected by maximal marginal relevance.
"""
docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter
)
return [doc for doc, _ in docs_and_scores]
[docs]
async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance asynchronously.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch before filtering to
pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
Returns:
List of Documents selected by maximal marginal relevance.
"""
docs_and_scores = (
await self.amax_marginal_relevance_search_with_score_by_vector(
embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter
)
)
return [doc for doc, _ in docs_and_scores]
[docs]
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch before filtering (if needed) to
pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
Returns:
List of Documents selected by maximal marginal relevance.
"""
embedding = self._embed_query(query)
docs = self.max_marginal_relevance_search_by_vector(
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
return docs
[docs]
async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance asynchronously.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch before filtering (if needed) to
pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
Returns:
List of Documents selected by maximal marginal relevance.
"""
embedding = await self._aembed_query(query)
docs = await self.amax_marginal_relevance_search_by_vector(
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
return docs
[docs]
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""Delete by ID. These are the IDs in the vectorstore.
Args:
ids: List of ids to delete.
Returns:
Optional[bool]: True if deletion is successful,
False otherwise, None if not implemented.
"""
if ids is None:
raise ValueError("No ids provided to delete.")
missing_ids = set(ids).difference(self.index_to_docstore_id.values())
if missing_ids:
raise ValueError(
f"Some specified ids do not exist in the current store. Ids not found: "
f"{missing_ids}"
)
reversed_index = {id_: idx for idx, id_ in self.index_to_docstore_id.items()}
index_to_delete = {reversed_index[id_] for id_ in ids}
self.index.remove_ids(np.fromiter(index_to_delete, dtype=np.int64))
self.docstore.delete(ids)
remaining_ids = [
id_
for i, id_ in sorted(self.index_to_docstore_id.items())
if i not in index_to_delete
]
self.index_to_docstore_id = {i: id_ for i, id_ in enumerate(remaining_ids)}
return True
[docs]
def merge_from(self, target: FAISS) -> None:
"""Merge another FAISS object with the current one.
Add the target FAISS to the current one.
Args:
target: FAISS object you wish to merge into the current one
Returns:
None.
"""
if not isinstance(self.docstore, AddableMixin):
raise ValueError("Cannot merge with this type of docstore")
# Numerical index for target docs are incremental on existing ones
starting_len = len(self.index_to_docstore_id)
# Merge two IndexFlatL2
self.index.merge_from(target.index)
# Get id and docs from target FAISS object
full_info = []
for i, target_id in target.index_to_docstore_id.items():
doc = target.docstore.search(target_id)
if not isinstance(doc, Document):
raise ValueError("Document should be returned")
full_info.append((starting_len + i, target_id, doc))
# Add information to docstore and index_to_docstore_id.
self.docstore.add({_id: doc for _, _id, doc in full_info})
index_to_id = {index: _id for index, _id, _ in full_info}
self.index_to_docstore_id.update(index_to_id)
@classmethod
def __from(
cls,
texts: Iterable[str],
embeddings: List[List[float]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
normalize_L2: bool = False,
distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
**kwargs: Any,
) -> FAISS:
faiss = dependable_faiss_import()
if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(len(embeddings[0]))
else:
# Default to L2, currently other metric types not initialized.
index = faiss.IndexFlatL2(len(embeddings[0]))
docstore = kwargs.pop("docstore", InMemoryDocstore())
index_to_docstore_id = kwargs.pop("index_to_docstore_id", {})
vecstore = cls(
embedding,
index,
docstore,
index_to_docstore_id,
normalize_L2=normalize_L2,
distance_strategy=distance_strategy,
**kwargs,
)
vecstore.__add(texts, embeddings, metadatas=metadatas, ids=ids)
return vecstore
[docs]
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""Construct FAISS wrapper from raw documents.
This is a user friendly interface that:
1. Embeds documents.
2. Creates an in memory docstore
3. Initializes the FAISS database
This is intended to be a quick way to get started.
Example:
.. code-block:: python
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
faiss = FAISS.from_texts(texts, embeddings)
"""
embeddings = embedding.embed_documents(texts)
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs]
@classmethod
async def afrom_texts(
cls,
texts: list[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""Construct FAISS wrapper from raw documents asynchronously.
This is a user friendly interface that:
1. Embeds documents.
2. Creates an in memory docstore
3. Initializes the FAISS database
This is intended to be a quick way to get started.
Example:
.. code-block:: python
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
faiss = await FAISS.afrom_texts(texts, embeddings)
"""
embeddings = await embedding.aembed_documents(texts)
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs]
@classmethod
def from_embeddings(
cls,
text_embeddings: Iterable[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""Construct FAISS wrapper from raw documents.
This is a user friendly interface that:
1. Embeds documents.
2. Creates an in memory docstore
3. Initializes the FAISS database
This is intended to be a quick way to get started.
Example:
.. code-block:: python
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
text_embeddings = embeddings.embed_documents(texts)
text_embedding_pairs = zip(texts, text_embeddings)
faiss = FAISS.from_embeddings(text_embedding_pairs, embeddings)
"""
texts, embeddings = zip(*text_embeddings)
return cls.__from(
list(texts),
list(embeddings),
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs]
@classmethod
async def afrom_embeddings(
cls,
text_embeddings: Iterable[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""Construct FAISS wrapper from raw documents asynchronously."""
return cls.from_embeddings(
text_embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs]
def save_local(self, folder_path: str, index_name: str = "index") -> None:
"""Save FAISS index, docstore, and index_to_docstore_id to disk.
Args:
folder_path: folder path to save index, docstore,
and index_to_docstore_id to.
index_name: for saving with a specific index file name
"""
path = Path(folder_path)
path.mkdir(exist_ok=True, parents=True)
# save index separately since it is not picklable
faiss = dependable_faiss_import()
faiss.write_index(self.index, str(path / f"{index_name}.faiss"))
# save docstore and index_to_docstore_id
with open(path / f"{index_name}.pkl", "wb") as f:
pickle.dump((self.docstore, self.index_to_docstore_id), f)
[docs]
@classmethod
def load_local(
cls,
folder_path: str,
embeddings: Embeddings,
index_name: str = "index",
*,
allow_dangerous_deserialization: bool = False,
**kwargs: Any,
) -> FAISS:
"""Load FAISS index, docstore, and index_to_docstore_id from disk.
Args:
folder_path: folder path to load index, docstore,
and index_to_docstore_id from.
embeddings: Embeddings to use when generating queries
index_name: for saving with a specific index file name
allow_dangerous_deserialization: whether to allow deserialization
of the data which involves loading a pickle file.
Pickle files can be modified by malicious actors to deliver a
malicious payload that results in execution of
arbitrary code on your machine.
"""
if not allow_dangerous_deserialization:
raise ValueError(
"The de-serialization relies loading a pickle file. "
"Pickle files can be modified to deliver a malicious payload that "
"results in execution of arbitrary code on your machine."
"You will need to set `allow_dangerous_deserialization` to `True` to "
"enable deserialization. If you do this, make sure that you "
"trust the source of the data. For example, if you are loading a "
"file that you created, and know that no one else has modified the "
"file, then this is safe to do. Do not set this to `True` if you are "
"loading a file from an untrusted source (e.g., some random site on "
"the internet.)."
)
path = Path(folder_path)
# load index separately since it is not picklable
faiss = dependable_faiss_import()
index = faiss.read_index(str(path / f"{index_name}.faiss"))
# load docstore and index_to_docstore_id
with open(path / f"{index_name}.pkl", "rb") as f:
(
docstore,
index_to_docstore_id,
) = pickle.load( # ignore[pickle]: explicit-opt-in
f
)
return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
[docs]
def serialize_to_bytes(self) -> bytes:
"""Serialize FAISS index, docstore, and index_to_docstore_id to bytes."""
return pickle.dumps((self.index, self.docstore, self.index_to_docstore_id))
[docs]
@classmethod
def deserialize_from_bytes(
cls,
serialized: bytes,
embeddings: Embeddings,
*,
allow_dangerous_deserialization: bool = False,
**kwargs: Any,
) -> FAISS:
"""Deserialize FAISS index, docstore, and index_to_docstore_id from bytes."""
if not allow_dangerous_deserialization:
raise ValueError(
"The de-serialization relies loading a pickle file. "
"Pickle files can be modified to deliver a malicious payload that "
"results in execution of arbitrary code on your machine."
"You will need to set `allow_dangerous_deserialization` to `True` to "
"enable deserialization. If you do this, make sure that you "
"trust the source of the data. For example, if you are loading a "
"file that you created, and know that no one else has modified the "
"file, then this is safe to do. Do not set this to `True` if you are "
"loading a file from an untrusted source (e.g., some random site on "
"the internet.)."
)
(
index,
docstore,
index_to_docstore_id,
) = pickle.loads( # ignore[pickle]: explicit-opt-in
serialized
)
return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
The 'correct' relevance function
may differ depending on a few things, including:
- the distance / similarity metric used by the VectorStore
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
- embedding dimensionality
- etc.
"""
if self.override_relevance_score_fn is not None:
return self.override_relevance_score_fn
# Default strategy is to rely on distance strategy provided in
# vectorstore constructor
if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
# Default behavior is to use euclidean distance relevancy
return self._euclidean_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
else:
raise ValueError(
"Unknown distance strategy, must be cosine, max_inner_product,"
" or euclidean"
)
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs and their similarity scores on a scale from 0 to 1."""
# Pop score threshold so that only relevancy scores, not raw scores, are
# filtered.
relevance_score_fn = self._select_relevance_score_fn()
if relevance_score_fn is None:
raise ValueError(
"relevance_score_fn must be provided to"
" FAISS constructor to normalize scores"
)
docs_and_scores = self.similarity_search_with_score(
query,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
docs_and_rel_scores = [
(doc, relevance_score_fn(score)) for doc, score in docs_and_scores
]
return docs_and_rel_scores
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs and their similarity scores on a scale from 0 to 1."""
# Pop score threshold so that only relevancy scores, not raw scores, are
# filtered.
relevance_score_fn = self._select_relevance_score_fn()
if relevance_score_fn is None:
raise ValueError(
"relevance_score_fn must be provided to"
" FAISS constructor to normalize scores"
)
docs_and_scores = await self.asimilarity_search_with_score(
query,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
docs_and_rel_scores = [
(doc, relevance_score_fn(score)) for doc, score in docs_and_scores
]
return docs_and_rel_scores
@staticmethod
def _create_filter_func(
filter: Optional[Union[Callable, Dict[str, Any]]],
) -> Callable[[Dict[str, Any]], bool]:
"""
Create a filter function based on the provided filter.
Args:
filter: A callable or a dictionary representing the filter
conditions for documents.
Returns:
A function that takes Document's metadata and returns True if it
satisfies the filter conditions, otherwise False.
Raises:
ValueError: If the filter is invalid or contains unsuported operators.
"""
if callable(filter):
return filter
if not isinstance(filter, dict):
raise ValueError(
f"filter must be a dict of metadata or a callable, not {type(filter)}"
)
from operator import eq, ge, gt, le, lt, ne
COMPARISON_OPERATORS = {
"$eq": eq,
"$neq": ne,
"$gt": gt,
"$lt": lt,
"$gte": ge,
"$lte": le,
}
SEQUENCE_OPERATORS = {
"$in": lambda a, b: a in b,
"$nin": lambda a, b: a not in b,
}
OPERATIONS = COMPARISON_OPERATORS | SEQUENCE_OPERATORS
VALID_OPERATORS = frozenset(list(OPERATIONS) + ["$and", "$or", "$not"])
SET_CONVERT_THRESHOLD = 10
# Validate top-level filter operators.
for op in filter:
if op and op.startswith("$") and op not in VALID_OPERATORS:
raise ValueError(f"filter contains unsupported operator: {op}")
def filter_func_cond(
field: str, condition: Union[Dict[str, Any], List[Any], Any]
) -> Callable[[Dict[str, Any]], bool]:
"""
Creates a filter function based on field and condition.
Args:
field: The document field to filter on
condition: Filter condition (dict for operators, list for in,
or direct value for equality)
Returns:
A filter function that takes a document and returns boolean
"""
if isinstance(condition, dict):
operators = []
for op, value in condition.items():
if op not in OPERATIONS:
raise ValueError(f"filter contains unsupported operator: {op}")
operators.append((OPERATIONS[op], value))
def filter_fn(doc: Dict[str, Any]) -> bool:
"""
Evaluates a document against a set of predefined operators
and their values. This function applies multiple
comparison/sequence operators to a specific field value
from the document. All conditions must be satisfied for the
function to return True.
Args:
doc (Dict[str, Any]): The document to evaluate, containing
key-value pairs where keys are field names and values
are the field values. The document must contain the field
being filtered.
Returns:
bool: True if the document's field value satisfies all
operator conditions, False otherwise.
"""
doc_value = doc.get(field)
return all(op(doc_value, value) for op, value in operators)
return filter_fn
if isinstance(condition, list):
if len(condition) > SET_CONVERT_THRESHOLD:
condition_set = frozenset(condition)
return lambda doc: doc.get(field) in condition_set
return lambda doc: doc.get(field) in condition
return lambda doc: doc.get(field) == condition
def filter_func(filter: Dict[str, Any]) -> Callable[[Dict[str, Any]], bool]:
"""
Creates a filter function that evaluates documents against specified
filter conditions.
This function processes a dictionary of filter conditions and returns
a callable that can evaluate documents against these conditions. It
supports logical operators ($and, $or, $not) and field-level filtering.
Args:
filter (Dict[str, Any]): A dictionary containing filter conditions.
Can include:
- Logical operators ($and, $or, $not) with lists of sub-filters
- Field-level conditions with comparison or sequence operators
- Direct field-value mappings for equality comparison
Returns:
Callable[[Dict[str, Any]], bool]: A function that takes a document
(as a dictionary) and returns True if the document matches all
filter conditions, False otherwise.
"""
if "$and" in filter:
filters = [filter_func(sub_filter) for sub_filter in filter["$and"]]
return lambda doc: all(f(doc) for f in filters)
if "$or" in filter:
filters = [filter_func(sub_filter) for sub_filter in filter["$or"]]
return lambda doc: any(f(doc) for f in filters)
if "$not" in filter:
cond = filter_func(filter["$not"])
return lambda doc: not cond(doc)
conditions = [
filter_func_cond(field, condition)
for field, condition in filter.items()
]
return lambda doc: all(condition(doc) for condition in conditions)
return filter_func(filter)
[docs]
def get_by_ids(self, ids: Sequence[str], /) -> list[Document]:
docs = [self.docstore.search(id_) for id_ in ids]
return [doc for doc in docs if isinstance(doc, Document)]