# Source code for langchain_community.vectorstores.tablestore

import json
import logging
import uuid
from typing import (
    Any,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
)

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

# Module-level logger, named after this module so that applications can
# configure its verbosity independently.
logger = logging.getLogger(__name__)


class TablestoreVectorStore(VectorStore):
    """`Tablestore` vector store.

    To use, you should have the ``tablestore`` python package installed.

    Example:
        .. code-block:: python

            import os

            from langchain_openai import OpenAIEmbeddings
            from langchain_community.vectorstores import TablestoreVectorStore
            import tablestore

            embeddings = OpenAIEmbeddings()
            store = TablestoreVectorStore(
                embeddings,
                endpoint=os.getenv("end_point"),
                instance_name=os.getenv("instance_name"),
                access_key_id=os.getenv("access_key_id"),
                access_key_secret=os.getenv("access_key_secret"),
                vector_dimension=512,
                # metadata mapping is used to filter non-vector fields.
                metadata_mappings=[
                    tablestore.FieldSchema(
                        "type",
                        tablestore.FieldType.KEYWORD,
                        index=True,
                        enable_sort_and_agg=True
                    ),
                    tablestore.FieldSchema(
                        "time",
                        tablestore.FieldType.LONG,
                        index=True,
                        enable_sort_and_agg=True
                    ),
                ]
            )
    """

    @staticmethod
    def _import_tablestore() -> Any:
        """Import and return the optional ``tablestore`` module.

        Raises:
            ImportError: If the ``tablestore`` package is not installed.
        """
        try:
            import tablestore
        except ImportError as e:
            raise ImportError(
                "Could not import tablestore python package. "
                "Please install it with `pip install tablestore`."
            ) from e
        return tablestore

    def __init__(
        self,
        embedding: Embeddings,
        *,
        endpoint: Optional[str] = None,
        instance_name: Optional[str] = None,
        access_key_id: Optional[str] = None,
        access_key_secret: Optional[str] = None,
        table_name: Optional[str] = "langchain_vector_store_ots_v1",
        index_name: Optional[str] = "langchain_vector_store_ots_index_v1",
        text_field: Optional[str] = "content",
        vector_field: Optional[str] = "embedding",
        vector_dimension: int = 512,
        vector_metric_type: Optional[str] = "cosine",
        metadata_mappings: Optional[List[Any]] = None,
    ):
        """Create the Tablestore client and the search-index field schema.

        Args:
            embedding: Embedding model used for documents and queries.
            endpoint: Tablestore endpoint passed to ``OTSClient``.
            instance_name: Tablestore instance name.
            access_key_id: Access key id.
            access_key_secret: Access key secret.
            table_name: Name of the table holding the rows.
            index_name: Name of the search index on that table.
            text_field: Column that stores the document text.
            vector_field: Column that stores the JSON-encoded embedding.
            vector_dimension: Dimension declared for the vector field.
            vector_metric_type: One of ``"cosine"``, ``"euclidean"`` or
                ``"dot_product"``.
            metadata_mappings: Extra ``tablestore.FieldSchema`` objects to add
                to the search index. Entries named like the text or vector
                field are ignored.

        Raises:
            ImportError: If the ``tablestore`` package is not installed.
            ValueError: If ``vector_metric_type`` is unsupported or a metadata
                mapping is not a ``tablestore.FieldSchema``.
        """
        tablestore = self._import_tablestore()

        self.__embedding = embedding
        self.__tablestore_client = tablestore.OTSClient(
            endpoint,
            access_key_id,
            access_key_secret,
            instance_name,
            retry_policy=tablestore.WriteRetryPolicy(),
        )
        self.__table_name = table_name
        self.__index_name = index_name
        self.__vector_dimension = vector_dimension
        self.__vector_field = vector_field
        self.__text_field = text_field

        if vector_metric_type == "cosine":
            self.__vector_metric_type = tablestore.VectorMetricType.VM_COSINE
        elif vector_metric_type == "euclidean":
            self.__vector_metric_type = tablestore.VectorMetricType.VM_EUCLIDEAN
        elif vector_metric_type == "dot_product":
            self.__vector_metric_type = tablestore.VectorMetricType.VM_DOT_PRODUCT
        else:
            raise ValueError(
                f"Unsupported vector_metric_type operator: {vector_metric_type}"
            )

        # The text and vector fields are always indexed; user mappings follow.
        self.__metadata_mappings = [
            tablestore.FieldSchema(
                self.__text_field,
                tablestore.FieldType.TEXT,
                index=True,
                enable_sort_and_agg=False,
                store=False,
                analyzer=tablestore.AnalyzerType.MAXWORD,
            ),
            tablestore.FieldSchema(
                self.__vector_field,
                tablestore.FieldType.VECTOR,
                vector_options=tablestore.VectorOptions(
                    data_type=tablestore.VectorDataType.VD_FLOAT_32,
                    dimension=self.__vector_dimension,
                    metric_type=self.__vector_metric_type,
                ),
            ),
        ]
        for mapping in metadata_mappings or []:
            if not isinstance(mapping, tablestore.FieldSchema):
                raise ValueError(
                    f"meta_data mapping should be an "
                    f"instance of tablestore.FieldSchema, "
                    f"but got {type(mapping)}"
                )
            # The built-in text/vector schemas above take precedence.
            if mapping.field_name in (text_field, vector_field):
                continue
            self.__metadata_mappings.append(mapping)

    def create_table_if_not_exist(self) -> None:
        """Create table if not exist.

        NOTE: SDK errors raised while creating the table are logged and
        swallowed, so this method returns normally even if creation failed.
        """
        tablestore = self._import_tablestore()

        table_list = self.__tablestore_client.list_table()
        if self.__table_name in table_list:
            logger.info("Tablestore system table[%s] already exists", self.__table_name)
            return None
        logger.info(
            "Tablestore system table[%s] does not exist, try to create the table.",
            self.__table_name,
        )

        schema_of_primary_key = [("id", "STRING")]
        table_meta = tablestore.TableMeta(self.__table_name, schema_of_primary_key)
        table_options = tablestore.TableOptions()
        reserved_throughput = tablestore.ReservedThroughput(
            tablestore.CapacityUnit(0, 0)
        )
        try:
            self.__tablestore_client.create_table(
                table_meta, table_options, reserved_throughput
            )
            logger.info("Tablestore create table[%s] successfully.", self.__table_name)
        except tablestore.OTSClientError as e:
            logger.exception(
                "Tablestore create system table[%s] failed with client error, "
                "http_status:%d, error_message:%s",
                self.__table_name,
                e.get_http_status(),
                e.get_error_message(),
            )
        except tablestore.OTSServiceError as e:
            logger.exception(
                "Tablestore create system table[%s] failed with service error, "
                "http_status:%d, error_code:%s, error_message:%s, request_id:%s",
                self.__table_name,
                e.get_http_status(),
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )

    def create_search_index_if_not_exist(self) -> None:
        """Create search index if not exist."""
        tablestore = self._import_tablestore()

        search_index_list = self.__tablestore_client.list_search_index(
            table_name=self.__table_name
        )
        # Each entry is indexed as (table_name, index_name).
        if self.__index_name in [t[1] for t in search_index_list]:
            logger.info("Tablestore system index[%s] already exists", self.__index_name)
            return None
        index_meta = tablestore.SearchIndexMeta(self.__metadata_mappings)
        self.__tablestore_client.create_search_index(
            self.__table_name, self.__index_name, index_meta
        )
        logger.info(
            "Tablestore create system index[%s] successfully.", self.__index_name
        )

    def delete_table_if_exists(self) -> None:
        """Delete table if exists.

        All search indexes on the table are removed first, then the table
        itself. Does nothing when the table is not listed by the client.
        """
        if self.__table_name not in self.__tablestore_client.list_table():
            logger.info("Tablestore system table[%s] does not exist", self.__table_name)
            return None
        search_index_list = self.__tablestore_client.list_search_index(
            table_name=self.__table_name
        )
        for resp_tuple in search_index_list:
            self.__tablestore_client.delete_search_index(resp_tuple[0], resp_tuple[1])
        self.__tablestore_client.delete_table(self.__table_name)

    def delete_search_index(self, table_name: str, index_name: str) -> None:
        """Delete search index."""
        self.__tablestore_client.delete_search_index(table_name, index_name)

    def __write_row(
        self, row_id: str, content: str, embedding_vector: List[float], meta_data: dict
    ) -> None:
        """Write one document row.

        The embedding is stored as a JSON string in the vector column and
        every metadata entry becomes its own attribute column.

        NOTE: SDK errors are logged and swallowed; the caller is not told
        whether the write succeeded.
        """
        tablestore = self._import_tablestore()

        primary_key = [("id", row_id)]
        attribute_columns = [
            (self.__text_field, content),
            (self.__vector_field, json.dumps(embedding_vector)),
        ]
        attribute_columns.extend(meta_data.items())
        row = tablestore.Row(primary_key, attribute_columns)

        try:
            self.__tablestore_client.put_row(self.__table_name, row)
            logger.debug(
                "Tablestore put row successfully. id:%s, content:%s, meta_data:%s",
                row_id,
                content,
                meta_data,
            )
        except tablestore.OTSClientError as e:
            logger.exception(
                "Tablestore put row failed with client error:%s, "
                "id:%s, content:%s, meta_data:%s",
                e,
                row_id,
                content,
                meta_data,
            )
        except tablestore.OTSServiceError as e:
            logger.exception(
                "Tablestore put row failed with service error:%s, id:%s, content:%s, "
                "meta_data:%s, http_status:%d, "
                "error_code:%s, error_message:%s, request_id:%s",
                e,
                row_id,
                content,
                meta_data,
                e.get_http_status(),
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )

    def __delete_row(self, row_id: str) -> bool:
        """Delete one row by id.

        Returns:
            True if the SDK call succeeded, False if it raised an SDK error
            (which is logged rather than propagated).
        """
        tablestore = self._import_tablestore()

        primary_key = [("id", row_id)]
        try:
            self.__tablestore_client.delete_row(self.__table_name, primary_key, None)
            logger.info("Tablestore delete row successfully. id:%s", row_id)
            return True
        except tablestore.OTSClientError as e:
            logger.exception(
                "Tablestore delete row failed with client error:%s, id:%s", e, row_id
            )
        except tablestore.OTSServiceError as e:
            logger.exception(
                "Tablestore delete row failed with service error:%s, "
                "id:%s, http_status:%d, error_code:%s, error_message:%s, request_id:%s",
                e,
                row_id,
                e.get_http_status(),
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )
        return False

    def __get_row(self, row_id: str) -> Document:
        """Fetch one row by id and convert it to a ``Document``.

        Every attribute column other than the text field is copied into the
        metadata unchanged.

        Raises:
            ValueError: If no row exists for ``row_id``.
            tablestore.OTSClientError: Re-raised after logging.
            tablestore.OTSServiceError: Re-raised after logging.
        """
        tablestore = self._import_tablestore()

        primary_key = [("id", row_id)]
        try:
            _, row, _ = self.__tablestore_client.get_row(
                self.__table_name, primary_key, None, None, 1
            )
        except tablestore.OTSClientError as e:
            logger.exception(
                "Tablestore get row failed with client error:%s, id:%s", e, row_id
            )
            raise e
        except tablestore.OTSServiceError as e:
            logger.exception(
                "Tablestore get row failed with service error:%s, "
                "id:%s, http_status:%d, error_code:%s, error_message:%s, request_id:%s",
                e,
                row_id,
                e.get_http_status(),
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )
            raise e

        logger.debug("Tablestore get row successfully. id:%s", row_id)
        if row is None:
            raise ValueError("Can not find row_id:%s in tablestore." % row_id)

        document_id = row.primary_key[0][1]
        meta_data = {}
        text = ""
        for col in row.attribute_columns:
            key = col[0]
            val = col[1]
            if key == self.__text_field:
                text = val
                continue
            meta_data[key] = val
        return Document(
            id=document_id,
            page_content=text,
            metadata=meta_data,
        )

    def _tablestore_search(
        self,
        query_embedding: List[float],
        k: int = 5,
        tablestore_filter_query: Optional[Any] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Run a KNN vector query and return ``(Document, score)`` pairs.

        Args:
            query_embedding: Query vector.
            k: Maximum number of hits returned.
            tablestore_filter_query: Optional ``tablestore.Query`` used as the
                filter of the KNN query.
            **kwargs: ``knn_top_k`` overrides the ``top_k`` of the KNN query
                (defaults to ``k``); other keys are ignored.

        Raises:
            ValueError: If the filter is not a ``tablestore.Query``.
            tablestore.OTSClientError: Re-raised after logging.
            tablestore.OTSServiceError: Re-raised after logging.
        """
        tablestore = self._import_tablestore()

        if tablestore_filter_query:
            if not isinstance(tablestore_filter_query, tablestore.Query):
                raise ValueError(
                    f"table_store_filter_query should be "
                    f"an instance of tablestore.Query, "
                    f"but got {type(tablestore_filter_query)}"
                )
        knn_top_k = kwargs.get("knn_top_k", k)

        ots_query = tablestore.KnnVectorQuery(
            field_name=self.__vector_field,
            top_k=knn_top_k,
            float32_query_vector=query_embedding,
            filter=tablestore_filter_query,
        )
        sort = tablestore.Sort(
            sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)]
        )
        search_query = tablestore.SearchQuery(
            ots_query, limit=k, get_total_count=False, sort=sort
        )

        try:
            search_response = self.__tablestore_client.search(
                table_name=self.__table_name,
                index_name=self.__index_name,
                search_query=search_query,
                columns_to_get=tablestore.ColumnsToGet(
                    return_type=tablestore.ColumnReturnType.ALL
                ),
            )
        except tablestore.OTSClientError as e:
            logger.exception("Tablestore search failed with client error:%s", e)
            raise e
        except tablestore.OTSServiceError as e:
            logger.exception(
                "Tablestore search failed with service error:%s, "
                "http_status:%d, error_code:%s, error_message:%s, request_id:%s",
                e,
                e.get_http_status(),
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )
            raise e

        logger.info(
            "Tablestore search successfully. request_id:%s",
            search_response.request_id,
        )
        tuple_list = []
        for hit in search_response.search_hits:
            # hit.row is indexed as (primary_key, attribute_columns).
            row = hit.row
            score = hit.score
            document_id = row[0][0][1]
            meta_data = {}
            text = ""
            for col in row[1]:
                key = col[0]
                val = col[1]
                if key == self.__text_field:
                    text = val
                    continue
                # The embedding was stored as JSON text; decode it back.
                if key == self.__vector_field:
                    val = json.loads(val)
                meta_data[key] = val
            doc = Document(
                id=document_id,
                page_content=text,
                metadata=meta_data,
            )
            tuple_list.append((doc, score))
        return tuple_list

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed the texts and write one row per text.

        Args:
            texts: Texts to store; any iterable, including one-shot generators.
            metadatas: Optional metadata dicts, positionally matching ``texts``.
            ids: Optional row ids, one per text. Random hex UUIDs are generated
                when omitted or empty.

        Returns:
            The row ids, in the same order as ``texts``.

        Raises:
            ValueError: If ``ids`` does not have one entry per text, or
                ``metadatas`` has fewer entries than there are texts.
        """
        # Materialise first: a generator would otherwise be exhausted while
        # generating ids, leaving nothing to embed.
        text_list = list(texts)
        if not ids:
            ids = [uuid.uuid4().hex for _ in text_list]
        if len(ids) != len(text_list):
            raise ValueError(
                f"Number of ids ({len(ids)}) does not match "
                f"number of texts ({len(text_list)})."
            )
        if metadatas and len(metadatas) < len(text_list):
            raise ValueError(
                f"Number of metadatas ({len(metadatas)}) is less than "
                f"number of texts ({len(text_list)})."
            )

        embeddings = self.__embedding.embed_documents(text_list)
        for i, (row_id, text) in enumerate(zip(ids, text_list)):
            metadata = metadatas[i] if metadatas and metadatas[i] else {}
            self.__write_row(
                row_id=row_id,
                content=text,
                embedding_vector=embeddings[i],
                meta_data=metadata,
            )
        return ids

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete rows by id.

        Args:
            ids: Row ids to delete. Nothing is deleted when omitted or empty.

        Returns:
            True if every requested deletion succeeded (or nothing was
            requested), False if at least one deletion failed.
        """
        if not ids:
            return True
        # Build the full list so every id is attempted even after a failure.
        results = [self.__delete_row(row_id) for row_id in ids]
        return all(results)

    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        """Return the documents stored under the given row ids, in order."""
        return [self.__get_row(row_id) for row_id in ids]

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        tablestore_filter_query: Optional[Any] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to ``query``, without scores."""
        return [
            doc
            for (doc, _score) in self.similarity_search_with_score(
                query,
                k=k,
                tablestore_filter_query=tablestore_filter_query,
                **kwargs,
            )
        ]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        tablestore_filter_query: Optional[Any] = None,
        *args: Any,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Embed ``query`` and return ``(Document, score)`` pairs."""
        query_embedding = self.__embedding.embed_query(query)
        return self._tablestore_search(
            query_embedding,
            k=k,
            tablestore_filter_query=tablestore_filter_query,
            **kwargs,
        )

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        tablestore_filter_query: Optional[Any] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to the given embedding vector."""
        return [
            doc
            for (doc, _score) in self._tablestore_search(
                embedding,
                k=k,
                tablestore_filter_query=tablestore_filter_query,
                **kwargs,
            )
        ]

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        endpoint: Optional[str] = None,
        instance_name: Optional[str] = None,
        access_key_id: Optional[str] = None,
        access_key_secret: Optional[str] = None,
        table_name: Optional[str] = "langchain_vector_store_ots_v1",
        index_name: Optional[str] = "langchain_vector_store_ots_index_v1",
        text_field: Optional[str] = "content",
        vector_field: Optional[str] = "embedding",
        vector_dimension: int = 512,
        vector_metric_type: Optional[str] = "cosine",
        metadata_mappings: Optional[List[Any]] = None,
        **kwargs: Any,
    ) -> "TablestoreVectorStore":
        """Build a store, create its table and index if needed, and add texts.

        The connection and schema arguments are forwarded unchanged to the
        constructor; ``texts`` and ``metadatas`` are forwarded to
        ``add_texts``.
        """
        store = cls(
            embedding=embedding,
            endpoint=endpoint,
            instance_name=instance_name,
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
            table_name=table_name,
            index_name=index_name,
            text_field=text_field,
            vector_field=vector_field,
            vector_dimension=vector_dimension,
            vector_metric_type=vector_metric_type,
            metadata_mappings=metadata_mappings,
        )
        store.create_table_if_not_exist()
        store.create_search_index_if_not_exist()
        store.add_texts(texts, metadatas)
        return store