RecordManager
- class langchain_core.indexing.base.RecordManager(namespace: str)
Abstract base class representing the interface for a record manager.
The record manager abstraction is used by the LangChain indexing API.
The record manager keeps track of which documents have been written into a vectorstore and when they were written.
The indexing API computes a hash for each document and stores the hash, together with the write time and the source ID, in the record manager.
On subsequent indexing runs, the indexing API can check the record manager to determine which documents have already been indexed and which have not.
This allows the indexing API to avoid re-indexing documents that have already been indexed, and to only index new documents.
The main benefit of this abstraction is that it works across many vectorstores. To be supported, a vectorstore only needs to support adding and deleting documents by ID. Using the record manager, the indexing API can delete outdated documents and avoid redundant indexing of documents that have already been indexed.
The main constraints of this abstraction are:
1. It relies on timestamps to determine which documents have been indexed and which have not. This means that the timestamps must be monotonically increasing. The timestamp should be the time as measured by the server to minimize issues.
2. The record manager is currently implemented separately from the vectorstore, which means that the overall system becomes distributed and may have consistency issues. For example, a write to the record manager may succeed while the corresponding write to the vectorstore fails.
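The skip-if-already-indexed behavior described above can be sketched with a plain dictionary standing in for the record manager. This is a minimal illustration, not the indexing API's implementation: `doc_key`, `records`, and `index_batch` are hypothetical names, and the exact hashing scheme used by the indexing API is not specified here.

```python
import hashlib
import time
from typing import Dict, List, Tuple


def doc_key(content: str, source_id: str) -> str:
    """Hypothetical helper: derive a stable key from the content and source ID."""
    payload = f"{source_id}\x00{content}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


# Toy stand-in for a record manager: key -> (write time, source ID).
records: Dict[str, Tuple[float, str]] = {}


def index_batch(docs: List[Tuple[str, str]]) -> List[str]:
    """Record unseen (content, source_id) pairs and return their keys."""
    written = []
    for content, source_id in docs:
        key = doc_key(content, source_id)
        if key in records:
            continue  # already recorded, so the vectorstore write would be skipped
        records[key] = (time.time(), source_id)
        written.append(key)
    return written


first = index_batch([("hello", "a.txt"), ("world", "b.txt")])
second = index_batch([("hello", "a.txt"), ("new text", "c.txt")])
print(len(first), len(second))  # 2 1
```

On the second run only the unseen document is recorded, which is the redundancy the record manager is meant to avoid.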
Initialize the record manager.
- Parameters:
namespace (str) – The namespace for the record manager.
Methods
__init__(namespace) – Initialize the record manager.
acreate_schema() – Asynchronously create the database schema for the record manager.
adelete_keys(keys) – Asynchronously delete specified records from the database.
aexists(keys) – Asynchronously check if the provided keys exist in the database.
aget_time() – Asynchronously get the current server time as a high resolution timestamp.
alist_keys(*[, before, after, group_ids, limit]) – Asynchronously list records in the database based on the provided filters.
aupdate(keys, *[, group_ids, time_at_least]) – Asynchronously upsert records into the database.
create_schema() – Create the database schema for the record manager.
delete_keys(keys) – Delete specified records from the database.
exists(keys) – Check if the provided keys exist in the database.
get_time() – Get the current server time as a high resolution timestamp.
list_keys(*[, before, after, group_ids, limit]) – List records in the database based on the provided filters.
update(keys, *[, group_ids, time_at_least]) – Upsert records into the database.
- __init__(namespace: str) -> None
Initialize the record manager.
- Parameters:
namespace (str) – The namespace for the record manager.
- Return type:
None
- abstract async acreate_schema() -> None
Asynchronously create the database schema for the record manager.
- Return type:
None
- abstract async adelete_keys(keys: Sequence[str]) -> None
Asynchronously delete specified records from the database.
- Parameters:
keys (Sequence[str]) – A list of keys to delete.
- Return type:
None
- abstract async aexists(keys: Sequence[str]) -> List[bool]
Asynchronously check if the provided keys exist in the database.
- Parameters:
keys (Sequence[str]) – A list of keys to check.
- Returns:
A list of boolean values indicating the existence of each key.
- Return type:
List[bool]
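As a rough illustration of the async interface, the sketch below uses a toy in-memory class (`ToyAsyncRecords`, a hypothetical name that does not subclass `RecordManager`) to show `aexists` returning one boolean per key. Returning the booleans in the same order as the input keys is an assumption of this sketch.

```python
import asyncio
import time
from typing import Dict, List, Optional, Sequence


class ToyAsyncRecords:
    """Illustrative in-memory store; not part of langchain_core."""

    def __init__(self) -> None:
        # key -> time of the last upsert
        self._rows: Dict[str, float] = {}

    async def aupdate(self, keys: Sequence[str]) -> None:
        now = time.time()
        for key in keys:
            self._rows[key] = now  # insert or overwrite (upsert)

    async def aexists(self, keys: Sequence[str]) -> List[bool]:
        # One boolean per requested key, in the order the keys were given.
        return [key in self._rows for key in keys]


async def main() -> List[bool]:
    store = ToyAsyncRecords()
    await store.aupdate(["k1", "k2"])
    return await store.aexists(["k1", "missing", "k2"])


result = asyncio.run(main())
print(result)  # [True, False, True]
```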
- abstract async aget_time() -> float
Asynchronously get the current server time as a high resolution timestamp.
The time must come from the server to ensure a monotonic clock; otherwise, data may be lost when old documents are cleaned up.
- Returns:
The current server time as a float timestamp.
- Return type:
float
- abstract async alist_keys(*, before: float | None = None, after: float | None = None, group_ids: Sequence[str] | None = None, limit: int | None = None) -> List[str]
Asynchronously list records in the database based on the provided filters.
- Parameters:
before (float | None) – Filter to list records updated before this time.
after (float | None) – Filter to list records updated after this time.
group_ids (Sequence[str] | None) – Filter to list records with specific group IDs.
limit (int | None) – Optional limit on the number of records to return.
- Returns:
A list of keys for the matching records.
- Return type:
List[str]
- abstract async aupdate(keys: Sequence[str], *, group_ids: Sequence[str | None] | None = None, time_at_least: float | None = None) -> None
Asynchronously upsert records into the database.
- Parameters:
keys (Sequence[str]) – A list of record keys to upsert.
group_ids (Sequence[str | None] | None) – A list of group IDs corresponding to the keys, one entry per key.
time_at_least (float | None) – Optional timestamp. An implementation can use it to verify that the current time in the system that stores the data is at least this value, e.g., by checking that the time in the Postgres database is equal to or later than the given timestamp and raising an error if it is not. This helps guard against time drift, since time may not be monotonically increasing.
- Raises:
ValueError – If the length of keys doesn’t match the length of group_ids.
- Return type:
None
- abstract create_schema() -> None
Create the database schema for the record manager.
- Return type:
None
- abstract delete_keys(keys: Sequence[str]) -> None
Delete specified records from the database.
- Parameters:
keys (Sequence[str]) – A list of keys to delete.
- Return type:
None
- abstract exists(keys: Sequence[str]) -> List[bool]
Check if the provided keys exist in the database.
- Parameters:
keys (Sequence[str]) – A list of keys to check.
- Returns:
A list of boolean values indicating the existence of each key.
- Return type:
List[bool]
- abstract get_time() -> float
Get the current server time as a high resolution timestamp.
The time must come from the server to ensure a monotonic clock; otherwise, data may be lost when old documents are cleaned up.
- Returns:
The current server time as a float timestamp.
- Return type:
float
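One way to read the time from the data store rather than from the caller's clock is to ask the database itself. The sketch below does this with the standard library `sqlite3` module. It is an assumption-laden illustration: the standalone `get_time(conn)` function is hypothetical, and because SQLite runs in-process, the "server" here is the same machine as the caller. With a networked database the query would be evaluated on the database server.

```python
import sqlite3
import time


def get_time(conn: sqlite3.Connection) -> float:
    """Return the database's current time as seconds since the Unix epoch."""
    # julianday('now') is measured in days; 2440587.5 is the Julian day
    # number of the Unix epoch, so the difference times 86400 gives seconds.
    row = conn.execute(
        "SELECT (julianday('now') - 2440587.5) * 86400.0"
    ).fetchone()
    return float(row[0])


conn = sqlite3.connect(":memory:")
now = get_time(conn)
# Compare against the local clock; both come from the same machine here.
drift = abs(now - time.time())
print(drift < 60.0)
```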
- abstract list_keys(*, before: float | None = None, after: float | None = None, group_ids: Sequence[str] | None = None, limit: int | None = None) -> List[str]
List records in the database based on the provided filters.
- Parameters:
before (float | None) – Filter to list records updated before this time.
after (float | None) – Filter to list records updated after this time.
group_ids (Sequence[str] | None) – Filter to list records with specific group IDs.
limit (int | None) – Optional limit on the number of records to return.
- Returns:
A list of keys for the matching records.
- Return type:
List[str]
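The filters can be combined to find records that were not touched by the current indexing run, which is how outdated documents could be located and deleted. The following is a minimal sketch over a toy dictionary. The timestamps are made up, the strict `<` and `>` comparisons are an assumption, and the module-level `list_keys` and `delete_keys` functions only mimic the method signatures.

```python
from typing import Dict, List, Optional, Sequence, Tuple

# Toy records: key -> (updated_at, group_id). Values are illustrative only.
rows: Dict[str, Tuple[float, Optional[str]]] = {
    "k1": (100.0, "a.txt"),
    "k2": (260.0, "a.txt"),
    "k3": (100.0, "b.txt"),
}


def list_keys(
    *,
    before: Optional[float] = None,
    after: Optional[float] = None,
    group_ids: Optional[Sequence[str]] = None,
    limit: Optional[int] = None,
) -> List[str]:
    """Return keys matching all given filters (strict comparisons assumed)."""
    out = []
    for key, (updated_at, group_id) in rows.items():
        if before is not None and not updated_at < before:
            continue
        if after is not None and not updated_at > after:
            continue
        if group_ids is not None and group_id not in group_ids:
            continue
        out.append(key)
    return out if limit is None else out[:limit]


def delete_keys(keys: Sequence[str]) -> None:
    for key in keys:
        rows.pop(key, None)


run_started = 250.0  # hypothetical start time of the current indexing run
stale = list_keys(before=run_started, group_ids=["a.txt"])
delete_keys(stale)
print(stale, sorted(rows))  # ['k1'] ['k2', 'k3']
```

Only `k1` is removed: `k2` was updated after the run started, and `k3` belongs to a group that was not part of the filter.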
- abstract update(keys: Sequence[str], *, group_ids: Sequence[str | None] | None = None, time_at_least: float | None = None) -> None
Upsert records into the database.
- Parameters:
keys (Sequence[str]) – A list of record keys to upsert.
group_ids (Sequence[str | None] | None) – A list of group IDs corresponding to the keys, one entry per key.
time_at_least (float | None) – Optional timestamp. An implementation can use it to verify that the current time in the system that stores the data is at least this value, e.g., by checking that the time in the Postgres database is equal to or later than the given timestamp and raising an error if it is not. This helps guard against time drift, since time may not be monotonically increasing.
- Raises:
ValueError – If the length of keys doesn’t match the length of group_ids.
- Return type:
None
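The two checks described for `update` (matching lengths of `keys` and `group_ids`, and the optional `time_at_least` guard) might look roughly like the sketch below. `ToyRecords`, the injectable `clock`, and `TimeDriftError` are hypothetical names introduced for illustration. The documentation only says an error is raised when the stored time is behind, so the exception type used for that case is an assumption.

```python
from typing import Callable, Dict, Optional, Sequence


class TimeDriftError(RuntimeError):
    """Hypothetical error for a store clock that is behind time_at_least."""


class ToyRecords:
    """Illustrative in-memory store; not part of langchain_core."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock  # stands in for the server's clock
        self.rows: Dict[str, Dict[str, object]] = {}

    def get_time(self) -> float:
        return self._clock()

    def update(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        if group_ids is not None and len(keys) != len(group_ids):
            raise ValueError("Length of keys must match length of group_ids")
        now = self.get_time()
        if time_at_least is not None and now < time_at_least:
            raise TimeDriftError(f"store time {now} is before {time_at_least}")
        groups = group_ids if group_ids is not None else [None] * len(keys)
        for key, group in zip(keys, groups):
            # Upsert: insert the record or overwrite the existing one.
            self.rows[key] = {"group_id": group, "updated_at": now}


store = ToyRecords(clock=lambda: 1000.0)
store.update(["k1", "k2"], group_ids=["a.txt", None], time_at_least=999.0)

errors = []
try:
    store.update(["k3"], group_ids=["a.txt", "b.txt"])
except ValueError:
    errors.append("length mismatch")
try:
    store.update(["k3"], time_at_least=2000.0)
except TimeDriftError:
    errors.append("clock behind")
print(sorted(store.rows), errors)
# ['k1', 'k2'] ['length mismatch', 'clock behind']
```

Injecting the clock keeps the sketch deterministic. A real implementation would read the time from the backing store.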