from __future__ import annotations
import json
import logging
import uuid
from hashlib import sha1
from typing import Any, Dict, Iterable, List, Optional, Type
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from pydantic_settings import BaseSettings, SettingsConfigDict
logger = logging.getLogger()
DEFAULT_K = 4 # Number of Documents to return.
[docs]
class ManticoreSearchSettings(BaseSettings):
proto: str = "http"
host: str = "localhost"
port: int = 9308
username: Optional[str] = None
password: Optional[str] = None
# database: str = "Manticore"
table: str = "langchain"
column_map: Dict[str, str] = {
"id": "id",
"uuid": "uuid",
"document": "document",
"embedding": "embedding",
"metadata": "metadata",
}
# A mandatory setting; currently, only hnsw is supported.
knn_type: str = "hnsw"
# A mandatory setting that specifies the dimensions of the vectors being indexed.
knn_dims: Optional[int] = None # Defaults autodetect
# A mandatory setting that specifies the distance function used by the HNSW index.
hnsw_similarity: str = "L2" # Acceptable values are: L2, IP, COSINE
# An optional setting that defines the maximum amount of outgoing connections
# in the graph.
hnsw_m: int = 16 # The default is 16.
# An optional setting that defines a construction time/accuracy trade-off.
hnsw_ef_construction: int = 100
[docs]
def get_connection_string(self) -> str:
return self.proto + "://" + self.host + ":" + str(self.port)
def __getitem__(self, item: str) -> Any:
return getattr(self, item)
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
env_prefix="manticore_",
extra="ignore",
)
[docs]
class ManticoreSearch(VectorStore):
"""
`ManticoreSearch Engine` vector store.
To use, you should have the ``manticoresearch`` python package installed.
Example:
.. code-block:: python
from langchain_community.vectorstores import Manticore
from langchain_community.embeddings.openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = ManticoreSearch(embeddings)
"""
[docs]
def __init__(
self,
embedding: Embeddings,
*,
config: Optional[ManticoreSearchSettings] = None,
**kwargs: Any,
) -> None:
"""
ManticoreSearch Wrapper to LangChain
Args:
embedding (Embeddings): Text embedding model.
config (ManticoreSearchSettings): Configuration of ManticoreSearch Client
**kwargs: Other keyword arguments will pass into Configuration of API client
manticoresearch-python. See
https://github.com/manticoresoftware/manticoresearch-python for more.
"""
try:
import manticoresearch.api as ENDPOINTS
import manticoresearch.api_client as API
except ImportError:
raise ImportError(
"Could not import manticoresearch python package. "
"Please install it with `pip install manticoresearch-dev`."
)
try:
from tqdm import tqdm
self.pgbar = tqdm
except ImportError:
# Just in case if tqdm is not installed
self.pgbar = lambda x, **kwargs: x
super().__init__()
self.embedding = embedding
if config is not None:
self.config = config
else:
self.config = ManticoreSearchSettings()
assert self.config
assert self.config.host and self.config.port
assert (
self.config.column_map
# and self.config.database
and self.config.table
)
assert (
self.config.knn_type
# and self.config.knn_dims
# and self.config.hnsw_m
# and self.config.hnsw_ef_construction
and self.config.hnsw_similarity
)
for k in ["id", "embedding", "document", "metadata", "uuid"]:
assert k in self.config.column_map
# Detect embeddings dimension
if self.config.knn_dims is None:
self.dim: int = len(self.embedding.embed_query("test"))
else:
self.dim = self.config.knn_dims
# Initialize the schema
self.schema = f"""\
CREATE TABLE IF NOT EXISTS {self.config.table}(
{self.config.column_map['id']} bigint,
{self.config.column_map['document']} text indexed stored,
{self.config.column_map['embedding']} \
float_vector knn_type='{self.config.knn_type}' \
knn_dims='{self.dim}' \
hnsw_similarity='{self.config.hnsw_similarity}' \
hnsw_m='{self.config.hnsw_m}' \
hnsw_ef_construction='{self.config.hnsw_ef_construction}',
{self.config.column_map['metadata']} json,
{self.config.column_map['uuid']} text indexed stored
)\
"""
# Create a connection to ManticoreSearch
self.configuration = API.Configuration(
host=self.config.get_connection_string(),
username=self.config.username,
password=self.config.password,
# disabled_client_side_validations=",",
**kwargs,
)
self.connection = API.ApiClient(self.configuration)
self.client = {
"index": ENDPOINTS.IndexApi(self.connection),
"utils": ENDPOINTS.UtilsApi(self.connection),
"search": ENDPOINTS.SearchApi(self.connection),
}
# Create default schema if not exists
self.client["utils"].sql(self.schema)
@property
def embeddings(self) -> Embeddings:
return self.embedding
[docs]
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
*,
batch_size: int = 32,
text_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""
Insert more texts through the embeddings and add to the VectorStore.
Args:
texts: Iterable of strings to add to the VectorStore
metadata: Optional column data to be inserted
batch_size: Batch size of insertion
ids: Optional list of ids to associate with the texts
Returns:
List of ids from adding the texts into the VectorStore.
"""
# Embed and create the documents
ids = text_ids or [
# See https://stackoverflow.com/questions/67219691/python-hash-function-that-returns-32-or-64-bits
str(int(sha1(t.encode("utf-8")).hexdigest()[:15], 16))
for t in texts
]
transac = []
for i, text in enumerate(texts):
embed = self.embeddings.embed_query(text)
doc_uuid = str(uuid.uuid1())
doc = {
self.config.column_map["document"]: text,
self.config.column_map["embedding"]: embed,
self.config.column_map["metadata"]: metadatas[i] if metadatas else {},
self.config.column_map["uuid"]: doc_uuid,
}
transac.append(
{"replace": {"index": self.config.table, "id": ids[i], "doc": doc}}
)
if len(transac) == batch_size:
body = "\n".join(map(json.dumps, transac))
try:
self.client["index"].bulk(body)
transac = []
except Exception as e:
logger.info(f"Error indexing documents: {e}")
if len(transac) > 0:
body = "\n".join(map(json.dumps, transac))
try:
self.client["index"].bulk(body)
except Exception as e:
logger.info(f"Error indexing documents: {e}")
return ids
[docs]
@classmethod
def from_texts(
cls: Type[ManticoreSearch],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[Dict[Any, Any]]] = None,
*,
config: Optional[ManticoreSearchSettings] = None,
text_ids: Optional[List[str]] = None,
batch_size: int = 32,
**kwargs: Any,
) -> ManticoreSearch:
ctx = cls(embedding, config=config, **kwargs)
ctx.add_texts(
texts=texts,
embedding=embedding,
text_ids=text_ids,
batch_size=batch_size,
metadatas=metadatas,
**kwargs,
)
return ctx
[docs]
@classmethod
def from_documents(
cls: Type[ManticoreSearch],
documents: List[Document],
embedding: Embeddings,
*,
config: Optional[ManticoreSearchSettings] = None,
text_ids: Optional[List[str]] = None,
batch_size: int = 32,
**kwargs: Any,
) -> ManticoreSearch:
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return cls.from_texts(
texts=texts,
embedding=embedding,
text_ids=text_ids,
batch_size=batch_size,
metadatas=metadatas,
**kwargs,
)
def __repr__(self) -> str:
"""
Text representation for ManticoreSearch Vector Store, prints backends, username
and schemas. Easy to use with `str(ManticoreSearch())`
Returns:
repr: string to show connection info and data schema
"""
_repr = f"\033[92m\033[1m{self.config.table} @ "
_repr += f"http://{self.config.host}:{self.config.port}\033[0m\n\n"
_repr += f"\033[1musername: {self.config.username}\033[0m\n\nTable Schema:\n"
_repr += "-" * 51 + "\n"
for r in self.client["utils"].sql(f"DESCRIBE {self.config.table}")[0]["data"]:
_repr += (
f"|\033[94m{r['Field']:24s}\033[0m|\033["
f"96m{r['Type'] + ' ' + r['Properties']:24s}\033[0m|\n"
)
_repr += "-" * 51 + "\n"
return _repr
[docs]
def similarity_search(
self, query: str, k: int = DEFAULT_K, **kwargs: Any
) -> List[Document]:
"""Perform a similarity search with ManticoreSearch
Args:
query (str): query string
k (int, optional): Top K neighbors to retrieve. Defaults to 4.
Returns:
List[Document]: List of Documents
"""
return self.similarity_search_by_vector(
self.embedding.embed_query(query), k, **kwargs
)
[docs]
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_K,
**kwargs: Any,
) -> List[Document]:
"""Perform a similarity search with ManticoreSearch by vectors
Args:
embedding (List[float]): Embedding vector
k (int, optional): Top K neighbors to retrieve. Defaults to 4.
Returns:
List[Document]: List of documents
"""
# Build search request
request = {
"index": self.config.table,
"knn": {
"field": self.config.column_map["embedding"],
"k": k,
"query_vector": embedding,
},
}
# Execute request and convert response to langchain.Document format
try:
return [
Document(
page_content=r["_source"][self.config.column_map["document"]],
metadata=r["_source"][self.config.column_map["metadata"]],
)
for r in self.client["search"].search(request, **kwargs).hits.hits[:k]
]
except Exception as e:
logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m")
return []
[docs]
def drop(self) -> None:
"""
Helper function: Drop data
"""
self.client["utils"].sql(f"DROP TABLE IF EXISTS {self.config.table}")
@property
def metadata_column(self) -> str:
return self.config.column_map["metadata"]