Source code for langchain_community.document_loaders.blob_loaders.cloud_blob_loader

"""Use to load blobs from the local file system."""

import contextlib
import mimetypes
import tempfile
from io import BufferedReader, BytesIO
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Callable,
    Generator,
    Iterable,
    Iterator,
    Optional,
    Sequence,
    TypeVar,
    Union,
)
from urllib.parse import urlparse

if TYPE_CHECKING:
    from cloudpathlib import AnyPath

from langchain_community.document_loaders.blob_loaders.schema import (
    Blob,
    BlobLoader,
)

T = TypeVar("T")


class _CloudBlob(Blob):
    def as_string(self) -> str:
        """Read data as a string."""
        from cloudpathlib import AnyPath

        if self.data is None and self.path:
            return AnyPath(self.path).read_text(encoding=self.encoding)  # type: ignore
        elif isinstance(self.data, bytes):
            return self.data.decode(self.encoding)
        elif isinstance(self.data, str):
            return self.data
        else:
            raise ValueError(f"Unable to get string for blob {self}")

    def as_bytes(self) -> bytes:
        """Read data as bytes."""
        from cloudpathlib import AnyPath

        if isinstance(self.data, bytes):
            return self.data
        elif isinstance(self.data, str):
            return self.data.encode(self.encoding)
        elif self.data is None and self.path:
            return AnyPath(self.path).read_bytes()  # type: ignore
        else:
            raise ValueError(f"Unable to get bytes for blob {self}")

    @contextlib.contextmanager
    def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
        """Read data as a byte stream."""
        from cloudpathlib import AnyPath

        if isinstance(self.data, bytes):
            yield BytesIO(self.data)
        elif self.data is None and self.path:
            # read_bytes() downloads the remote file; wrap it so callers get a stream.
            # (A bare ``return`` inside a generator would yield nothing.)
            yield BytesIO(AnyPath(self.path).read_bytes())  # type: ignore
        else:
            raise NotImplementedError(f"Unable to convert blob {self}")
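

# A minimal usage sketch for ``_CloudBlob`` (hypothetical bucket/key; requires
# cloudpathlib and valid credentials). Wrapped in a function so importing this
# module stays side-effect free; call it manually to try it out.
def _example_cloud_blob() -> None:
    blob = _CloudBlob(path="s3://mybucket/id", encoding="utf-8")
    print(blob.as_string()[:80])  # decode the remote bytes as text
    with blob.as_bytes_io() as stream:
        header = stream.read(16)  # read the first 16 bytes of the stream
    print(header)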


def _url_to_filename(url: str) -> str:
    """
    Convert a file:, s3:, az: or gs: URL to a local file path.

    If the file is remote, download it to a temporary file.
    """
    from cloudpathlib import AnyPath

    url_parsed = urlparse(url)
    suffix = Path(url_parsed.path).suffix
    if url_parsed.scheme in ["s3", "az", "gs"]:
        with AnyPath(url).open("rb") as f:  # type: ignore
            temp_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
            while True:
                buf = f.read(1024 * 1024)  # copy in 1 MiB chunks to bound memory use
                if not buf:
                    break
                temp_file.write(buf)
            temp_file.close()
            file_path = temp_file.name
    elif url_parsed.scheme in ["file", ""]:
        file_path = url_parsed.path
    else:
        raise ValueError(f"Scheme {url_parsed.scheme} not supported")
    return file_path
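

# Hedged example of ``_url_to_filename``: "file:" URLs and bare paths pass
# through unchanged, while cloud URLs are copied into a temporary file. The
# paths are hypothetical; cloudpathlib must be installed (the helper imports
# it), and the s3 case needs credentials, so it is left commented out.
def _example_url_to_filename() -> None:
    assert _url_to_filename("file:///tmp/data.txt") == "/tmp/data.txt"
    assert _url_to_filename("/tmp/data.txt") == "/tmp/data.txt"
    # _url_to_filename("s3://mybucket/data.txt")  # -> "/tmp/tmpXXXXXXXX.txt"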


def _make_iterator(
    length_func: Callable[[], int], show_progress: bool = False
) -> Callable[[Iterable[T]], Iterator[T]]:
    """Create a function that optionally wraps an iterable in tqdm."""
    if show_progress:
        try:
            from tqdm.auto import tqdm
        except ImportError:
            raise ImportError(
                "You must install tqdm to use show_progress=True."
                "You can install tqdm with `pip install tqdm`."
            )

        # Make sure to provide `total` here so that tqdm can show
        # a progress bar that takes into account the total number of files.
        def _with_tqdm(iterable: Iterable[T]) -> Iterator[T]:
            """Wrap an iterable in a tqdm progress bar."""
            return tqdm(iterable, total=length_func())

        iterator = _with_tqdm
    else:
        iterator = iter  # type: ignore

    return iterator
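

# Sketch of how ``_make_iterator`` is used: with show_progress=False it is
# just ``iter``; with True it wraps the iterable in tqdm (tqdm must be
# installed). The file names are hypothetical.
def _example_make_iterator() -> None:
    items = ["a.txt", "b.txt", "c.txt"]
    iterator = _make_iterator(length_func=lambda: len(items), show_progress=True)
    for item in iterator(items):  # renders a 3-step progress bar
        print(item)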


# PUBLIC API


class CloudBlobLoader(BlobLoader):
    """Load blobs from cloud URL or file:.

    Example:

    .. code-block:: python

        loader = CloudBlobLoader("s3://mybucket/id")

        for blob in loader.yield_blobs():
            print(blob)
    """  # noqa: E501
    def __init__(
        self,
        url: Union[str, "AnyPath"],
        *,
        glob: str = "**/[!.]*",
        exclude: Sequence[str] = (),
        suffixes: Optional[Sequence[str]] = None,
        show_progress: bool = False,
    ) -> None:
        """Initialize with a url and how to glob over it.

        Use [CloudPathLib](https://cloudpathlib.drivendata.org/).

        Args:
            url: Cloud URL to load from.
                Supports s3://, az://, gs://, file:// schemes.
                If no scheme is provided, it is assumed to be a local file.
                If a path to a file is provided, glob/exclude/suffixes are ignored.
            glob: Glob pattern relative to the specified path;
                by default set to pick up all non-hidden files.
            exclude: Patterns to exclude from results; uses glob syntax.
            suffixes: Provide to keep only files with these suffixes.
                Useful when wanting to keep files with different suffixes.
                Suffixes must include the dot, e.g. ".txt".
            show_progress: If true, will show a progress bar as the files are loaded.
                This forces an iteration through all matching files to count them
                prior to loading them.

        Examples:

            .. code-block:: python

                from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

                # Load a single file.
                loader = CloudBlobLoader("s3://mybucket/id")  # az://

                # Recursively load all text files in a directory.
                loader = CloudBlobLoader("az://mybucket/id", glob="**/*.txt")

                # Recursively load all non-hidden files in a directory.
                loader = CloudBlobLoader("gs://mybucket/id", glob="**/[!.]*")

                # Load all files in a directory without recursion.
                loader = CloudBlobLoader("s3://mybucket/id", glob="*")

                # Recursively load all files in a directory, except for py or pyc files.
                loader = CloudBlobLoader(
                    "s3://mybucket/id",
                    glob="**/*.txt",
                    exclude=["**/*.py", "**/*.pyc"],
                )
        """  # noqa: E501
        from cloudpathlib import AnyPath

        url_parsed = urlparse(str(url))

        if url_parsed.scheme == "file":
            url = url_parsed.path

        if isinstance(url, str):
            self.path = AnyPath(url)
        else:
            self.path = url

        self.glob = glob
        self.suffixes = set(suffixes or [])
        self.show_progress = show_progress
        self.exclude = exclude
    def yield_blobs(
        self,
    ) -> Iterable[Blob]:
        """Yield blobs that match the requested pattern."""
        iterator = _make_iterator(
            length_func=self.count_matching_files, show_progress=self.show_progress
        )
        for path in iterator(self._yield_paths()):
            yield self.from_path(path)
    def _yield_paths(self) -> Iterable["AnyPath"]:
        """Yield paths that match the requested pattern."""
        if self.path.is_file():  # type: ignore
            yield self.path
            return

        paths = self.path.glob(self.glob)
        for path in paths:
            if self.exclude:
                if any(path.match(glob) for glob in self.exclude):
                    continue
            if path.is_file():
                if self.suffixes and path.suffix not in self.suffixes:
                    continue
                # FIXME
                yield path
    def count_matching_files(self) -> int:
        """Count files that match the pattern without loading them."""
        # Carry out a full iteration to count the files without
        # materializing anything expensive in memory.
        num = 0
        for _ in self._yield_paths():
            num += 1
        return num
    @classmethod
    def from_path(
        cls,
        path: "AnyPath",
        *,
        encoding: str = "utf-8",
        mime_type: Optional[str] = None,
        guess_type: bool = True,
        metadata: Optional[dict] = None,
    ) -> Blob:
        """Load the blob from a path-like object.

        Args:
            path: Path-like object to the file to be read.
                Supports s3://, az://, gs://, file:// schemes.
                If no scheme is provided, it is assumed to be a local file.
            encoding: Encoding to use if decoding the bytes into a string.
            mime_type: If provided, will be set as the mime-type of the data.
            guess_type: If True, the mimetype will be guessed from the file
                extension, if a mime-type was not provided.
            metadata: Metadata to associate with the blob.

        Returns:
            Blob instance
        """
        if mime_type is None and guess_type:
            # Guess from the file extension when no explicit mime-type is given.
            _mimetype = mimetypes.guess_type(str(path))[0]
        else:
            _mimetype = mime_type

        url_parsed = urlparse(str(path))
        if url_parsed.scheme in ["file", ""]:
            if url_parsed.scheme == "file":
                local_path = url_parsed.path
            else:
                local_path = str(path)
            return Blob(
                data=None,
                mimetype=_mimetype,
                encoding=encoding,
                path=local_path,
                metadata=metadata if metadata is not None else {},
            )

        return _CloudBlob(
            data=None,
            mimetype=_mimetype,
            encoding=encoding,
            path=str(path),
            metadata=metadata if metadata is not None else {},
        )
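

# End-to-end usage sketch for CloudBlobLoader (hypothetical bucket; requires
# cloudpathlib with the appropriate provider extra and valid credentials).
def _example_cloud_blob_loader() -> None:
    loader = CloudBlobLoader(
        "s3://mybucket/docs",
        glob="**/*.txt",
        show_progress=True,
    )
    for blob in loader.yield_blobs():
        print(blob.path, len(blob.as_bytes()))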