Source code for langchain_community.document_loaders.pebblo
"""Pebblo's safe dataloader is a wrapper for document loaders"""importloggingimportosimportuuidfromimportlib.metadataimportversionfromtypingimportAny,Dict,Iterable,Iterator,List,Optionalfromlangchain_core.documentsimportDocumentfromlangchain_community.document_loaders.baseimportBaseLoaderfromlangchain_community.utilities.pebbloimport(BATCH_SIZE_BYTES,PLUGIN_VERSION,App,Framework,IndexedDocument,PebbloLoaderAPIWrapper,generate_size_based_batches,get_full_path,get_loader_full_path,get_loader_type,get_runtime,get_source_size,)logger=logging.getLogger(__name__)
class PebbloSafeLoader(BaseLoader):
    """Pebblo Safe Loader class is a wrapper around document loaders enabling the
    data to be scrutinized.
    """

    # NOTE(review): this flag is never read within this class as shown --
    # presumably consulted elsewhere (or vestigial); kept for compatibility.
    _discover_sent: bool = False

    def __init__(
        self,
        langchain_loader: BaseLoader,
        name: str,
        owner: str = "",
        description: str = "",
        api_key: Optional[str] = None,
        load_semantic: bool = False,
        classifier_url: Optional[str] = None,
        *,
        classifier_location: str = "local",
        anonymize_snippets: bool = False,
    ):
        """Wrap ``langchain_loader`` and register the app with the Pebblo server.

        Args:
            langchain_loader: Document loader being wrapped.
            name: Application name; must be a non-empty string.
            owner: Owner of the app. Optional.
            description: Description of the app. Optional.
            api_key: Pebblo API key. Optional.
            load_semantic: Whether to add semantic (entities/topics) metadata
                to the loaded documents.
            classifier_url: URL of the Pebblo classifier. Optional.
            classifier_location: Classifier location, defaults to "local".
            anonymize_snippets: Whether to anonymize snippets sent to Pebblo.

        Raises:
            NameError: If ``name`` is empty or not a string. (ValueError would
                be the idiomatic choice, but NameError is kept so existing
                callers that catch it keep working.)
        """
        if not name or not isinstance(name, str):
            raise NameError("Must specify a valid name.")
        self.app_name = name
        self.load_id = str(uuid.uuid4())
        self.loader = langchain_loader
        # NOTE(review): the env var is used as-is, so any non-empty string
        # (even "false" or "0") enables semantic loading. Preserved for
        # backward compatibility -- confirm before tightening.
        self.load_semantic = os.environ.get("PEBBLO_LOAD_SEMANTIC") or load_semantic
        self.owner = owner
        self.description = description
        self.source_path = get_loader_full_path(self.loader)
        self.docs: List[Document] = []
        self.docs_with_id: List[IndexedDocument] = []
        # e.g. "<class 'langchain_community....CSVLoader'>" -> "CSVLoader"
        loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
        self.source_type = get_loader_type(loader_name)
        self.source_path_size = get_source_size(self.source_path)
        self.batch_size = BATCH_SIZE_BYTES
        self.loader_details = {
            "loader": loader_name,
            "source_path": self.source_path,
            "source_type": self.source_type,
            # Only report the size when it is known/meaningful.
            **(
                {"source_path_size": str(self.source_path_size)}
                if self.source_path_size > 0
                else {}
            ),
        }
        # generate app
        self.app = self._get_app_details()
        # initialize Pebblo Loader API client
        self.pb_client = PebbloLoaderAPIWrapper(
            api_key=api_key,
            classifier_location=classifier_location,
            classifier_url=classifier_url,
            anonymize_snippets=anonymize_snippets,
        )
        self.pb_client.send_loader_discover(self.app)

    def load(self) -> List[Document]:
        """Load Documents.

        Returns:
            list: Documents fetched from load method of the wrapped `loader`.
        """
        self.docs = self.loader.load()
        # Classify docs in batches
        self.classify_in_batches()
        return self.docs

    def classify_in_batches(self) -> None:
        """
        Classify documents in batches.

        This is to avoid API timeouts when sending large number of documents.
        Batches are generated based on the page_content size.
        """
        batches: List[List[Document]] = generate_size_based_batches(
            self.docs, self.batch_size
        )
        processed_docs: List[Document] = []
        total_batches = len(batches)
        for i, batch in enumerate(batches):
            # loading_end signals the classifier that this is the final batch.
            is_last_batch: bool = i == total_batches - 1
            self.docs = batch
            self.docs_with_id = self._index_docs()
            classified_docs = self.pb_client.classify_documents(
                self.docs_with_id,
                self.app,
                self.loader_details,
                loading_end=is_last_batch,
            )
            self._add_pebblo_specific_metadata(classified_docs)
            if self.load_semantic:
                batch_processed_docs = self._add_semantic_to_docs(classified_docs)
            else:
                batch_processed_docs = self._unindex_docs()
            processed_docs.extend(batch_processed_docs)
        self.docs = processed_docs

    def lazy_load(self) -> Iterator[Document]:
        """Load documents in lazy fashion.

        Raises:
            NotImplementedError: raised when lazy_load is not implemented
                within wrapped loader.

        Yields:
            Document: Documents from loader's lazy loading, classified one
                at a time.
        """
        try:
            doc_iterator = self.loader.lazy_load()
        except NotImplementedError as exc:
            err_str = f"{self.loader.__class__.__name__} does not implement lazy_load()"
            logger.error(err_str)
            raise NotImplementedError(err_str) from exc
        while True:
            try:
                doc = next(doc_iterator)
            except StopIteration:
                self.docs = []
                break
            # Classify exactly one document per iteration.
            self.docs = [doc]
            self.docs_with_id = self._index_docs()
            classified_doc = self.pb_client.classify_documents(
                self.docs_with_id, self.app, self.loader_details
            )
            self._add_pebblo_specific_metadata(classified_doc)
            if self.load_semantic:
                self.docs = self._add_semantic_to_docs(classified_doc)
            else:
                self.docs = self._unindex_docs()
            yield self.docs[0]

    def _get_app_details(self) -> App:
        """Fetch app details. Internal method.

        Returns:
            App: App details.
        """
        framework, runtime = get_runtime()
        app = App(
            name=self.app_name,
            owner=self.owner,
            description=self.description,
            load_id=self.load_id,
            runtime=runtime,
            framework=framework,
            plugin_version=PLUGIN_VERSION,
            client_version=Framework(
                name="langchain_community",
                version=version("langchain_community"),
            ),
        )
        return app

    def _index_docs(self) -> List[IndexedDocument]:
        """
        Indexes the documents and returns a list of IndexedDocument objects.

        Returns:
            List[IndexedDocument]: A list of IndexedDocument objects with
                unique (per-batch, position-based) IDs.
        """
        return [
            IndexedDocument(pb_id=str(i), **doc.dict())
            for i, doc in enumerate(self.docs)
        ]

    def _add_semantic_to_docs(self, classified_docs: Dict) -> List[Document]:
        """
        Adds semantic metadata to the given list of documents.

        Args:
            classified_docs (Dict): A dictionary of dictionaries containing
                the classified documents with pb_id as key.

        Returns:
            List[Document]: A list of Document objects with added semantic
                metadata, in the original document order.
        """
        indexed_docs = {
            doc.pb_id: Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in self.docs_with_id
        }
        for classified_doc in classified_docs.values():
            doc_id = classified_doc.get("pb_id")
            if doc_id in indexed_docs:
                self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)
        # dict preserves insertion order, so this matches self.docs_with_id.
        return list(indexed_docs.values())

    def _unindex_docs(self) -> List[Document]:
        """
        Converts a list of IndexedDocument objects to a list of Document objects.

        Returns:
            List[Document]: A list of Document objects.
        """
        return [
            Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in self.docs_with_id
        ]

    def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
        """
        Adds semantic metadata to the given document in-place.

        Args:
            doc (Document): A Document object.
            classified_doc (dict): A dictionary containing the classified
                document.

        Returns:
            Document: The Document object with added semantic metadata.
        """
        doc.metadata["pebblo_semantic_entities"] = list(
            classified_doc.get("entities", {}).keys()
        )
        doc.metadata["pebblo_semantic_topics"] = list(
            classified_doc.get("topics", {}).keys()
        )
        return doc

    def _add_pebblo_specific_metadata(self, classified_docs: dict) -> None:
        """Add Pebblo specific metadata to documents."""
        for doc in self.docs_with_id:
            doc_metadata = doc.metadata
            if self.loader.__class__.__name__ == "SharePointLoader":
                # SharePointLoader keeps the document path under "source".
                doc_metadata["full_path"] = get_full_path(
                    doc_metadata.get("source", self.source_path)
                )
            else:
                doc_metadata["full_path"] = get_full_path(
                    doc_metadata.get(
                        "full_path",
                        doc_metadata.get("source", self.source_path),
                    )
                )
            doc_metadata["pb_checksum"] = classified_docs.get(doc.pb_id, {}).get(
                "pb_checksum", None
            )
class PebbloTextLoader(BaseLoader):
    """
    Loader for text data.

    Since PebbloSafeLoader is a wrapper around document loaders, this loader is
    used to load text data directly into Documents.
    """

    def __init__(
        self,
        texts: Iterable[str],
        *,
        source: Optional[str] = None,
        ids: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        metadatas: Optional[List[Dict[str, Any]]] = None,
    ) -> None:
        """
        Args:
            texts: Iterable of text data.
            source: Source of the text data. Optional. Defaults to None.
            ids: List of unique identifiers for each text. Optional.
                Defaults to None.
            metadata: Metadata for all texts. Optional. Defaults to None.
            metadatas: List of metadata for each text. Optional.
                Defaults to None.
        """
        self.texts = texts
        self.source = source
        self.ids = ids
        self.metadata = metadata
        self.metadatas = metadatas

    def lazy_load(self) -> Iterator[Document]:
        """
        Lazy load text data into Documents.

        Returns:
            Iterator of Documents
        """
        for i, text in enumerate(self.texts):
            _id = None
            # BUGFIX: copy the shared base metadata instead of aliasing it.
            # The original `metadata = self.metadata or {}` handed every
            # iteration the same dict object, so the per-text update below
            # mutated self.metadata in place and leaked one document's
            # metadata into all later documents (and all yielded Documents
            # shared a single metadata dict).
            metadata = dict(self.metadata) if self.metadata else {}
            if self.metadatas and i < len(self.metadatas) and self.metadatas[i]:
                metadata.update(self.metadatas[i])
            if self.ids and i < len(self.ids):
                _id = self.ids[i]
            yield Document(id=_id, page_content=text, metadata=metadata)

    def load(self) -> List[Document]:
        """
        Load text data into Documents.

        Returns:
            List of Documents
        """
        return list(self.lazy_load())