Source code for langchain_google_community.docai

"""Module contains a PDF parser based on Document AI from Google Cloud.

You need to install two libraries to use this parser:
pip install google-cloud-documentai
pip install google-cloud-documentai-toolbox
"""

import logging
import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Sequence

from langchain_core.document_loaders import BaseBlobParser
from langchain_core.document_loaders.blob_loaders import Blob
from langchain_core.documents import Document
from langchain_core.utils.iter import batch_iterate

from langchain_google_community._utils import get_client_info

if TYPE_CHECKING:
    from google.api_core.operation import Operation  # type: ignore[import]
    from google.cloud.documentai import (  # type: ignore[import]
        DocumentProcessorServiceClient,
    )
    from google.cloud.documentai_v1.types import ProcessOptions

logger = logging.getLogger(__name__)


[docs] @dataclass class DocAIParsingResults: """A dataclass to store Document AI parsing results.""" source_path: str parsed_path: str
[docs] class DocAIParser(BaseBlobParser): """`Google Cloud Document AI` parser. For a detailed explanation of Document AI, refer to the product documentation. https://cloud.google.com/document-ai/docs/overview """
[docs] def __init__( self, *, client: Optional["DocumentProcessorServiceClient"] = None, project_id: Optional[str] = None, location: Optional[str] = None, gcs_output_path: Optional[str] = None, processor_name: Optional[str] = None, ): """Initializes the parser. Args: client: a DocumentProcessorServiceClient to use location: a Google Cloud location where a Document AI processor is located gcs_output_path: a path on Google Cloud Storage to store parsing results processor_name: full resource name of a Document AI processor or processor version You should provide either a client or location (and then a client would be instantiated). """ if bool(client) == bool(location): raise ValueError( "You must specify either a client or a location to instantiate " "a client." ) pattern = r"projects\/[0-9]+\/locations\/[a-z\-0-9]+\/processors\/[a-z0-9]+" if processor_name and not re.fullmatch(pattern, processor_name): raise ValueError( f"Processor name {processor_name} has the wrong format. If your " "prediction endpoint looks like https://us-documentai.googleapis.com" "/v1/projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID:process," " use only projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID " "part." ) self._gcs_output_path = gcs_output_path self._processor_name = processor_name if client: self._client = client else: try: from google.api_core.client_options import ClientOptions from google.cloud.documentai import DocumentProcessorServiceClient except ImportError as exc: raise ImportError( "Could not import google-cloud-documentai python package. " "Please, install docai dependency group: " "`pip install langchain-google-community[docai]`" ) from exc options = ClientOptions( quota_project_id=project_id, api_endpoint=f"{location}-documentai.googleapis.com", ) self._client = DocumentProcessorServiceClient( client_options=options, client_info=get_client_info(module="document-ai"), ) # get processor type self._processor_type = self._client.get_processor(name=processor_name).type if self._processor_type == "LAYOUT_PARSER_PROCESSOR": self._use_layout_parser = True else: self._use_layout_parser = False
[docs] def lazy_parse(self, blob: Blob) -> Iterator[Document]: """Parses a blob lazily. Args: blobs: a Blob to parse This is a long-running operation. A recommended way is to batch documents together and use the `batch_parse()` method. """ yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
def _prepare_process_options( self, enable_native_pdf_parsing: Optional[bool] = True, page_range: Optional[List[int]] = None, chunk_size: Optional[int] = 500, include_ancestor_headings: Optional[bool] = True, ) -> "ProcessOptions": """Prepare process options for DocAI process request Args: enable_native_pdf_parsing: enable pdf embedded text extraction page_range: list of page numbers to parse. If `None`, entire document will be parsed. chunk_size: maximum number of characters per chunk (supported only with Document AI Layout Parser processor). include_ancestor_headings: whether or not to include ancestor headings when splitting (supported only with Document AI Layout Parser processor). """ try: from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions except ImportError as exc: raise ImportError( "documentai package not found, please install it with " "`pip install langchain-google-community[docai]`" ) from exc if self._use_layout_parser: layout_config = ProcessOptions.LayoutConfig( chunking_config=ProcessOptions.LayoutConfig.ChunkingConfig( chunk_size=chunk_size, include_ancestor_headings=include_ancestor_headings, ) ) individual_page_selector = ( ProcessOptions.IndividualPageSelector(pages=page_range) if page_range else None ) process_options = ProcessOptions( layout_config=layout_config, individual_page_selector=individual_page_selector, ) else: ocr_config = ( OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing) if enable_native_pdf_parsing else None ) individual_page_selector = ( ProcessOptions.IndividualPageSelector(pages=page_range) if page_range else None ) process_options = ProcessOptions( ocr_config=ocr_config, individual_page_selector=individual_page_selector ) return process_options
[docs] def online_process( self, blob: Blob, field_mask: Optional[str] = None, **process_options_kwargs: Any, ) -> Iterator[Document]: """Parses a blob lazily using online processing. Args: blob: a blob to parse. field_mask: a comma-separated list of which fields to include in the Document AI response. suggested: "text,pages.pageNumber,pages.layout" process_options_kwargs: optional parameters to pass to the Document AI processors """ try: from google.cloud import documentai except ImportError as exc: raise ImportError( "Could not import google-cloud-documentai python package. " "Please, install docai dependency group: " "`pip install langchain-google-community[docai]`" ) from exc try: from google.cloud.documentai_toolbox.wrappers.page import ( # type: ignore[import] _text_from_layout, ) except ImportError as exc: raise ImportError( "documentai_toolbox package not found, please install it with " "`pip install langchain-google-community[docai]`" ) from exc # prepare process options process_options = self._prepare_process_options(**process_options_kwargs) response = self._client.process_document( documentai.ProcessRequest( name=self._processor_name, gcs_document=documentai.GcsDocument( gcs_uri=blob.path, mime_type=blob.mimetype or "application/pdf", ), process_options=process_options, skip_human_review=True, field_mask=field_mask, ) ) if self._use_layout_parser: yield from ( Document( page_content=chunk.content, metadata={ "chunk_id": chunk.chunk_id, "source": blob.path, }, ) for chunk in response.document.chunked_document.chunks ) else: yield from ( Document( page_content=_text_from_layout(page.layout, response.document.text), metadata={ "page": page.page_number, "source": blob.path, }, ) for page in response.document.pages )
[docs] def batch_parse( self, blobs: Sequence[Blob], gcs_output_path: Optional[str] = None, timeout_sec: int = 3600, check_in_interval_sec: int = 60, **process_options_kwargs: Any, ) -> Iterator[Document]: """Parses a list of blobs lazily. Args: blobs: a list of blobs to parse. gcs_output_path: a path on Google Cloud Storage to store parsing results. timeout_sec: a timeout to wait for Document AI to complete, in seconds. check_in_interval_sec: an interval to wait until next check whether parsing operations have been completed, in seconds. process_options_kwargs: optional parameters to pass to the Document AI processors This is a long-running operation. A recommended way is to decouple parsing from creating LangChain Documents: >>> operations = parser.docai_parse(blobs, gcs_path) >>> parser.is_running(operations) You can get operations names and save them: >>> names = [op.operation.name for op in operations] And when all operations are finished, you can use their results: >>> operations = parser.operations_from_names(operation_names) >>> results = parser.get_results(operations) >>> docs = parser.parse_from_results(results) """ output_path = gcs_output_path or self._gcs_output_path if not output_path: raise ValueError( "An output path on Google Cloud Storage should be provided." ) operations = self.docai_parse( blobs, gcs_output_path=output_path, **process_options_kwargs ) operation_names = [op.operation.name for op in operations] logger.debug( "Started parsing with Document AI, submitted operations %s", operation_names ) time_elapsed = 0 while self.is_running(operations): time.sleep(check_in_interval_sec) time_elapsed += check_in_interval_sec if time_elapsed > timeout_sec: raise TimeoutError( "Timeout exceeded! Check operations " f"{operation_names} later!" ) logger.debug(".") results = self.get_results(operations=operations) yield from self.parse_from_results(results)
[docs] def parse_from_results( self, results: List[DocAIParsingResults] ) -> Iterator[Document]: try: from google.cloud.documentai_toolbox.utilities.gcs_utilities import ( # type: ignore[import] split_gcs_uri, ) from google.cloud.documentai_toolbox.wrappers.document import ( # type: ignore[import] _get_shards, ) from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout except ImportError as exc: raise ImportError( "documentai_toolbox package not found, please install it with " "`pip install langchain-google-community[docai]`" ) from exc for result in results: gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path) shards = _get_shards(gcs_bucket_name, gcs_prefix + "/") if self._use_layout_parser: yield from ( Document( page_content=chunk.content, metadata={ "chunk_id": chunk.chunk_id, "source": result.source_path, }, ) for shard in shards for chunk in shard.chunked_document.chunks ) else: yield from ( Document( page_content=_text_from_layout(page.layout, shard.text), metadata={ "page": page.page_number, "source": result.source_path, }, ) for shard in shards for page in shard.pages )
[docs] def operations_from_names(self, operation_names: List[str]) -> List["Operation"]: """Initializes Long-Running Operations from their names.""" try: from google.longrunning.operations_pb2 import ( # type: ignore[import] GetOperationRequest, ) except ImportError as exc: raise ImportError( "long running operations package not found, please install it with" "`pip install langchain-google-community[docai]`" ) from exc return [ self._client.get_operation(request=GetOperationRequest(name=name)) for name in operation_names ]
[docs] def is_running(self, operations: List["Operation"]) -> bool: return any(not op.done() for op in operations)
[docs] def docai_parse( self, blobs: Sequence[Blob], *, gcs_output_path: Optional[str] = None, processor_name: Optional[str] = None, batch_size: int = 1000, field_mask: Optional[str] = None, **process_options_kwargs: Any, ) -> List["Operation"]: """Runs Google Document AI PDF Batch Processing on a list of blobs. Args: blobs: a list of blobs to be parsed gcs_output_path: a path (folder) on GCS to store results processor_name: name of a Document AI processor. batch_size: amount of documents per batch field_mask: a comma-separated list of which fields to include in the Document AI response. suggested: "text,pages.pageNumber,pages.layout" process_options_kwargs: optional parameters to pass to the Document AI processors Document AI has a 1000 file limit per batch, so batches larger than that need to be split into multiple requests. Batch processing is an async long-running operation and results are stored in a output GCS bucket. """ try: from google.cloud import documentai except ImportError as exc: raise ImportError( "documentai package not found, please install it with " "`pip install langchain-google-community[docai]`" ) from exc output_path = gcs_output_path or self._gcs_output_path if output_path is None: raise ValueError( "An output path on Google Cloud Storage should be provided." ) processor_name = processor_name or self._processor_name if processor_name is None: raise ValueError("A Document AI processor name should be provided.") operations = [] for batch in batch_iterate(size=batch_size, iterable=blobs): input_config = documentai.BatchDocumentsInputConfig( gcs_documents=documentai.GcsDocuments( documents=[ documentai.GcsDocument( gcs_uri=blob.path, mime_type=blob.mimetype or "application/pdf", ) for blob in batch ] ) ) output_config = documentai.DocumentOutputConfig( gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig( gcs_uri=output_path, field_mask=field_mask ) ) process_options = self._prepare_process_options(**process_options_kwargs) operations.append( self._client.batch_process_documents( documentai.BatchProcessRequest( name=processor_name, input_documents=input_config, document_output_config=output_config, process_options=process_options, skip_human_review=True, ) ) ) return operations
[docs] def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]: try: from google.cloud.documentai_v1 import ( # type: ignore[import] BatchProcessMetadata, ) except ImportError as exc: raise ImportError( "documentai package not found, please install it with " "`pip install langchain-google-community[docai]`" ) from exc return [ DocAIParsingResults( source_path=status.input_gcs_source, parsed_path=status.output_gcs_destination, ) for op in operations for status in ( op.metadata.individual_process_statuses if isinstance(op.metadata, BatchProcessMetadata) else BatchProcessMetadata.deserialize( op.metadata.value ).individual_process_statuses ) ]