"""Module contains a PDF parser based on Document AI from Google Cloud.You need to install two libraries to use this parser:pip install google-cloud-documentaipip install google-cloud-documentai-toolbox"""importloggingimportreimporttimefromdataclassesimportdataclassfromtypingimportTYPE_CHECKING,Any,Iterator,List,Optional,Sequencefromlangchain_core.document_loadersimportBaseBlobParserfromlangchain_core.document_loaders.blob_loadersimportBlobfromlangchain_core.documentsimportDocumentfromlangchain_core.utils.iterimportbatch_iteratefromlangchain_google_community._utilsimportget_client_infoifTYPE_CHECKING:fromgoogle.api_core.operationimportOperation# type: ignore[import]fromgoogle.cloud.documentaiimport(# type: ignore[import]DocumentProcessorServiceClient,)fromgoogle.cloud.documentai_v1.typesimportProcessOptionslogger=logging.getLogger(__name__)
@dataclass
class DocAIParsingResults:
    """A dataclass to store Document AI parsing results."""

    source_path: str
    parsed_path: str
class DocAIParser(BaseBlobParser):
    """`Google Cloud Document AI` parser.

    For a detailed explanation of Document AI, refer to the product
    documentation: https://cloud.google.com/document-ai/docs/overview
    """
    def __init__(
        self,
        *,
        client: Optional["DocumentProcessorServiceClient"] = None,
        project_id: Optional[str] = None,
        location: Optional[str] = None,
        gcs_output_path: Optional[str] = None,
        processor_name: Optional[str] = None,
    ):
        """Initializes the parser.

        Args:
            client: a DocumentProcessorServiceClient to use
            project_id: a Google Cloud project ID (used as the quota project)
            location: a Google Cloud location where a Document AI processor
                is located
            gcs_output_path: a path on Google Cloud Storage to store parsing
                results
            processor_name: full resource name of a Document AI processor or
                processor version

        You should provide either a client or a location (in which case a
        client will be instantiated).
        """
        if bool(client) == bool(location):
            raise ValueError(
                "You must specify either a client or a location to instantiate "
                "a client."
            )

        pattern = r"projects\/[0-9]+\/locations\/[a-z\-0-9]+\/processors\/[a-z0-9]+"
        if processor_name and not re.fullmatch(pattern, processor_name):
            raise ValueError(
                f"Processor name {processor_name} has the wrong format. If your "
                "prediction endpoint looks like https://us-documentai.googleapis.com"
                "/v1/projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID:process,"
                " use only the "
                "projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID part."
            )

        self._gcs_output_path = gcs_output_path
        self._processor_name = processor_name
        if client:
            self._client = client
        else:
            try:
                from google.api_core.client_options import ClientOptions
                from google.cloud.documentai import DocumentProcessorServiceClient
            except ImportError as exc:
                raise ImportError(
                    "Could not import google-cloud-documentai python package. "
                    "Please, install docai dependency group: "
                    "`pip install langchain-google-community[docai]`"
                ) from exc
            options = ClientOptions(
                quota_project_id=project_id,
                api_endpoint=f"{location}-documentai.googleapis.com",
            )
            self._client = DocumentProcessorServiceClient(
                client_options=options,
                client_info=get_client_info(module="document-ai"),
            )
        # Get the processor type; the proto-plus attribute is `type_`, since
        # `type` is a reserved name. Layout Parser processors return chunked
        # documents rather than paged ones, so remember which mode to use.
        self._processor_type = self._client.get_processor(name=processor_name).type_
        if self._processor_type == "LAYOUT_PARSER_PROCESSOR":
            self._use_layout_parser = True
        else:
            self._use_layout_parser = False
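    # A minimal instantiation sketch (the project number, processor ID, and
    # bucket below are hypothetical placeholders, not values from this
    # module). Passing `location` instead of `client` makes __init__ build
    # the DocumentProcessorServiceClient itself:
    #
    #     parser = DocAIParser(
    #         location="us",
    #         processor_name=(
    #             "projects/123456789/locations/us/processors/abcdef0123456789"
    #         ),
    #         gcs_output_path="gs://my-bucket/docai-output/",
    #     )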
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parses a blob lazily.

        Args:
            blob: a Blob to parse

        This is a long-running operation. A recommended way is to batch
        documents together and use the `batch_parse()` method.
        """
        yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
    def _prepare_process_options(
        self,
        enable_native_pdf_parsing: Optional[bool] = True,
        page_range: Optional[List[int]] = None,
        chunk_size: Optional[int] = 500,
        include_ancestor_headings: Optional[bool] = True,
    ) -> "ProcessOptions":
        """Prepares process options for a Document AI process request.

        Args:
            enable_native_pdf_parsing: enable PDF embedded text extraction
            page_range: list of page numbers to parse. If `None`, the entire
                document will be parsed.
            chunk_size: maximum number of characters per chunk (supported
                only with the Document AI Layout Parser processor).
            include_ancestor_headings: whether or not to include ancestor
                headings when splitting (supported only with the Document AI
                Layout Parser processor).
        """
        try:
            from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
        except ImportError as exc:
            raise ImportError(
                "documentai package not found, please install it with "
                "`pip install langchain-google-community[docai]`"
            ) from exc
        if self._use_layout_parser:
            layout_config = ProcessOptions.LayoutConfig(
                chunking_config=ProcessOptions.LayoutConfig.ChunkingConfig(
                    chunk_size=chunk_size,
                    include_ancestor_headings=include_ancestor_headings,
                )
            )
            individual_page_selector = (
                ProcessOptions.IndividualPageSelector(pages=page_range)
                if page_range
                else None
            )
            process_options = ProcessOptions(
                layout_config=layout_config,
                individual_page_selector=individual_page_selector,
            )
        else:
            ocr_config = (
                OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
                if enable_native_pdf_parsing
                else None
            )
            individual_page_selector = (
                ProcessOptions.IndividualPageSelector(pages=page_range)
                if page_range
                else None
            )
            process_options = ProcessOptions(
                ocr_config=ocr_config,
                individual_page_selector=individual_page_selector,
            )
        return process_options
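    # The per-request kwargs accepted by `online_process()`, `batch_parse()`,
    # and `docai_parse()` are forwarded here. A hypothetical call for a
    # Layout Parser processor, restricted to the first two pages:
    #
    #     process_options = self._prepare_process_options(
    #         page_range=[1, 2],
    #         chunk_size=1000,
    #         include_ancestor_headings=False,
    #     )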
    def online_process(
        self,
        blob: Blob,
        field_mask: Optional[str] = None,
        **process_options_kwargs: Any,
    ) -> Iterator[Document]:
        """Parses a blob lazily using online processing.

        Args:
            blob: a blob to parse.
            field_mask: a comma-separated list of the fields to include in the
                Document AI response.
                Suggested: "text,pages.pageNumber,pages.layout"
            process_options_kwargs: optional parameters to pass to the
                Document AI processors
        """
        try:
            from google.cloud import documentai
        except ImportError as exc:
            raise ImportError(
                "Could not import google-cloud-documentai python package. "
                "Please, install docai dependency group: "
                "`pip install langchain-google-community[docai]`"
            ) from exc
        try:
            from google.cloud.documentai_toolbox.wrappers.page import (  # type: ignore[import]
                _text_from_layout,
            )
        except ImportError as exc:
            raise ImportError(
                "documentai_toolbox package not found, please install it with "
                "`pip install langchain-google-community[docai]`"
            ) from exc

        # Prepare process options for the request.
        process_options = self._prepare_process_options(**process_options_kwargs)

        response = self._client.process_document(
            documentai.ProcessRequest(
                name=self._processor_name,
                gcs_document=documentai.GcsDocument(
                    gcs_uri=blob.path,
                    mime_type=blob.mimetype or "application/pdf",
                ),
                process_options=process_options,
                skip_human_review=True,
                field_mask=field_mask,
            )
        )
        if self._use_layout_parser:
            yield from (
                Document(
                    page_content=chunk.content,
                    metadata={
                        "chunk_id": chunk.chunk_id,
                        "source": blob.path,
                    },
                )
                for chunk in response.document.chunked_document.chunks
            )
        else:
            yield from (
                Document(
                    page_content=_text_from_layout(
                        page.layout, response.document.text
                    ),
                    metadata={
                        "page": page.page_number,
                        "source": blob.path,
                    },
                )
                for page in response.document.pages
            )
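    # Sketch of online processing for a single document already on GCS
    # (bucket and file names are hypothetical):
    #
    #     from langchain_core.document_loaders.blob_loaders import Blob
    #
    #     blob = Blob(path="gs://my-bucket/invoice.pdf")
    #     docs = list(
    #         parser.online_process(
    #             blob, field_mask="text,pages.pageNumber,pages.layout"
    #         )
    #     )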
    def batch_parse(
        self,
        blobs: Sequence[Blob],
        gcs_output_path: Optional[str] = None,
        timeout_sec: int = 3600,
        check_in_interval_sec: int = 60,
        **process_options_kwargs: Any,
    ) -> Iterator[Document]:
        """Parses a list of blobs lazily.

        Args:
            blobs: a list of blobs to parse.
            gcs_output_path: a path on Google Cloud Storage to store parsing
                results.
            timeout_sec: a timeout to wait for Document AI to complete, in
                seconds.
            check_in_interval_sec: the interval, in seconds, between checks on
                whether the parsing operations have completed.
            process_options_kwargs: optional parameters to pass to the
                Document AI processors

        This is a long-running operation. A recommended way is to decouple
        parsing from creating LangChain Documents:

        >>> operations = parser.docai_parse(blobs, gcs_output_path=gcs_path)
        >>> parser.is_running(operations)

        You can get the operation names and save them:

        >>> names = [op.operation.name for op in operations]

        And when all operations are finished, you can use their results:

        >>> operations = parser.operations_from_names(names)
        >>> results = parser.get_results(operations)
        >>> docs = parser.parse_from_results(results)
        """
        output_path = gcs_output_path or self._gcs_output_path
        if not output_path:
            raise ValueError(
                "An output path on Google Cloud Storage should be provided."
            )
        operations = self.docai_parse(
            blobs, gcs_output_path=output_path, **process_options_kwargs
        )
        operation_names = [op.operation.name for op in operations]
        logger.debug(
            "Started parsing with Document AI, submitted operations %s",
            operation_names,
        )
        time_elapsed = 0
        while self.is_running(operations):
            time.sleep(check_in_interval_sec)
            time_elapsed += check_in_interval_sec
            if time_elapsed > timeout_sec:
                raise TimeoutError(
                    "Timeout exceeded! Check operations "
                    f"{operation_names} later!"
                )
            logger.debug(".")

        results = self.get_results(operations=operations)
        yield from self.parse_from_results(results)
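    # Sketch of the blocking batch path (hypothetical paths); this submits
    # the batch, polls until completion, and yields Documents:
    #
    #     blobs = [
    #         Blob(path="gs://my-bucket/doc1.pdf"),
    #         Blob(path="gs://my-bucket/doc2.pdf"),
    #     ]
    #     docs = list(
    #         parser.batch_parse(blobs, gcs_output_path="gs://my-bucket/out/")
    #     )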
    def parse_from_results(
        self, results: List[DocAIParsingResults]
    ) -> Iterator[Document]:
        """Yields LangChain Documents from parsing results stored on GCS."""
        try:
            from google.cloud.documentai_toolbox.utilities.gcs_utilities import (  # type: ignore[import]
                split_gcs_uri,
            )
            from google.cloud.documentai_toolbox.wrappers.document import (  # type: ignore[import]
                _get_shards,
            )
            from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
        except ImportError as exc:
            raise ImportError(
                "documentai_toolbox package not found, please install it with "
                "`pip install langchain-google-community[docai]`"
            ) from exc
        for result in results:
            gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path)
            shards = _get_shards(gcs_bucket_name, gcs_prefix + "/")
            if self._use_layout_parser:
                yield from (
                    Document(
                        page_content=chunk.content,
                        metadata={
                            "chunk_id": chunk.chunk_id,
                            "source": result.source_path,
                        },
                    )
                    for shard in shards
                    for chunk in shard.chunked_document.chunks
                )
            else:
                yield from (
                    Document(
                        page_content=_text_from_layout(page.layout, shard.text),
                        metadata={
                            "page": page.page_number,
                            "source": result.source_path,
                        },
                    )
                    for shard in shards
                    for page in shard.pages
                )
    def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
        """Initializes Long-Running Operations from their names."""
        try:
            from google.longrunning.operations_pb2 import (  # type: ignore[import]
                GetOperationRequest,
            )
        except ImportError as exc:
            raise ImportError(
                "long running operations package not found, please install it "
                "with `pip install langchain-google-community[docai]`"
            ) from exc

        return [
            self._client.get_operation(request=GetOperationRequest(name=name))
            for name in operation_names
        ]
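    # `batch_parse()` above polls `is_running()`, which did not appear in
    # this section. The minimal reconstruction below assumes the entries are
    # `google.api_core.operation.Operation` objects exposing `done()`.
    def is_running(self, operations: List["Operation"]) -> bool:
        """Checks whether any of the parsing operations is still running."""
        return not all(op.done() for op in operations)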
    def docai_parse(
        self,
        blobs: Sequence[Blob],
        *,
        gcs_output_path: Optional[str] = None,
        processor_name: Optional[str] = None,
        batch_size: int = 1000,
        field_mask: Optional[str] = None,
        **process_options_kwargs: Any,
    ) -> List["Operation"]:
        """Runs Google Document AI PDF Batch Processing on a list of blobs.

        Args:
            blobs: a list of blobs to be parsed
            gcs_output_path: a path (folder) on GCS to store results
            processor_name: name of a Document AI processor.
            batch_size: number of documents per batch
            field_mask: a comma-separated list of the fields to include in the
                Document AI response.
                Suggested: "text,pages.pageNumber,pages.layout"
            process_options_kwargs: optional parameters to pass to the
                Document AI processors

        Document AI has a 1000-file limit per batch, so batches larger than
        that need to be split into multiple requests. Batch processing is an
        async long-running operation; results are stored in an output GCS
        bucket.
        """
        try:
            from google.cloud import documentai
        except ImportError as exc:
            raise ImportError(
                "documentai package not found, please install it with "
                "`pip install langchain-google-community[docai]`"
            ) from exc

        output_path = gcs_output_path or self._gcs_output_path
        if output_path is None:
            raise ValueError(
                "An output path on Google Cloud Storage should be provided."
            )
        processor_name = processor_name or self._processor_name
        if processor_name is None:
            raise ValueError("A Document AI processor name should be provided.")

        operations = []
        for batch in batch_iterate(size=batch_size, iterable=blobs):
            input_config = documentai.BatchDocumentsInputConfig(
                gcs_documents=documentai.GcsDocuments(
                    documents=[
                        documentai.GcsDocument(
                            gcs_uri=blob.path,
                            mime_type=blob.mimetype or "application/pdf",
                        )
                        for blob in batch
                    ]
                )
            )
            output_config = documentai.DocumentOutputConfig(
                gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
                    gcs_uri=output_path, field_mask=field_mask
                )
            )
            process_options = self._prepare_process_options(**process_options_kwargs)
            operations.append(
                self._client.batch_process_documents(
                    documentai.BatchProcessRequest(
                        name=processor_name,
                        input_documents=input_config,
                        document_output_config=output_config,
                        process_options=process_options,
                        skip_human_review=True,
                    )
                )
            )
        return operations
    def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]:
        """Collects parsing results from a list of Long-Running Operations."""
        try:
            from google.cloud.documentai_v1 import (  # type: ignore[import]
                BatchProcessMetadata,
            )
        except ImportError as exc:
            raise ImportError(
                "documentai package not found, please install it with "
                "`pip install langchain-google-community[docai]`"
            ) from exc

        return [
            DocAIParsingResults(
                source_path=status.input_gcs_source,
                parsed_path=status.output_gcs_destination,
            )
            for op in operations
            for status in (
                op.metadata.individual_process_statuses
                if isinstance(op.metadata, BatchProcessMetadata)
                else BatchProcessMetadata.deserialize(
                    op.metadata.value
                ).individual_process_statuses
            )
        ]
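# End-to-end sketch of the decoupled workflow described in `batch_parse()`
# (all bucket names and paths are hypothetical):
#
#     operations = parser.docai_parse(
#         blobs, gcs_output_path="gs://my-bucket/out/"
#     )
#     names = [op.operation.name for op in operations]
#     # ...later, possibly from another process:
#     operations = parser.operations_from_names(names)
#     if not parser.is_running(operations):
#         results = parser.get_results(operations)
#         docs = list(parser.parse_from_results(results))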