BigtableByteStore
This guide covers how to use Google Cloud Bigtable as a key-value store.
Bigtable is a key-value and wide-column store, ideal for fast access to structured, semi-structured, or unstructured data.
Overview
The BigtableByteStore uses Google Cloud Bigtable as a backend for a key-value store. It supports synchronous and asynchronous operations for setting, getting, and deleting key-value pairs.
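BigtableByteStore follows LangChain's byte-store contract: string keys, `bytes` values, and batch methods for each operation. As a rough mental model, here is an illustrative in-memory sketch of that same contract (this is not the Bigtable-backed implementation, just the interface shape):

```python
from typing import Iterator, List, Optional, Sequence, Tuple

class InMemoryByteStoreSketch:
    """Illustrative stand-in showing the byte-store contract that
    BigtableByteStore implements: string keys, bytes values."""

    def __init__(self) -> None:
        self._data: dict = {}

    def mset(self, pairs: Sequence[Tuple[str, bytes]]) -> None:
        # Write many key-value pairs in one call.
        for key, value in pairs:
            self._data[key] = value

    def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        # Missing keys come back as None, preserving input order.
        return [self._data.get(key) for key in keys]

    def mdelete(self, keys: Sequence[str]) -> None:
        for key in keys:
            self._data.pop(key, None)

    def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]:
        # Optionally restrict iteration to keys sharing a prefix.
        for key in self._data:
            if prefix is None or key.startswith(prefix):
                yield key
```

The real store exposes this same surface, plus `a`-prefixed async counterparts, with Bigtable as the backend.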
Integration details
| Class | Package | Local | JS support |
|---|---|---|---|
| BigtableByteStore | langchain-google-bigtable | ❌ | ❌ |
Setup
Prerequisites
To get started, you will need a Google Cloud project with an active Bigtable instance and table.
Installation
The integration is in the langchain-google-bigtable package. The command below also installs langchain-google-vertexai for the embedding cache example.
%pip install -qU langchain-google-bigtable langchain-google-vertexai
☁ Set Your Google Cloud Project
Set your Google Cloud project to use its resources within this notebook.
If you don't know your project ID, you can run gcloud config list or see the support page: Locate the project ID.
# @markdown Please fill in your project, instance, and table details.
PROJECT_ID = "your-gcp-project-id" # @param {type:"string"}
INSTANCE_ID = "your-instance-id" # @param {type:"string"}
TABLE_ID = "your-table-id" # @param {type:"string"}
!gcloud config set project {PROJECT_ID}
🔐 Authentication
Authenticate to Google Cloud to access your project resources.
- For Colab, use the cell below.
- For Vertex AI Workbench, see the setup instructions.
from google.colab import auth
auth.authenticate_user()
Instantiation
To use BigtableByteStore, we first ensure a table exists and then initialize a BigtableEngine to manage connections.
from langchain_google_bigtable import (
BigtableByteStore,
BigtableEngine,
init_key_value_store_table,
)
# Ensure the table and column family exist.
init_key_value_store_table(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
)
BigtableEngine
A BigtableEngine object handles the execution context for the store, especially for async operations. It's recommended to initialize a single engine and reuse it across multiple stores for better performance.
# Initialize the engine to manage async operations.
engine = await BigtableEngine.async_initialize(
project_id=PROJECT_ID, instance_id=INSTANCE_ID
)
BigtableByteStore
This is the main class for interacting with the key-value store. It provides the methods for setting, getting, and deleting data.
# Initialize the store.
store = await BigtableByteStore.create(engine=engine, table_id=TABLE_ID)
Usage
The store supports both sync (mset, mget) and async (amset, amget) methods. This guide uses the async versions.
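Each sync method has an async twin with an `a` prefix. As a purely illustrative sketch of that pairing (hypothetical class, not the store's internals — the real store routes async work through BigtableEngine), an async wrapper can delegate a blocking call off the event loop:

```python
import asyncio
from typing import List, Optional, Sequence, Tuple

class PairedStoreSketch:
    """Hypothetical store illustrating the sync/async method pairing."""

    def __init__(self) -> None:
        self._data: dict = {}

    def mset(self, pairs: Sequence[Tuple[str, bytes]]) -> None:
        self._data.update(pairs)

    def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        return [self._data.get(k) for k in keys]

    async def amset(self, pairs: Sequence[Tuple[str, bytes]]) -> None:
        # Run the blocking call without stalling the event loop.
        await asyncio.to_thread(self.mset, pairs)

    async def amget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        return await asyncio.to_thread(self.mget, keys)

async def demo() -> List[Optional[bytes]]:
    store = PairedStoreSketch()
    await store.amset([("key1", b"value1")])
    return await store.amget(["key1", "missing"])

print(asyncio.run(demo()))  # → [b'value1', None]
```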
Set
Use amset to save key-value pairs to the store.
kv_pairs = [
("key1", b"value1"),
("key2", b"value2"),
("key3", b"value3"),
]
await store.amset(kv_pairs)
Get
Use amget to retrieve values. If a key is not found, None is returned for that key.
retrieved_vals = await store.amget(["key1", "key2", "nonexistent_key"])
print(retrieved_vals)
Delete
Use amdelete to remove keys from the store.
await store.amdelete(["key3"])
# Verifying the key was deleted
await store.amget(["key1", "key3"])
Iterate over keys
Use ayield_keys to iterate over all keys or keys with a specific prefix.
all_keys = [key async for key in store.ayield_keys()]
print(f"All keys: {all_keys}")
prefixed_keys = [key async for key in store.ayield_keys(prefix="key1")]
print(f"Prefixed keys: {prefixed_keys}")
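Bigtable stores rows sorted lexicographically by key, so all keys sharing a prefix form one contiguous range and a prefix scan can stop as soon as it leaves that range. A rough sketch of that idea over a sorted key list:

```python
from bisect import bisect_left

def prefix_scan(sorted_keys, prefix):
    """Illustrative range scan: in a lexicographically sorted keyspace,
    keys sharing a prefix form one contiguous slice."""
    start = bisect_left(sorted_keys, prefix)  # first key >= prefix
    results = []
    for key in sorted_keys[start:]:
        if not key.startswith(prefix):
            break  # past the prefix range; no need to scan further
        results.append(key)
    return results

keys = sorted(["key1", "key10", "key2", "mammals#Cats"])
print(prefix_scan(keys, "key1"))  # → ['key1', 'key10']
```

This is why hierarchical key designs like `mammals#Cats` (used in the retriever example below) make prefix queries cheap.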
Advanced Usage: Embedding Caching
A common use case for a key-value store is to cache expensive operations like computing text embeddings, which saves time and cost.
from langchain.embeddings import CacheBackedEmbeddings
from langchain_google_vertexai.embeddings import VertexAIEmbeddings
underlying_embeddings = VertexAIEmbeddings(
project=PROJECT_ID, model_name="textembedding-gecko@003"
)
# Use a namespace to avoid key collisions with other data.
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
underlying_embeddings, store, namespace="text-embeddings"
)
print("First call (computes and caches embedding):")
%time embedding_result_1 = await cached_embedder.aembed_query("Hello, world!")
print("\nSecond call (retrieves from cache):")
%time embedding_result_2 = await cached_embedder.aembed_query("Hello, world!")
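The caching pattern itself is independent of Vertex AI: derive a namespaced key from the text, look it up in the byte store, and only compute (and write back) on a miss. A minimal sketch with a stand-in embedding function and a plain dict as the store (names here are illustrative, not the CacheBackedEmbeddings internals):

```python
import hashlib
import json

def cached_embed(store, namespace, text, compute, calls):
    """Check the byte store before computing; write back on a miss."""
    # A namespaced, hashed key avoids collisions with other cached data.
    key = namespace + hashlib.sha256(text.encode("utf-8")).hexdigest()
    hit = store.get(key)
    if hit is not None:
        return json.loads(hit)
    calls.append(text)  # track how often we actually compute
    vector = compute(text)
    store[key] = json.dumps(vector).encode("utf-8")
    return vector

store, calls = {}, []
fake_embed = lambda text: [float(len(text)), 0.5]  # stand-in for a real model
v1 = cached_embed(store, "text-embeddings", "Hello, world!", fake_embed, calls)
v2 = cached_embed(store, "text-embeddings", "Hello, world!", fake_embed, calls)
print(v1 == v2, len(calls))  # second call is a cache hit; computed only once
```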
As a Simple Document Retriever
This section shows how to create a simple retriever using the Bigtable store. It acts as a document persistence layer, fetching documents that match a query prefix.
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from typing import List, Optional, Any, Union
import json
class SimpleKVStoreRetriever(BaseRetriever):
"""A simple retriever that retrieves documents based on a prefix match in the key-value store."""
store: BigtableByteStore
documents: List[Union[Document, str]]
k: int
def set_up_store(self):
kv_pairs_to_set = []
for i, doc in enumerate(self.documents):
if isinstance(doc, str):
doc = Document(page_content=doc)
if not doc.id:
doc.id = str(i)
value = (
"Page Content\n"
+ doc.page_content
+ "\nMetadata"
+ json.dumps(doc.metadata)
)
kv_pairs_to_set.append((doc.id, value.encode("utf-8")))
self.store.mset(kv_pairs_to_set)
async def _aget_relevant_documents(
self,
query: str,
*,
run_manager: Optional[CallbackManagerForRetrieverRun] = None,
) -> List[Document]:
keys = [key async for key in self.store.ayield_keys(prefix=query)][: self.k]
documents_retrieved = []
for document in await self.store.amget(keys):
if document:
document_str = document.decode("utf-8")
page_content = document_str.split("Content\n")[1].split("\nMetadata")[0]
metadata = json.loads(document_str.split("\nMetadata")[1])
documents_retrieved.append(
Document(page_content=page_content, metadata=metadata)
)
return documents_retrieved
def _get_relevant_documents(
self,
query: str,
*,
run_manager: Optional[CallbackManagerForRetrieverRun] = None,
) -> list[Document]:
keys = list(self.store.yield_keys(prefix=query))[: self.k]
documents_retrieved = []
for document in self.store.mget(keys):
if document:
document_str = document.decode("utf-8")
page_content = document_str.split("Content\n")[1].split("\nMetadata")[0]
metadata = json.loads(document_str.split("\nMetadata")[1])
documents_retrieved.append(
Document(page_content=page_content, metadata=metadata)
)
return documents_retrieved
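The retriever above packs each Document into a single byte string and recovers it by string splits. Extracting those two steps as standalone helpers makes the round trip easy to check (note the parsing relies on the page content not itself containing the `\nMetadata` marker):

```python
import json

def serialize(page_content: str, metadata: dict) -> bytes:
    # Same layout the retriever writes: content, a Metadata marker, then JSON.
    value = "Page Content\n" + page_content + "\nMetadata" + json.dumps(metadata)
    return value.encode("utf-8")

def parse(raw: bytes):
    # Same splits the retriever uses to recover content and metadata.
    text = raw.decode("utf-8")
    page_content = text.split("Content\n")[1].split("\nMetadata")[0]
    metadata = json.loads(text.split("\nMetadata")[1])
    return page_content, metadata

raw = serialize("Goldfish are popular pets.", {"type": "fish"})
assert parse(raw) == ("Goldfish are popular pets.", {"type": "fish"})
```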
documents = [
Document(
page_content="Goldfish are popular pets for beginners, requiring relatively simple care.",
metadata={"type": "fish", "trait": "low maintenance"},
id="fish#Goldfish",
),
Document(
page_content="Cats are independent pets that often enjoy their own space.",
metadata={"type": "cat", "trait": "independence"},
id="mammals#Cats",
),
Document(
page_content="Rabbits are social animals that need plenty of space to hop around.",
metadata={"type": "rabbit", "trait": "social"},
id="mammals#Rabbits",
),
]
retriever_store = BigtableByteStore.create_sync(
engine=engine, instance_id=INSTANCE_ID, table_id=TABLE_ID
)
KVDocumentRetriever = SimpleKVStoreRetriever(
store=retriever_store, documents=documents, k=2
)
KVDocumentRetriever.set_up_store()
KVDocumentRetriever.invoke("fish")
KVDocumentRetriever.invoke("mammals")
API reference
For full details on the BigtableByteStore class, see the source code on GitHub.
Related
- Key-value store conceptual guide