# Source code for langchain_community.indexes.base

from __future__ import annotations

import uuid
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence

# Fixed UUID built from the integer 1984, so its value is identical on every
# run. It is not referenced anywhere in the visible part of this module.
# NOTE(review): presumably used elsewhere as the namespace argument for
# deterministic uuid.uuid5() hashing -- confirm against callers.
NAMESPACE_UUID = uuid.UUID(int=1984)


class RecordManager(ABC):
    """Abstract base class for a record manager.

    Concrete subclasses supply the storage backend by implementing every
    abstract method below. Each operation is declared twice: a synchronous
    method and an ``a``-prefixed coroutine with the same parameters.
    """

    def __init__(
        self,
        namespace: str,
    ) -> None:
        """Initialize the record manager.

        Args:
            namespace: The namespace for the record manager.
        """
        self.namespace = namespace

    @abstractmethod
    def create_schema(self) -> None:
        """Create the database schema for the record manager."""

    @abstractmethod
    async def acreate_schema(self) -> None:
        """Create the database schema for the record manager."""

    @abstractmethod
    def get_time(self) -> float:
        """Get the current server time as a high resolution timestamp!

        It's important to get this from the server to ensure a monotonic
        clock, otherwise there may be data loss when cleaning up old
        documents!

        Returns:
            The current server time as a float timestamp.
        """

    @abstractmethod
    async def aget_time(self) -> float:
        """Get the current server time as a high resolution timestamp!

        It's important to get this from the server to ensure a monotonic
        clock, otherwise there may be data loss when cleaning up old
        documents!

        Returns:
            The current server time as a float timestamp.
        """

    @abstractmethod
    def update(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        """Upsert records into the database.

        Args:
            keys: A list of record keys to upsert.
            group_ids: A list of group IDs corresponding to the keys.
            time_at_least: If provided, updates should only happen if the
                updated_at field is at least this time.

        Raises:
            ValueError: If the length of keys doesn't match the length of
                group_ids.
        """

    @abstractmethod
    async def aupdate(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        """Upsert records into the database.

        Args:
            keys: A list of record keys to upsert.
            group_ids: A list of group IDs corresponding to the keys.
            time_at_least: If provided, updates should only happen if the
                updated_at field is at least this time.

        Raises:
            ValueError: If the length of keys doesn't match the length of
                group_ids.
        """

    @abstractmethod
    def exists(self, keys: Sequence[str]) -> List[bool]:
        """Check if the provided keys exist in the database.

        Args:
            keys: A list of keys to check.

        Returns:
            A list of boolean values indicating the existence of each key.
        """

    @abstractmethod
    async def aexists(self, keys: Sequence[str]) -> List[bool]:
        """Check if the provided keys exist in the database.

        Args:
            keys: A list of keys to check.

        Returns:
            A list of boolean values indicating the existence of each key.
        """

    @abstractmethod
    def list_keys(
        self,
        *,
        before: Optional[float] = None,
        after: Optional[float] = None,
        group_ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[str]:
        """List records in the database based on the provided filters.

        Args:
            before: Filter to list records updated before this time.
            after: Filter to list records updated after this time.
            group_ids: Filter to list records with specific group IDs.
            limit: Optional limit on the number of records to return.

        Returns:
            A list of keys for the matching records.
        """

    @abstractmethod
    async def alist_keys(
        self,
        *,
        before: Optional[float] = None,
        after: Optional[float] = None,
        group_ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[str]:
        """List records in the database based on the provided filters.

        Args:
            before: Filter to list records updated before this time.
            after: Filter to list records updated after this time.
            group_ids: Filter to list records with specific group IDs.
            limit: Optional limit on the number of records to return.

        Returns:
            A list of keys for the matching records.
        """

    @abstractmethod
    def delete_keys(self, keys: Sequence[str]) -> None:
        """Delete specified records from the database.

        Args:
            keys: A list of keys to delete.
        """

    @abstractmethod
    async def adelete_keys(self, keys: Sequence[str]) -> None:
        """Delete specified records from the database.

        Args:
            keys: A list of keys to delete.
        """