Source code for langchain_community.utilities.pebblo
from __future__ import annotations

import json
import logging
import os
import pathlib
import platform
from enum import Enum
from http import HTTPStatus
from typing import Any, Dict, List, Optional, Tuple

from langchain_core.documents import Document
from langchain_core.env import get_runtime_environment
from langchain_core.utils import get_from_dict_or_env
from pydantic import BaseModel
from requests import Response, request
from requests.exceptions import RequestException

from langchain_community.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)

PLUGIN_VERSION = "0.1.1"
_DEFAULT_CLASSIFIER_URL = "http://localhost:8000"
_DEFAULT_PEBBLO_CLOUD_URL = "https://api.daxa.ai"
BATCH_SIZE_BYTES = 100 * 1024  # 100 KB

# Supported loaders for Pebblo safe data loading
file_loader = [
    "JSONLoader",
    "S3FileLoader",
    "UnstructuredMarkdownLoader",
    "UnstructuredPDFLoader",
    "UnstructuredFileLoader",
    "UnstructuredJsonLoader",
    "PyPDFLoader",
    "GCSFileLoader",
    "AmazonTextractPDFLoader",
    "CSVLoader",
    "UnstructuredExcelLoader",
    "UnstructuredEmailLoader",
]
dir_loader = [
    "DirectoryLoader",
    "S3DirLoader",
    "SlackDirectoryLoader",
    "PyPDFDirectoryLoader",
    "NotionDirectoryLoader",
]
in_memory = ["DataFrameLoader"]
cloud_folder = [
    "NotionDBLoader",
    "GoogleDriveLoader",
    "SharePointLoader",
]

LOADER_TYPE_MAPPING = {
    "file": file_loader,
    "dir": dir_loader,
    "in-memory": in_memory,
    "cloud-folder": cloud_folder,
}
[docs]
class Routes(str, Enum):
    """Routes available for the Pebblo API as enumerator."""

    loader_doc = "/v1/loader/doc"
    loader_app_discover = "/v1/app/discover"
[docs]
class IndexedDocument(Document):
    """Pebblo Indexed Document."""

    pb_id: str
    """Unique ID of the document."""
[docs]
class Runtime(BaseModel):
    """Pebblo Runtime."""

    type: str = "local"
    """Runtime type. Defaults to 'local'."""
    host: str
    """Host name of the runtime."""
    path: str
    """Current working directory path."""
    ip: Optional[str] = ""
    """IP address of the runtime. Defaults to ''."""
    platform: str
    """Platform details of the runtime."""
    os: str
    """OS name."""
    os_version: str
    """OS version."""
    language: str
    """Runtime kernel."""
    language_version: str
    """Version of the runtime kernel."""
    runtime: str = "local"
    """More runtime details. Defaults to 'local'."""
[docs]
class Framework(BaseModel):
    """Pebblo Framework instance."""

    name: str
    """Name of the Framework."""
    version: str
    """Version of the Framework."""
[docs]
class App(BaseModel):
    """Pebblo AI application."""

    name: str
    """Name of the app."""
    owner: str
    """Owner of the app."""
    description: Optional[str]
    """Description of the app."""
    load_id: str
    """Unique load_id of the app instance."""
    runtime: Runtime
    """Runtime details of the app."""
    framework: Framework
    """Framework details of the app."""
    plugin_version: str
    """Plugin version used for the app."""
    client_version: Framework
    """Client version used for the app."""
[docs]
class Doc(BaseModel):
    """Pebblo document."""

    name: str
    """Name of app originating this document."""
    owner: str
    """Owner of app."""
    docs: list
    """List of documents with their metadata."""
    plugin_version: str
    """Pebblo plugin version."""
    load_id: str
    """Unique load_id of the app instance."""
    loader_details: dict
    """Loader details with its metadata."""
    loading_end: bool
    """Boolean, specifying end of loading of source."""
    source_owner: str
    """Owner of the source of the loader."""
    classifier_location: str
    """Location of the classifier."""
    anonymize_snippets: bool
    """Whether to anonymize snippets going into VectorDB and the generated reports"""
[docs]
def get_full_path(path: str) -> str:
    """Return an absolute local path for a local file/directory;
    for a network-related path, return it as is.

    Args:
        path (str): Relative path to be resolved.

    Returns:
        str: Resolved absolute path.
    """
    if (
        not path
        or ("://" in path)
        or ("/" == path[0])
        or (path in ["unknown", "-", "in-memory"])
    ):
        return path
    full_path = pathlib.Path(path)
    if full_path.exists():
        full_path = full_path.resolve()
    return str(full_path)
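# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# A minimal sketch of how get_full_path treats the different path flavours.
# "data/report.pdf" is a hypothetical relative path; it only resolves to an
# absolute path if it actually exists on disk.
def _example_get_full_path() -> None:
    print(get_full_path("s3://bucket/key"))  # contains "://", returned as is
    print(get_full_path("/etc/hosts"))  # already absolute, returned as is
    print(get_full_path("in-memory"))  # sentinel value, returned as is
    print(get_full_path("data/report.pdf"))  # resolved only if the path exists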
[docs]
def get_loader_type(loader: str) -> str:
    """Return the loader type: file, dir or in-memory.

    Args:
        loader (str): Name of the loader whose type is to be resolved.

    Returns:
        str: One of the loader types: file/dir/in-memory ("unsupported" if unknown).
    """
    for loader_type, loaders in LOADER_TYPE_MAPPING.items():
        if loader in loaders:
            return loader_type
    return "unsupported"
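# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# Loader names are matched against LOADER_TYPE_MAPPING; unknown names fall
# through to "unsupported". "SomeCustomLoader" is a made-up name.
def _example_get_loader_type() -> None:
    assert get_loader_type("CSVLoader") == "file"
    assert get_loader_type("DirectoryLoader") == "dir"
    assert get_loader_type("DataFrameLoader") == "in-memory"
    assert get_loader_type("SomeCustomLoader") == "unsupported"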
[docs]
def get_loader_full_path(loader: BaseLoader) -> str:
    """Return an absolute source path for the loader's source, based on the
    keys present in the loader instance.

    Args:
        loader (BaseLoader): Langchain document loader, derived from BaseLoader.
    """
    from langchain_community.document_loaders import (
        DataFrameLoader,
        GCSFileLoader,
        NotionDBLoader,
        S3FileLoader,
    )

    location = "-"
    if not isinstance(loader, BaseLoader):
        logger.error(
            "loader is not derived from BaseLoader, source location will be unknown!"
        )
        return location
    loader_dict = loader.__dict__
    try:
        if "bucket" in loader_dict:
            if isinstance(loader, GCSFileLoader):
                location = f"gc://{loader.bucket}/{loader.blob}"
            elif isinstance(loader, S3FileLoader):
                location = f"s3://{loader.bucket}/{loader.key}"
        elif "source" in loader_dict:
            location = loader_dict["source"]
            if location and "channel" in loader_dict:
                channel = loader_dict["channel"]
                if channel:
                    location = f"{location}/{channel}"
        elif "path" in loader_dict:
            location = loader_dict["path"]
        elif "file_path" in loader_dict:
            location = loader_dict["file_path"]
        elif "web_paths" in loader_dict:
            web_paths = loader_dict["web_paths"]
            if web_paths and isinstance(web_paths, list) and len(web_paths) > 0:
                location = web_paths[0]
        # For in-memory types:
        elif isinstance(loader, DataFrameLoader):
            location = "in-memory"
        elif isinstance(loader, NotionDBLoader):
            location = f"notiondb://{loader.database_id}"
        elif loader.__class__.__name__ == "GoogleDriveLoader":
            if loader_dict.get("folder_id"):
                folder_id = loader_dict.get("folder_id")
                location = f"https://drive.google.com/drive/u/2/folders/{folder_id}"
            elif loader_dict.get("file_ids"):
                file_ids = loader_dict.get("file_ids", [])
                location = ", ".join(
                    [
                        f"https://drive.google.com/file/d/{file_id}/view"
                        for file_id in file_ids
                    ]
                )
            elif loader_dict.get("document_ids"):
                document_ids = loader_dict.get("document_ids", [])
                location = ", ".join(
                    [
                        f"https://docs.google.com/document/d/{doc_id}/edit"
                        for doc_id in document_ids
                    ]
                )
    except Exception:
        pass
    return get_full_path(str(location))
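# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# A hedged sketch: the source location is inferred from whichever well-known
# attribute the loader instance carries (here, CSVLoader's "file_path"). The
# file name is hypothetical; constructing the loader does not open the file.
def _example_get_loader_full_path() -> str:
    from langchain_community.document_loaders import CSVLoader

    loader = CSVLoader(file_path="data/report.csv")
    return get_loader_full_path(loader)  # relative path resolved via get_full_path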
[docs]
def get_runtime() -> Tuple[Framework, Runtime]:
    """Fetch the current Framework and Runtime details.

    Returns:
        Tuple[Framework, Runtime]: Framework and Runtime for the current app instance.
    """
    runtime_env = get_runtime_environment()
    framework = Framework(
        name="langchain", version=runtime_env.get("library_version", None)
    )
    uname = platform.uname()
    runtime = Runtime(
        host=uname.node,
        path=os.environ["PWD"],
        platform=runtime_env.get("platform", "unknown"),
        os=uname.system,
        os_version=uname.version,
        ip=get_ip(),
        language=runtime_env.get("runtime", "unknown"),
        language_version=runtime_env.get("runtime_version", "unknown"),
    )

    if "Darwin" in runtime.os:
        runtime.type = "desktop"
        runtime.runtime = "Mac OSX"

    logger.debug(f"framework {framework}")
    logger.debug(f"runtime {runtime}")
    return framework, runtime
[docs]
def get_ip() -> str:
    """Fetch local runtime ip address.

    Returns:
        str: IP address
    """
    import socket  # lazy imports

    host = socket.gethostname()
    try:
        public_ip = socket.gethostbyname(host)
    except Exception:
        public_ip = socket.gethostbyname("localhost")
    return public_ip
[docs]
def generate_size_based_batches(
    docs: List[Document], max_batch_size: int = 100 * 1024
) -> List[List[Document]]:
    """
    Generate batches of documents based on page_content size.

    Args:
        docs: List of documents to be batched.
        max_batch_size: Maximum size of each batch in bytes. Defaults to
            100*1024 (100 KB).

    Returns:
        List[List[Document]]: List of batches of documents
    """
    batches: List[List[Document]] = []
    current_batch: List[Document] = []
    current_batch_size: int = 0

    for doc in docs:
        # Calculate the size of the document in bytes
        doc_size: int = len(doc.page_content.encode("utf-8"))

        if doc_size > max_batch_size:
            # If a single document exceeds the max batch size,
            # send it as a single batch
            batches.append([doc])
        else:
            if current_batch_size + doc_size > max_batch_size:
                # If adding this document exceeds the max batch size,
                # start a new batch
                batches.append(current_batch)
                current_batch = []
                current_batch_size = 0

            # Add document to the current batch
            current_batch.append(doc)
            current_batch_size += doc_size

    # Add the last batch if it has documents
    if current_batch:
        batches.append(current_batch)

    return batches
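# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# Batching three hypothetical documents with a deliberately tiny max_batch_size
# so the split is easy to see; real callers rely on the 100 KB default.
def _example_generate_batches() -> None:
    docs = [
        Document(page_content="a" * 20),
        Document(page_content="b" * 20),
        Document(page_content="c" * 200),  # alone exceeds the limit
    ]
    batches = generate_size_based_batches(docs, max_batch_size=64)
    print([len(batch) for batch in batches])  # -> [1, 2]: oversized doc, then the rest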
[docs]
def get_file_owner_from_path(file_path: str) -> str:
    """Fetch owner of local file path.

    Args:
        file_path (str): Local file path.

    Returns:
        str: Name of owner.
    """
    try:
        import pwd

        file_owner_uid = os.stat(file_path).st_uid
        file_owner_name = pwd.getpwuid(file_owner_uid).pw_name
    except Exception:
        file_owner_name = "unknown"
    return file_owner_name
[docs]
def get_source_size(source_path: str) -> int:
    """Fetch size of source path. Source can be a directory or a file.

    Args:
        source_path (str): Local path of data source.

    Returns:
        int: Source size in bytes.
    """
    if not source_path:
        return 0
    size = 0
    if os.path.isfile(source_path):
        size = os.path.getsize(source_path)
    elif os.path.isdir(source_path):
        total_size = 0
        for dirpath, _, filenames in os.walk(source_path):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                if not os.path.islink(fp):
                    total_size += os.path.getsize(fp)
        size = total_size
    return size
[docs]
def calculate_content_size(data: str) -> int:
    """Calculate the content size in bytes:
    - Encode the string to bytes using a specific encoding (e.g., UTF-8)
    - Get the length of the encoded bytes.

    Args:
        data (str): Data string.

    Returns:
        int: Size of string in bytes.
    """
    encoded_content = data.encode("utf-8")
    size = len(encoded_content)
    return size
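# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# The size is the UTF-8 byte length, not the character count, so multi-byte
# characters count for more than one.
def _example_calculate_content_size() -> None:
    assert calculate_content_size("abc") == 3
    assert calculate_content_size("héllo") == 6  # "é" encodes to two bytes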
[docs]
class PebbloLoaderAPIWrapper(BaseModel):
    """Wrapper for Pebblo Loader API."""

    api_key: Optional[str]  # Use SecretStr
    """API key for Pebblo Cloud"""
    classifier_location: str = "local"
    """Location of the classifier, local or cloud. Defaults to 'local'"""
    classifier_url: Optional[str]
    """URL of the Pebblo Classifier"""
    cloud_url: Optional[str]
    """URL of the Pebblo Cloud"""
    anonymize_snippets: bool = False
    """Whether to anonymize snippets going into VectorDB and the generated reports"""

    def __init__(self, **kwargs: Any):
        """Validate that the API key is set in kwargs or the environment."""
        kwargs["api_key"] = get_from_dict_or_env(
            kwargs, "api_key", "PEBBLO_API_KEY", ""
        )
        kwargs["classifier_url"] = get_from_dict_or_env(
            kwargs, "classifier_url", "PEBBLO_CLASSIFIER_URL", _DEFAULT_CLASSIFIER_URL
        )
        kwargs["cloud_url"] = get_from_dict_or_env(
            kwargs, "cloud_url", "PEBBLO_CLOUD_URL", _DEFAULT_PEBBLO_CLOUD_URL
        )
        super().__init__(**kwargs)
    [docs]
    def send_loader_discover(self, app: App) -> None:
        """
        Send app discovery request to Pebblo server & cloud.

        Args:
            app (App): App instance to be discovered.
        """
        pebblo_resp = None
        payload = app.dict(exclude_unset=True)

        if self.classifier_location == "local":
            # Send app details to local classifier
            headers = self._make_headers()
            app_discover_url = (
                f"{self.classifier_url}{Routes.loader_app_discover.value}"
            )
            pebblo_resp = self.make_request("POST", app_discover_url, headers, payload)

        if self.api_key:
            # Send app details to Pebblo cloud if api_key is present
            headers = self._make_headers(cloud_request=True)
            if pebblo_resp:
                pebblo_server_version = json.loads(pebblo_resp.text).get(
                    "pebblo_server_version"
                )
                payload.update({"pebblo_server_version": pebblo_server_version})
            payload.update({"pebblo_client_version": PLUGIN_VERSION})
            pebblo_cloud_url = f"{self.cloud_url}{Routes.loader_app_discover.value}"
            _ = self.make_request("POST", pebblo_cloud_url, headers, payload)
    [docs]
    def classify_documents(
        self,
        docs_with_id: List[IndexedDocument],
        app: App,
        loader_details: dict,
        loading_end: bool = False,
    ) -> dict:
        """
        Send documents to Pebblo server for classification.
        Then send classified documents to Daxa cloud (if api_key is present).

        Args:
            docs_with_id (List[IndexedDocument]): List of documents to be classified.
            app (App): App instance.
            loader_details (dict): Loader details.
            loading_end (bool): Boolean, indicating the halt of data loading by loader.
        """
        source_path = loader_details.get("source_path", "")
        source_owner = get_file_owner_from_path(source_path)
        # Prepare docs for classification
        docs, source_aggregate_size = self.prepare_docs_for_classification(
            docs_with_id, source_path, loader_details
        )
        # Build payload for classification
        payload = self.build_classification_payload(
            app, docs, loader_details, source_owner, source_aggregate_size, loading_end
        )

        classified_docs = {}
        if self.classifier_location == "local":
            # Send docs to local classifier
            headers = self._make_headers()
            load_doc_url = f"{self.classifier_url}{Routes.loader_doc.value}"
            try:
                pebblo_resp = self.make_request(
                    "POST", load_doc_url, headers, payload, 300
                )

                if pebblo_resp:
                    # Updating structure of pebblo response docs
                    # for efficient searching
                    for classified_doc in json.loads(pebblo_resp.text).get("docs", []):
                        classified_docs.update(
                            {classified_doc["pb_id"]: classified_doc}
                        )
            except Exception as e:
                logger.warning("An Exception caught in classify_documents: local %s", e)

        if self.api_key:
            # Send docs to Pebblo cloud if api_key is present
            if self.classifier_location == "local":
                # If local classifier is used add the classified information
                # and remove doc content
                self.update_doc_data(payload["docs"], classified_docs)
            # Remove the anonymize_snippets key from payload
            payload.pop("anonymize_snippets", None)
            self.send_docs_to_pebblo_cloud(payload)
        elif self.classifier_location == "pebblo-cloud":
            logger.warning("API key is missing for sending docs to Pebblo cloud.")
            raise NameError("API key is missing for sending docs to Pebblo cloud.")

        return classified_docs
    [docs]
    def send_docs_to_pebblo_cloud(self, payload: dict) -> None:
        """
        Send documents to Pebblo cloud.

        Args:
            payload (dict): The payload containing documents to be sent.
        """
        headers = self._make_headers(cloud_request=True)
        pebblo_cloud_url = f"{self.cloud_url}{Routes.loader_doc.value}"
        try:
            _ = self.make_request("POST", pebblo_cloud_url, headers, payload)
        except Exception as e:
            logger.warning("An Exception caught in classify_documents: cloud %s", e)
    def _make_headers(self, cloud_request: bool = False) -> dict:
        """
        Generate headers for the request.

        Args:
            cloud_request (bool): Flag indicating whether the request is for
                Pebblo cloud.

        Returns:
            dict: Headers for the request.
        """
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        if cloud_request:
            # Add API key for Pebblo cloud request
            if self.api_key:
                headers.update({"x-api-key": self.api_key})
            else:
                logger.warning("API key is missing for Pebblo cloud request.")
        return headers
    [docs]
    def build_classification_payload(
        self,
        app: App,
        docs: List[dict],
        loader_details: dict,
        source_owner: str,
        source_aggregate_size: int,
        loading_end: bool,
    ) -> dict:
        """
        Build the payload for document classification.

        Args:
            app (App): App instance.
            docs (List[dict]): List of documents to be classified.
            loader_details (dict): Loader details.
            source_owner (str): Owner of the source.
            source_aggregate_size (int): Aggregate size of the source.
            loading_end (bool): Boolean indicating the halt of data loading by loader.

        Returns:
            dict: Payload for document classification.
        """
        payload: Dict[str, Any] = {
            "name": app.name,
            "owner": app.owner,
            "docs": docs,
            "plugin_version": PLUGIN_VERSION,
            "load_id": app.load_id,
            "loader_details": loader_details,
            "loading_end": "false",
            "source_owner": source_owner,
            "classifier_location": self.classifier_location,
            "anonymize_snippets": self.anonymize_snippets,
        }
        if loading_end is True:
            payload["loading_end"] = "true"
            if "loader_details" in payload:
                payload["loader_details"]["source_aggregate_size"] = (
                    source_aggregate_size
                )
        payload = Doc(**payload).dict(exclude_unset=True)
        return payload
    [docs]
    @staticmethod
    def make_request(
        method: str,
        url: str,
        headers: dict,
        payload: Optional[dict] = None,
        timeout: int = 20,
    ) -> Optional[Response]:
        """
        Make a request to the Pebblo API.

        Args:
            method (str): HTTP method (GET, POST, PUT, DELETE, etc.).
            url (str): URL for the request.
            headers (dict): Headers for the request.
            payload (Optional[dict]): Payload for the request (for POST, PUT, etc.).
            timeout (int): Timeout for the request in seconds.

        Returns:
            Optional[Response]: Response object if the request is successful.
        """
        try:
            response = request(
                method=method, url=url, headers=headers, json=payload, timeout=timeout
            )
            logger.debug(
                "Request: method %s, url %s, len %s response status %s",
                method,
                response.request.url,
                str(len(response.request.body if response.request.body else [])),
                str(response.status_code),
            )

            if response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
                logger.warning(f"Pebblo Server: Error {response.status_code}")
            elif response.status_code >= HTTPStatus.BAD_REQUEST:
                logger.warning(f"Pebblo received an invalid payload: {response.text}")
            elif response.status_code != HTTPStatus.OK:
                logger.warning(
                    f"Pebblo returned an unexpected response code: "
                    f"{response.status_code}"
                )

            return response
        except RequestException:
            logger.warning("Unable to reach server %s", url)
        except Exception as e:
            logger.warning("An Exception caught in make_request: %s", e)
        return None
    [docs]
    @staticmethod
    def prepare_docs_for_classification(
        docs_with_id: List[IndexedDocument],
        source_path: str,
        loader_details: dict,
    ) -> Tuple[List[dict], int]:
        """
        Prepare documents for classification.

        Args:
            docs_with_id (List[IndexedDocument]): List of documents to be classified.
            source_path (str): Source path of the documents.
            loader_details (dict): Contains loader info.

        Returns:
            Tuple[List[dict], int]: Documents and the aggregate size of the source.
        """
        docs = []
        source_aggregate_size = 0
        doc_content = [doc.dict() for doc in docs_with_id]
        source_path_update = False
        for doc in doc_content:
            doc_metadata = doc.get("metadata", {})
            doc_authorized_identities = doc_metadata.get("authorized_identities", [])
            if loader_details["loader"] == "SharePointLoader":
                doc_source_path = get_full_path(
                    doc_metadata.get("source", loader_details["source_path"])
                )
            else:
                doc_source_path = get_full_path(
                    doc_metadata.get(
                        "full_path",
                        doc_metadata.get("source", source_path),
                    )
                )
            doc_source_owner = doc_metadata.get(
                "owner", get_file_owner_from_path(doc_source_path)
            )
            doc_source_size = doc_metadata.get(
                "size", get_source_size(doc_source_path)
            )
            page_content = str(doc.get("page_content"))
            page_content_size = calculate_content_size(page_content)
            source_aggregate_size += page_content_size
            doc_id = doc.get("pb_id", None) or 0
            docs.append(
                {
                    "doc": page_content,
                    "source_path": doc_source_path,
                    "pb_id": doc_id,
                    "last_modified": doc.get("metadata", {}).get("last_modified"),
                    "file_owner": doc_source_owner,
                    **(
                        {"authorized_identities": doc_authorized_identities}
                        if doc_authorized_identities
                        else {}
                    ),
                    **(
                        {"source_path_size": doc_source_size}
                        if doc_source_size is not None
                        else {}
                    ),
                }
            )
            if (
                loader_details["loader"] == "SharePointLoader"
                and not source_path_update
            ):
                loader_details["source_path"] = doc_metadata.get("source_full_url")
                source_path_update = True
        return docs, source_aggregate_size
    [docs]
    @staticmethod
    def update_doc_data(docs: List[dict], classified_docs: dict) -> None:
        """
        Update the document data with classified information.

        Args:
            docs (List[dict]): List of document data to be updated.
            classified_docs (dict): The dictionary containing classified documents.
        """
        for doc_data in docs:
            classified_data = classified_docs.get(doc_data["pb_id"], {})
            # Update the document data with classified information
            doc_data.update(
                {
                    "pb_checksum": classified_data.get("pb_checksum"),
                    "loader_source_path": classified_data.get("loader_source_path"),
                    "entities": classified_data.get("entities", {}),
                    "topics": classified_data.get("topics", {}),
                }
            )
            # Remove the document content
            doc_data.pop("doc")
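# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# A hedged end-to-end sketch of how the wrapper is typically driven: build it
# (values fall back to PEBBLO_API_KEY / PEBBLO_CLASSIFIER_URL / PEBBLO_CLOUD_URL,
# then to the module defaults), discover the app once, then classify a batch of
# documents. The app, loader and document values below are hypothetical, the key
# is a placeholder, and the calls require a reachable Pebblo classifier.
def _example_pebblo_flow() -> dict:
    wrapper = PebbloLoaderAPIWrapper(
        api_key="<your-pebblo-api-key>",  # or set PEBBLO_API_KEY
        classifier_location="local",  # classify via the local Pebblo server
        anonymize_snippets=False,
    )
    framework, runtime = get_runtime()
    app = App(
        name="example-rag-app",
        owner="jane.doe",
        description="Example app for this sketch",
        load_id="load-0001",
        runtime=runtime,
        framework=framework,
        plugin_version=PLUGIN_VERSION,
        client_version=Framework(name="langchain_community", version="0.2.x"),
    )
    wrapper.send_loader_discover(app)

    docs_with_id = [
        IndexedDocument(
            page_content="quarterly revenue figures", pb_id="1", metadata={}
        ),
    ]
    loader_details = {"loader": "CSVLoader", "source_path": "data/report.csv"}
    return wrapper.classify_documents(
        docs_with_id, app, loader_details, loading_end=True
    )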