"""Pebblo's safe dataloader is a wrapper for document loaders"""
import logging
import os
import uuid
from importlib.metadata import version
from typing import Dict, Iterator, List, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.pebblo import (
BATCH_SIZE_BYTES,
PLUGIN_VERSION,
App,
Framework,
IndexedDocument,
PebbloLoaderAPIWrapper,
generate_size_based_batches,
get_full_path,
get_loader_full_path,
get_loader_type,
get_runtime,
get_source_size,
)
# Module-level logger; used below to report a wrapped loader that lacks lazy_load().
logger = logging.getLogger(__name__)
class PebbloSafeLoader(BaseLoader):
    """Wrap a document loader so that the data it loads can be scrutinized.

    Documents produced by the wrapped loader are passed to
    ``PebbloLoaderAPIWrapper.classify_documents`` and handed back with
    Pebblo-specific metadata (``full_path``, ``pb_checksum`` and, when
    semantic loading is enabled, ``pebblo_semantic_entities`` /
    ``pebblo_semantic_topics``).
    """

    # Flipped to True by ``set_discover_sent``; not read anywhere in this class.
    _discover_sent: bool = False

    def __init__(
        self,
        langchain_loader: BaseLoader,
        name: str,
        owner: str = "",
        description: str = "",
        api_key: Optional[str] = None,
        load_semantic: bool = False,
        classifier_url: Optional[str] = None,
        *,
        classifier_location: str = "local",
    ) -> None:
        """Set up the wrapper and announce the app via the Pebblo client.

        Args:
            langchain_loader: The loader whose documents are to be classified.
            name: Non-empty application name.
            owner: Application owner, stored in the app details.
            description: Application description, stored in the app details.
            api_key: Forwarded to ``PebbloLoaderAPIWrapper``.
            load_semantic: Whether to attach semantic entities/topics to the
                document metadata. A non-empty ``PEBBLO_LOAD_SEMANTIC``
                environment variable takes precedence: ``0``, ``false``,
                ``no`` and ``off`` (case-insensitive) disable it, any other
                value enables it.
            classifier_url: Forwarded to ``PebbloLoaderAPIWrapper``.
            classifier_location: Forwarded to ``PebbloLoaderAPIWrapper``.

        Raises:
            NameError: If ``name`` is empty or not a string.
        """
        if not name or not isinstance(name, str):
            raise NameError("Must specify a valid name.")
        self.app_name = name
        self.load_id = str(uuid.uuid4())
        self.loader = langchain_loader

        # Parse the environment override into a real bool. A plain
        # ``env or load_semantic`` would treat the string "false" as truthy.
        env_flag = os.environ.get("PEBBLO_LOAD_SEMANTIC")
        if env_flag:
            self.load_semantic = env_flag.strip().lower() not in {
                "0",
                "false",
                "no",
                "off",
            }
        else:
            self.load_semantic = bool(load_semantic)

        self.owner = owner
        self.description = description
        self.source_path = get_loader_full_path(self.loader)
        self.docs: List[Document] = []
        self.docs_with_id: List[IndexedDocument] = []
        # Same class-name lookup used elsewhere in this class.
        loader_name = type(self.loader).__name__
        self.source_type = get_loader_type(loader_name)
        self.source_path_size = get_source_size(self.source_path)
        self.batch_size = BATCH_SIZE_BYTES
        self.loader_details = {
            "loader": loader_name,
            "source_path": self.source_path,
            "source_type": self.source_type,
        }
        # The size is only reported when it is positive.
        if self.source_path_size > 0:
            self.loader_details["source_path_size"] = str(self.source_path_size)
        # generate app
        self.app = self._get_app_details()
        # initialize Pebblo Loader API client
        self.pb_client = PebbloLoaderAPIWrapper(
            api_key=api_key,
            classifier_location=classifier_location,
            classifier_url=classifier_url,
        )
        self.pb_client.send_loader_discover(self.app)

    def load(self) -> List[Document]:
        """Load all documents from the wrapped loader and classify them.

        Returns:
            list: Documents fetched from the ``load`` method of the wrapped
            loader, after classification metadata has been added.
        """
        self.docs = self.loader.load()
        # Classify docs in batches
        self.classify_in_batches()
        return self.docs

    def classify_in_batches(self) -> None:
        """Classify ``self.docs`` in batches and store the processed result.

        This is to avoid API timeouts when sending a large number of
        documents. Batches are generated based on the page_content size.
        ``self.docs`` is temporarily rebound to each batch (the indexing
        helper reads it) and finally replaced by the full processed list.
        """
        batches: List[List[Document]] = generate_size_based_batches(
            self.docs, self.batch_size
        )
        processed_docs: List[Document] = []
        last_index = len(batches) - 1
        for i, batch in enumerate(batches):
            self.docs = batch
            self.docs_with_id = self._index_docs()
            classified_docs = self.pb_client.classify_documents(
                self.docs_with_id,
                self.app,
                self.loader_details,
                # Only the final batch is flagged as the end of loading.
                loading_end=(i == last_index),
            )
            self._add_pebblo_specific_metadata(classified_docs)
            if self.load_semantic:
                batch_processed_docs = self._add_semantic_to_docs(classified_docs)
            else:
                batch_processed_docs = self._unindex_docs()
            processed_docs.extend(batch_processed_docs)
        self.docs = processed_docs

    def lazy_load(self) -> Iterator[Document]:
        """Load and classify documents one at a time.

        Raises:
            NotImplementedError: raised when lazy_load is not implemented
                within the wrapped loader.

        Yields:
            Document: Each document from the wrapped loader's lazy loading,
            after classification metadata has been added.
        """
        try:
            doc_iterator = self.loader.lazy_load()
        except NotImplementedError as exc:
            err_str = f"{self.loader.__class__.__name__} does not implement lazy_load()"
            logger.error(err_str)
            raise NotImplementedError(err_str) from exc
        for doc in doc_iterator:
            # Each document is indexed and classified as a batch of one.
            self.docs = [doc]
            self.docs_with_id = self._index_docs()
            classified_doc = self.pb_client.classify_documents(
                self.docs_with_id, self.app, self.loader_details
            )
            self._add_pebblo_specific_metadata(classified_doc)
            if self.load_semantic:
                self.docs = self._add_semantic_to_docs(classified_doc)
            else:
                self.docs = self._unindex_docs()
            yield self.docs[0]
        # Wrapped iterator exhausted: drop the reference to the last document.
        self.docs = []

    @classmethod
    def set_discover_sent(cls) -> None:
        """Set the class-level ``_discover_sent`` flag to True."""
        cls._discover_sent = True

    def _get_app_details(self) -> App:
        """Fetch app details. Internal method.

        Returns:
            App: App details built from this loader's name, owner,
            description and load id plus the runtime/framework information.
        """
        framework, runtime = get_runtime()
        client_version = Framework(
            name="langchain_community",
            version=version("langchain_community"),
        )
        return App(
            name=self.app_name,
            owner=self.owner,
            description=self.description,
            load_id=self.load_id,
            runtime=runtime,
            framework=framework,
            plugin_version=PLUGIN_VERSION,
            client_version=client_version,
        )

    def _index_docs(self) -> List[IndexedDocument]:
        """Wrap each document in ``self.docs`` in an ``IndexedDocument``.

        Returns:
            List[IndexedDocument]: One entry per document, whose ``pb_id`` is
            the document's position in ``self.docs`` as a string.
        """
        return [
            IndexedDocument(pb_id=str(i), **doc.dict())
            for i, doc in enumerate(self.docs)
        ]

    def _add_semantic_to_docs(self, classified_docs: Dict) -> List[Document]:
        """Build plain documents from ``self.docs_with_id`` with semantic metadata.

        Args:
            classified_docs (Dict): A dictionary of dictionaries containing the
                classified documents with pb_id as key.

        Returns:
            List[Document]: A list of Document objects with added semantic
            metadata wherever a classified entry matched by ``pb_id``.
        """
        indexed_docs = {
            doc.pb_id: Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in self.docs_with_id
        }
        for classified_doc in classified_docs.values():
            doc_id = classified_doc.get("pb_id")
            if doc_id in indexed_docs:
                self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)
        return list(indexed_docs.values())

    def _unindex_docs(self) -> List[Document]:
        """Convert ``self.docs_with_id`` back to plain ``Document`` objects.

        Returns:
            List[Document]: A list of Document objects.
        """
        return [
            Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in self.docs_with_id
        ]

    def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
        """Add semantic metadata to the given document in-place.

        Args:
            doc (Document): A Document object.
            classified_doc (dict): A dictionary containing the classified document.

        Returns:
            Document: The same Document object with added semantic metadata.
        """
        # ``or {}`` also covers keys that are present but hold None.
        entities = classified_doc.get("entities") or {}
        topics = classified_doc.get("topics") or {}
        doc.metadata["pebblo_semantic_entities"] = list(entities.keys())
        doc.metadata["pebblo_semantic_topics"] = list(topics.keys())
        return doc

    def _add_pebblo_specific_metadata(self, classified_docs: dict) -> None:
        """Add ``full_path`` and ``pb_checksum`` to every indexed document.

        Args:
            classified_docs (dict): Classification results keyed by pb_id.
        """
        # The loader type does not change between documents; check it once.
        is_sharepoint = self.loader.__class__.__name__ == "SharePointLoader"
        for doc in self.docs_with_id:
            doc_metadata = doc.metadata
            source = doc_metadata.get("source", self.source_path)
            if is_sharepoint:
                # SharePoint documents ignore any existing "full_path" entry.
                path = source
            else:
                path = doc_metadata.get("full_path", source)
            doc_metadata["full_path"] = get_full_path(path)
            doc_metadata["pb_checksum"] = classified_docs.get(doc.pb_id, {}).get(
                "pb_checksum"
            )