"""SAP HANA Cloud Vector Engine"""
from __future__ import annotations
import importlib.util
import json
import re
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Pattern,
Tuple,
Type,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import (
DistanceStrategy,
maximal_marginal_relevance,
)
if TYPE_CHECKING:
from hdbcli import dbapi
HANA_DISTANCE_FUNCTION: dict = {
DistanceStrategy.COSINE: ("COSINE_SIMILARITY", "DESC"),
DistanceStrategy.EUCLIDEAN_DISTANCE: ("L2DISTANCE", "ASC"),
}
COMPARISONS_TO_SQL = {
"$eq": "=",
"$ne": "<>",
"$lt": "<",
"$lte": "<=",
"$gt": ">",
"$gte": ">=",
}
IN_OPERATORS_TO_SQL = {
"$in": "IN",
"$nin": "NOT IN",
}
BETWEEN_OPERATOR = "$between"
LIKE_OPERATOR = "$like"
LOGICAL_OPERATORS_TO_SQL = {"$and": "AND", "$or": "OR"}
default_distance_strategy = DistanceStrategy.COSINE
default_table_name: str = "EMBEDDINGS"
default_content_column: str = "VEC_TEXT"
default_metadata_column: str = "VEC_META"
default_vector_column: str = "VEC_VECTOR"
default_vector_column_length: int = -1 # -1 means dynamic length
[docs]
class HanaDB(VectorStore):
"""SAP HANA Cloud Vector Engine
The prerequisite for using this class is the installation of the ``hdbcli``
Python package.
The HanaDB vectorstore can be created by providing an embedding function and
an existing database connection. Optionally, the names of the table and the
columns to use.
"""
[docs]
def __init__(
self,
connection: dbapi.Connection,
embedding: Embeddings,
distance_strategy: DistanceStrategy = default_distance_strategy,
table_name: str = default_table_name,
content_column: str = default_content_column,
metadata_column: str = default_metadata_column,
vector_column: str = default_vector_column,
vector_column_length: int = default_vector_column_length,
*,
specific_metadata_columns: Optional[List[str]] = None,
):
# Check if the hdbcli package is installed
if importlib.util.find_spec("hdbcli") is None:
raise ImportError(
"Could not import hdbcli python package. "
"Please install it with `pip install hdbcli`."
)
valid_distance = False
for key in HANA_DISTANCE_FUNCTION.keys():
if key is distance_strategy:
valid_distance = True
if not valid_distance:
raise ValueError(
"Unsupported distance_strategy: {}".format(distance_strategy)
)
self.connection = connection
self.embedding = embedding
self.distance_strategy = distance_strategy
self.table_name = HanaDB._sanitize_name(table_name)
self.content_column = HanaDB._sanitize_name(content_column)
self.metadata_column = HanaDB._sanitize_name(metadata_column)
self.vector_column = HanaDB._sanitize_name(vector_column)
self.vector_column_length = HanaDB._sanitize_int(vector_column_length)
self.specific_metadata_columns = HanaDB._sanitize_specific_metadata_columns(
specific_metadata_columns or []
)
# Check if the table exists, and eventually create it
if not self._table_exists(self.table_name):
sql_str = (
f'CREATE TABLE "{self.table_name}"('
f'"{self.content_column}" NCLOB, '
f'"{self.metadata_column}" NCLOB, '
f'"{self.vector_column}" REAL_VECTOR '
)
if self.vector_column_length in [-1, 0]:
sql_str += ");"
else:
sql_str += f"({self.vector_column_length}));"
try:
cur = self.connection.cursor()
cur.execute(sql_str)
finally:
cur.close()
# Check if the needed columns exist and have the correct type
self._check_column(self.table_name, self.content_column, ["NCLOB", "NVARCHAR"])
self._check_column(self.table_name, self.metadata_column, ["NCLOB", "NVARCHAR"])
self._check_column(
self.table_name,
self.vector_column,
["REAL_VECTOR"],
self.vector_column_length,
)
for column_name in self.specific_metadata_columns:
self._check_column(self.table_name, column_name)
def _table_exists(self, table_name) -> bool: # type: ignore[no-untyped-def]
sql_str = (
"SELECT COUNT(*) FROM SYS.TABLES WHERE SCHEMA_NAME = CURRENT_SCHEMA"
" AND TABLE_NAME = ?"
)
try:
cur = self.connection.cursor()
cur.execute(sql_str, (table_name))
if cur.has_result_set():
rows = cur.fetchall()
if rows[0][0] == 1:
return True
finally:
cur.close()
return False
def _check_column( # type: ignore[no-untyped-def]
self, table_name, column_name, column_type=None, column_length=None
):
sql_str = (
"SELECT DATA_TYPE_NAME, LENGTH FROM SYS.TABLE_COLUMNS WHERE "
"SCHEMA_NAME = CURRENT_SCHEMA "
"AND TABLE_NAME = ? AND COLUMN_NAME = ?"
)
try:
cur = self.connection.cursor()
cur.execute(sql_str, (table_name, column_name))
if cur.has_result_set():
rows = cur.fetchall()
if len(rows) == 0:
raise AttributeError(f"Column {column_name} does not exist")
# Check data type
if column_type:
if rows[0][0] not in column_type:
raise AttributeError(
f"Column {column_name} has the wrong type: {rows[0][0]}"
)
# Check length, if parameter was provided
# Length can either be -1 (QRC01+02-24) or 0 (QRC03-24 onwards)
# to indicate no length constraint being present.
if column_length is not None and column_length > 0:
if rows[0][1] != column_length:
raise AttributeError(
f"Column {column_name} has the wrong length: {rows[0][1]} "
f"expected: {column_length}"
)
else:
raise AttributeError(f"Column {column_name} does not exist")
finally:
cur.close()
@property
def embeddings(self) -> Embeddings:
return self.embedding
@staticmethod
def _sanitize_name(input_str: str) -> str: # type: ignore[misc]
# Remove characters that are not alphanumeric or underscores
return re.sub(r"[^a-zA-Z0-9_]", "", input_str)
@staticmethod
def _sanitize_int(input_int: any) -> int: # type: ignore[valid-type]
value = int(str(input_int))
if value < -1:
raise ValueError(f"Value ({value}) must not be smaller than -1")
return int(str(input_int))
@staticmethod
def _sanitize_list_float(embedding: List[float]) -> List[float]:
for value in embedding:
if not isinstance(value, float):
raise ValueError(f"Value ({value}) does not have type float")
return embedding
# Compile pattern only once, for better performance
_compiled_pattern: Pattern = re.compile("^[_a-zA-Z][_a-zA-Z0-9]*$")
@staticmethod
def _sanitize_metadata_keys(metadata: dict) -> dict:
for key in metadata.keys():
if not HanaDB._compiled_pattern.match(key):
raise ValueError(f"Invalid metadata key {key}")
return metadata
@staticmethod
def _sanitize_specific_metadata_columns(
specific_metadata_columns: List[str],
) -> List[str]:
metadata_columns = []
for c in specific_metadata_columns:
sanitized_name = HanaDB._sanitize_name(c)
metadata_columns.append(sanitized_name)
return metadata_columns
def _split_off_special_metadata(self, metadata: dict) -> Tuple[dict, list]:
# Use provided values by default or fallback
special_metadata = []
if not metadata:
return {}, []
for column_name in self.specific_metadata_columns:
special_metadata.append(metadata.get(column_name, None))
return metadata, special_metadata
[docs]
def add_texts( # type: ignore[override]
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
embeddings: Optional[List[List[float]]] = None,
**kwargs: Any,
) -> List[str]:
"""Add more texts to the vectorstore.
Args:
texts (Iterable[str]): Iterable of strings/text to add to the vectorstore.
metadatas (Optional[List[dict]], optional): Optional list of metadatas.
Defaults to None.
embeddings (Optional[List[List[float]]], optional): Optional pre-generated
embeddings. Defaults to None.
Returns:
List[str]: empty list
"""
# Create all embeddings of the texts beforehand to improve performance
if embeddings is None:
embeddings = self.embedding.embed_documents(list(texts))
# Create sql parameters array
sql_params = []
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
metadata, extracted_special_metadata = self._split_off_special_metadata(
metadata
)
embedding = (
embeddings[i]
if embeddings
else self.embedding.embed_documents([text])[0]
)
sql_params.append(
(
text,
json.dumps(HanaDB._sanitize_metadata_keys(metadata)),
f"[{','.join(map(str, embedding))}]",
*extracted_special_metadata,
)
)
# Insert data into the table
cur = self.connection.cursor()
try:
specific_metadata_columns_string = '", "'.join(
self.specific_metadata_columns
)
if specific_metadata_columns_string:
specific_metadata_columns_string = (
', "' + specific_metadata_columns_string + '"'
)
sql_str = (
f'INSERT INTO "{self.table_name}" ("{self.content_column}", '
f'"{self.metadata_column}", '
f'"{self.vector_column}"{specific_metadata_columns_string}) '
f"VALUES (?, ?, TO_REAL_VECTOR (?)"
f"{', ?' * len(self.specific_metadata_columns)});"
)
cur.executemany(sql_str, sql_params)
finally:
cur.close()
return []
[docs]
@classmethod
def from_texts( # type: ignore[no-untyped-def, override]
cls: Type[HanaDB],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
connection: dbapi.Connection = None,
distance_strategy: DistanceStrategy = default_distance_strategy,
table_name: str = default_table_name,
content_column: str = default_content_column,
metadata_column: str = default_metadata_column,
vector_column: str = default_vector_column,
vector_column_length: int = default_vector_column_length,
*,
specific_metadata_columns: Optional[List[str]] = None,
):
"""Create a HanaDB instance from raw documents.
This is a user-friendly interface that:
1. Embeds documents.
2. Creates a table if it does not yet exist.
3. Adds the documents to the table.
This is intended to be a quick way to get started.
"""
instance = cls(
connection=connection,
embedding=embedding,
distance_strategy=distance_strategy,
table_name=table_name,
content_column=content_column,
metadata_column=metadata_column,
vector_column=vector_column,
vector_column_length=vector_column_length, # -1 means dynamic length
specific_metadata_columns=specific_metadata_columns,
)
instance.add_texts(texts, metadatas)
return instance
[docs]
def similarity_search( # type: ignore[override]
self, query: str, k: int = 4, filter: Optional[dict] = None
) -> List[Document]:
"""Return docs most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: A dictionary of metadata fields and values to filter by.
Defaults to None.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = self.similarity_search_with_score(
query=query, k=k, filter=filter
)
return [doc for doc, _ in docs_and_scores]
[docs]
def similarity_search_with_score(
self, query: str, k: int = 4, filter: Optional[dict] = None
) -> List[Tuple[Document, float]]:
"""Return documents and score values most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: A dictionary of metadata fields and values to filter by.
Defaults to None.
Returns:
List of tuples (containing a Document and a score) that are
most similar to the query
"""
embedding = self.embedding.embed_query(query)
return self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter
)
[docs]
def similarity_search_with_score_and_vector_by_vector(
self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
) -> List[Tuple[Document, float, List[float]]]:
"""Return docs most similar to the given embedding.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: A dictionary of metadata fields and values to filter by.
Defaults to None.
Returns:
List of Documents most similar to the query and
score and the document's embedding vector for each
"""
result = []
k = HanaDB._sanitize_int(k)
embedding = HanaDB._sanitize_list_float(embedding)
distance_func_name = HANA_DISTANCE_FUNCTION[self.distance_strategy][0]
embedding_as_str = ",".join(map(str, embedding))
sql_str = (
f"SELECT TOP {k}"
f' "{self.content_column}", ' # row[0]
f' "{self.metadata_column}", ' # row[1]
f' TO_NVARCHAR("{self.vector_column}"), ' # row[2]
f' {distance_func_name}("{self.vector_column}", TO_REAL_VECTOR '
f" (ARRAY({embedding_as_str}))) AS CS " # row[3]
f'FROM "{self.table_name}"'
)
order_str = f" order by CS {HANA_DISTANCE_FUNCTION[self.distance_strategy][1]}"
where_str, query_tuple = self._create_where_by_filter(filter)
sql_str = sql_str + where_str
sql_str = sql_str + order_str
try:
cur = self.connection.cursor()
cur.execute(sql_str, query_tuple)
if cur.has_result_set():
rows = cur.fetchall()
for row in rows:
js = json.loads(row[1])
doc = Document(page_content=row[0], metadata=js)
result_vector = HanaDB._parse_float_array_from_string(row[2])
result.append((doc, row[3], result_vector))
finally:
cur.close()
return result
[docs]
def similarity_search_with_score_by_vector(
self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
) -> List[Tuple[Document, float]]:
"""Return docs most similar to the given embedding.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: A dictionary of metadata fields and values to filter by.
Defaults to None.
Returns:
List of Documents most similar to the query and score for each
"""
whole_result = self.similarity_search_with_score_and_vector_by_vector(
embedding=embedding, k=k, filter=filter
)
return [(result_item[0], result_item[1]) for result_item in whole_result]
[docs]
def similarity_search_by_vector( # type: ignore[override]
self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
) -> List[Document]:
"""Return docs most similar to embedding vector.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: A dictionary of metadata fields and values to filter by.
Defaults to None.
Returns:
List of Documents most similar to the query vector.
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter
)
return [doc for doc, _ in docs_and_scores]
def _create_where_by_filter(self, filter): # type: ignore[no-untyped-def]
query_tuple = []
where_str = ""
if filter:
where_str, query_tuple = self._process_filter_object(filter)
where_str = " WHERE " + where_str
return where_str, query_tuple
def _process_filter_object(self, filter): # type: ignore[no-untyped-def]
query_tuple = []
where_str = ""
if filter:
for i, key in enumerate(filter.keys()):
filter_value = filter[key]
if i != 0:
where_str += " AND "
# Handling of 'special' boolean operators "$and", "$or"
if key in LOGICAL_OPERATORS_TO_SQL:
logical_operator = LOGICAL_OPERATORS_TO_SQL[key]
logical_operands = filter_value
for j, logical_operand in enumerate(logical_operands):
if j != 0:
where_str += f" {logical_operator} "
(
where_str_logical,
query_tuple_logical,
) = self._process_filter_object(logical_operand)
where_str += where_str_logical
query_tuple += query_tuple_logical
continue
operator = "="
sql_param = "?"
if isinstance(filter_value, bool):
query_tuple.append("true" if filter_value else "false")
elif isinstance(filter_value, int) or isinstance(filter_value, str):
query_tuple.append(filter_value)
elif isinstance(filter_value, Dict):
# Handling of 'special' operators starting with "$"
special_op = next(iter(filter_value))
special_val = filter_value[special_op]
# "$eq", "$ne", "$lt", "$lte", "$gt", "$gte"
if special_op in COMPARISONS_TO_SQL:
operator = COMPARISONS_TO_SQL[special_op]
if isinstance(special_val, bool):
query_tuple.append("true" if special_val else "false")
elif isinstance(special_val, float):
sql_param = "CAST(? as float)"
query_tuple.append(special_val)
elif (
isinstance(special_val, dict)
and "type" in special_val
and special_val["type"] == "date"
):
# Date type
sql_param = "CAST(? as DATE)"
query_tuple.append(special_val["date"])
else:
query_tuple.append(special_val)
# "$between"
elif special_op == BETWEEN_OPERATOR:
between_from = special_val[0]
between_to = special_val[1]
operator = "BETWEEN"
sql_param = "? AND ?"
query_tuple.append(between_from)
query_tuple.append(between_to)
# "$like"
elif special_op == LIKE_OPERATOR:
operator = "LIKE"
query_tuple.append(special_val)
# "$in", "$nin"
elif special_op in IN_OPERATORS_TO_SQL:
operator = IN_OPERATORS_TO_SQL[special_op]
if isinstance(special_val, list):
for i, list_entry in enumerate(special_val):
if i == 0:
sql_param = "("
sql_param = sql_param + "?"
if i == (len(special_val) - 1):
sql_param = sql_param + ")"
else:
sql_param = sql_param + ","
query_tuple.append(list_entry)
else:
raise ValueError(
f"Unsupported value for {operator}: {special_val}"
)
else:
raise ValueError(f"Unsupported operator: {special_op}")
else:
raise ValueError(
f"Unsupported filter data-type: {type(filter_value)}"
)
selector = (
f' "{key}"'
if key in self.specific_metadata_columns
else f"JSON_VALUE({self.metadata_column}, '$.{key}')"
)
where_str += f"{selector} " f"{operator} {sql_param}"
return where_str, query_tuple
[docs]
def delete( # type: ignore[override]
self, ids: Optional[List[str]] = None, filter: Optional[dict] = None
) -> Optional[bool]:
"""Delete entries by filter with metadata values
Args:
ids: Deletion with ids is not supported! A ValueError will be raised.
filter: A dictionary of metadata fields and values to filter by.
An empty filter ({}) will delete all entries in the table.
Returns:
Optional[bool]: True, if deletion is technically successful.
Deletion of zero entries, due to non-matching filters is a success.
"""
if ids is not None:
raise ValueError("Deletion via ids is not supported")
if filter is None:
raise ValueError("Parameter 'filter' is required when calling 'delete'")
where_str, query_tuple = self._create_where_by_filter(filter)
sql_str = f'DELETE FROM "{self.table_name}" {where_str}'
try:
cur = self.connection.cursor()
cur.execute(sql_str, query_tuple)
finally:
cur.close()
return True
[docs]
async def adelete( # type: ignore[override]
self, ids: Optional[List[str]] = None, filter: Optional[dict] = None
) -> Optional[bool]:
"""Delete by vector ID or other criteria.
Args:
ids: List of ids to delete.
Returns:
Optional[bool]: True if deletion is successful,
False otherwise, None if not implemented.
"""
return await run_in_executor(None, self.delete, ids=ids, filter=filter)
[docs]
def max_marginal_relevance_search( # type: ignore[override]
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: search query text.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Filter on metadata properties, e.g.
{
"str_property": "foo",
"int_property": 123
}
Returns:
List of Documents selected by maximal marginal relevance.
"""
embedding = self.embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
)
def _parse_float_array_from_string(array_as_string: str) -> List[float]: # type: ignore[misc]
array_wo_brackets = array_as_string[1:-1]
return [float(x) for x in array_wo_brackets.split(",")]
[docs]
def max_marginal_relevance_search_by_vector( # type: ignore[override]
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
) -> List[Document]:
whole_result = self.similarity_search_with_score_and_vector_by_vector(
embedding=embedding, k=fetch_k, filter=filter
)
embeddings = [result_item[2] for result_item in whole_result]
mmr_doc_indexes = maximal_marginal_relevance(
np.array(embedding), embeddings, lambda_mult=lambda_mult, k=k
)
return [whole_result[i][0] for i in mmr_doc_indexes]
[docs]
async def amax_marginal_relevance_search_by_vector( # type: ignore[override]
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance."""
return await run_in_executor(
None,
self.max_marginal_relevance_search_by_vector,
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
)
@staticmethod
def _cosine_relevance_score_fn(distance: float) -> float:
return distance
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
The 'correct' relevance function
may differ depending on a few things, including:
- the distance / similarity metric used by the VectorStore
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
- embedding dimensionality
- etc.
Vectorstores should define their own selection based method of relevance.
"""
if self.distance_strategy == DistanceStrategy.COSINE:
return HanaDB._cosine_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return HanaDB._euclidean_relevance_score_fn
else:
raise ValueError(
"Unsupported distance_strategy: {}".format(self.distance_strategy)
)