# Source code for langchain_community.indexes.base
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence
# Fixed UUID constructed from the integer 1984. It is not referenced by the
# code visible in this module; presumably it serves as a uuid5-style namespace
# for callers that derive deterministic IDs -- TODO confirm against usages.
NAMESPACE_UUID = uuid.UUID(int=1984)
class RecordManager(ABC):
    """Abstract base class for a record manager.

    Every operation is declared twice: a synchronous method and an
    asynchronous counterpart whose name carries an ``a`` prefix. All of
    them are abstract, so a concrete subclass must implement the full set
    before it can be instantiated.

    Attributes:
        namespace: The namespace this record manager was created with.
    """

    def __init__(
        self,
        namespace: str,
    ) -> None:
        """Initialize the record manager.

        Args:
            namespace: The namespace for the record manager.
        """
        self.namespace = namespace

    @abstractmethod
    def create_schema(self) -> None:
        """Create the database schema for the record manager."""

    @abstractmethod
    async def acreate_schema(self) -> None:
        """Create the database schema for the record manager (async)."""

    @abstractmethod
    def get_time(self) -> float:
        """Get the current server time as a high resolution timestamp.

        It's important to get this from the server to ensure a monotonic
        clock, otherwise there may be data loss when cleaning up old
        documents.

        Returns:
            The current server time as a float timestamp.
        """

    @abstractmethod
    async def aget_time(self) -> float:
        """Get the current server time as a high resolution timestamp (async).

        It's important to get this from the server to ensure a monotonic
        clock, otherwise there may be data loss when cleaning up old
        documents.

        Returns:
            The current server time as a float timestamp.
        """

    @abstractmethod
    def update(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        """Upsert records into the database.

        Args:
            keys: A list of record keys to upsert.
            group_ids: A list of group IDs corresponding to the keys.
            time_at_least: If provided, updates should only happen if the
                updated_at field is at least this time.

        Raises:
            ValueError: If the length of keys doesn't match the length of
                group_ids.
        """

    @abstractmethod
    async def aupdate(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        """Upsert records into the database (async).

        Args:
            keys: A list of record keys to upsert.
            group_ids: A list of group IDs corresponding to the keys.
            time_at_least: If provided, updates should only happen if the
                updated_at field is at least this time.

        Raises:
            ValueError: If the length of keys doesn't match the length of
                group_ids.
        """

    @abstractmethod
    def exists(self, keys: Sequence[str]) -> List[bool]:
        """Check if the provided keys exist in the database.

        Args:
            keys: A list of keys to check.

        Returns:
            A list of boolean values indicating the existence of each key.
        """

    @abstractmethod
    async def aexists(self, keys: Sequence[str]) -> List[bool]:
        """Check if the provided keys exist in the database (async).

        Args:
            keys: A list of keys to check.

        Returns:
            A list of boolean values indicating the existence of each key.
        """

    @abstractmethod
    def list_keys(
        self,
        *,
        before: Optional[float] = None,
        after: Optional[float] = None,
        group_ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[str]:
        """List records in the database based on the provided filters.

        Args:
            before: Filter to list records updated before this time.
            after: Filter to list records updated after this time.
            group_ids: Filter to list records with specific group IDs.
            limit: Optional limit on the number of records to return.

        Returns:
            A list of keys for the matching records.
        """

    @abstractmethod
    async def alist_keys(
        self,
        *,
        before: Optional[float] = None,
        after: Optional[float] = None,
        group_ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[str]:
        """List records in the database based on the provided filters (async).

        Args:
            before: Filter to list records updated before this time.
            after: Filter to list records updated after this time.
            group_ids: Filter to list records with specific group IDs.
            limit: Optional limit on the number of records to return.

        Returns:
            A list of keys for the matching records.
        """

    @abstractmethod
    def delete_keys(self, keys: Sequence[str]) -> None:
        """Delete specified records from the database.

        Args:
            keys: A list of keys to delete.
        """

    @abstractmethod
    async def adelete_keys(self, keys: Sequence[str]) -> None:
        """Delete specified records from the database (async).

        Args:
            keys: A list of keys to delete.
        """