Source code for langchain_community.document_loaders.blob_loaders.cloud_blob_loader
"""Use to load blobs from the local file system."""importcontextlibimportmimetypesimporttempfilefromioimportBufferedReader,BytesIOfrompathlibimportPathfromtypingimport(TYPE_CHECKING,Callable,Generator,Iterable,Iterator,Optional,Sequence,TypeVar,Union,)fromurllib.parseimporturlparseifTYPE_CHECKING:fromcloudpathlibimportAnyPathfromlangchain_community.document_loaders.blob_loaders.schemaimport(Blob,BlobLoader,)T=TypeVar("T")class_CloudBlob(Blob):defas_string(self)->str:"""Read data as a string."""fromcloudpathlibimportAnyPathifself.dataisNoneandself.path:returnAnyPath(self.path).read_text(encoding=self.encoding)# type: ignoreelifisinstance(self.data,bytes):returnself.data.decode(self.encoding)elifisinstance(self.data,str):returnself.dataelse:raiseValueError(f"Unable to get string for blob {self}")defas_bytes(self)->bytes:"""Read data as bytes."""fromcloudpathlibimportAnyPathifisinstance(self.data,bytes):returnself.dataelifisinstance(self.data,str):returnself.data.encode(self.encoding)elifself.dataisNoneandself.path:returnAnyPath(self.path).read_bytes()# type: ignoreelse:raiseValueError(f"Unable to get bytes for blob {self}")@contextlib.contextmanagerdefas_bytes_io(self)->Generator[Union[BytesIO,BufferedReader],None,None]:"""Read data as a byte stream."""fromcloudpathlibimportAnyPathifisinstance(self.data,bytes):yieldBytesIO(self.data)elifself.dataisNoneandself.path:returnAnyPath(self.path).read_bytes()# type: ignoreelse:raiseNotImplementedError(f"Unable to convert blob {self}")def_url_to_filename(url:str)->str:""" Convert file:, s3:, az: or gs: url to localfile. If the file is not here, download it in a temporary file. """fromcloudpathlibimportAnyPathurl_parsed=urlparse(url)suffix=Path(url_parsed.path).suffixifurl_parsed.schemein["s3","az","gs"]:withAnyPath(url).open("rb")asf:# type: ignoretemp_file=tempfile.NamedTemporaryFile(suffix=suffix,delete=False)whileTrue:buf=f.read()ifnotbuf:breaktemp_file.write(buf)temp_file.close()file_path=temp_file.nameelifurl_parsed.schemein["file",""]:file_path=url_parsed.pathelse:raiseValueError(f"Scheme {url_parsed.scheme} not supported")returnfile_pathdef_make_iterator(length_func:Callable[[],int],show_progress:bool=False)->Callable[[Iterable[T]],Iterator[T]]:"""Create a function that optionally wraps an iterable in tqdm."""ifshow_progress:try:fromtqdm.autoimporttqdmexceptImportError:raiseImportError("You must install tqdm to use show_progress=True.""You can install tqdm with `pip install tqdm`.")# Make sure to provide `total` here so that tqdm can show# a progress bar that takes into account the total number of files.def_with_tqdm(iterable:Iterable[T])->Iterator[T]:"""Wrap an iterable in a tqdm progress bar."""returntqdm(iterable,total=length_func())iterator=_with_tqdmelse:iterator=iter# type: ignorereturniterator# PUBLIC API
# PUBLIC API


class CloudBlobLoader(BlobLoader):
    """Load blobs from a cloud URL or file:.

    Example:

    .. code-block:: python

        loader = CloudBlobLoader("s3://mybucket/id")
        for blob in loader.yield_blobs():
            print(blob)
    """  # noqa: E501
    def __init__(
        self,
        url: Union[str, "AnyPath"],
        *,
        glob: str = "**/[!.]*",
        exclude: Sequence[str] = (),
        suffixes: Optional[Sequence[str]] = None,
        show_progress: bool = False,
    ) -> None:
        """Initialize with a URL and how to glob over it.

        Uses [CloudPathLib](https://cloudpathlib.drivendata.org/).

        Args:
            url: Cloud URL to load from. Supports s3://, az://, gs://, file://
                schemes. If no scheme is provided, it is assumed to be a local file.
                If a path to a file is provided, glob/exclude/suffixes are ignored.
            glob: Glob pattern relative to the specified path.
                By default, set to pick up all non-hidden files.
            exclude: Patterns to exclude from results; uses glob syntax.
            suffixes: Provide to keep only files with these suffixes.
                Useful when wanting to keep files with different suffixes.
                Suffixes must include the dot, e.g. ".txt".
            show_progress: If true, will show a progress bar as the files are loaded.
                This forces an iteration through all matching files to count them
                prior to loading them.

        Examples:

            .. code-block:: python

                from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

                # Load a single file.
                loader = CloudBlobLoader("s3://mybucket/id")  # az://

                # Recursively load all text files in a directory.
                loader = CloudBlobLoader("az://mybucket/id", glob="**/*.txt")

                # Recursively load all non-hidden files in a directory.
                loader = CloudBlobLoader("gs://mybucket/id", glob="**/[!.]*")

                # Load all files in a directory without recursion.
                loader = CloudBlobLoader("s3://mybucket/id", glob="*")

                # Recursively load all files in a directory, except for py or pyc files.
                loader = CloudBlobLoader(
                    "s3://mybucket/id",
                    glob="**/*.txt",
                    exclude=["**/*.py", "**/*.pyc"],
                )
        """  # noqa: E501
        from cloudpathlib import AnyPath

        url_parsed = urlparse(str(url))

        if url_parsed.scheme == "file":
            url = url_parsed.path

        if isinstance(url, str):
            self.path = AnyPath(url)
        else:
            self.path = url

        self.glob = glob
        self.suffixes = set(suffixes or [])
        self.show_progress = show_progress
        self.exclude = exclude
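    # A sketch of local-filesystem use (the paths are hypothetical): a
    # file:// URL, or a plain path with no scheme, resolves to the local
    # file system, so the same glob/suffix filters apply to local folders:
    #
    #     loader = CloudBlobLoader("file:///tmp/docs", glob="**/*.md")
    #     loader = CloudBlobLoader("/tmp/docs", suffixes=[".txt", ".md"])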
    def yield_blobs(
        self,
    ) -> Iterable[Blob]:
        """Yield blobs that match the requested pattern."""
        iterator = _make_iterator(
            length_func=self.count_matching_files, show_progress=self.show_progress
        )
        for path in iterator(self._yield_paths()):
            # yield Blob.from_path(path)
            yield self.from_path(path)
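    # A minimal consumption sketch (the bucket URL is a hypothetical example):
    #
    #     loader = CloudBlobLoader("s3://mybucket/docs", glob="**/*.txt")
    #     for blob in loader.yield_blobs():
    #         print(blob.path, len(blob.as_bytes()))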
    def _yield_paths(self) -> Iterable["AnyPath"]:
        """Yield paths that match the requested pattern."""
        if self.path.is_file():  # type: ignore
            yield self.path
            return

        paths = self.path.glob(self.glob)  # type: ignore[attr-defined]
        for path in paths:
            if self.exclude:
                if any(path.match(glob) for glob in self.exclude):
                    continue
            if path.is_file():
                if self.suffixes and path.suffix not in self.suffixes:
                    continue  # FIXME
                yield path
    def count_matching_files(self) -> int:
        """Count files that match the pattern without loading them."""
        # Carry out a full iteration to count the files without
        # materializing anything expensive in memory.
        num = 0
        for _ in self._yield_paths():
            num += 1
        return num
    @classmethod
    def from_path(
        cls,
        path: "AnyPath",
        *,
        encoding: str = "utf-8",
        mime_type: Optional[str] = None,
        guess_type: bool = True,
        metadata: Optional[dict] = None,
    ) -> Blob:
        """Load the blob from a path-like object.

        Args:
            path: Path-like object to the file to be read.
                Supports s3://, az://, gs://, file:// schemes.
                If no scheme is provided, it is assumed to be a local file.
            encoding: Encoding to use if decoding the bytes into a string.
            mime_type: If provided, will be set as the mime-type of the data.
            guess_type: If True, the mimetype will be guessed from the file
                extension, if a mime-type was not provided.
            metadata: Metadata to associate with the blob.

        Returns:
            Blob instance
        """
        if mime_type is None and guess_type:
            # guess_type is already known to be True in this branch,
            # so guess directly from the file extension.
            _mimetype = mimetypes.guess_type(path)[0]  # type: ignore
        else:
            _mimetype = mime_type

        url_parsed = urlparse(str(path))
        if url_parsed.scheme in ["file", ""]:
            if url_parsed.scheme == "file":
                local_path = url_parsed.path
            else:
                local_path = str(path)
            return Blob(
                data=None,
                mimetype=_mimetype,
                encoding=encoding,
                path=local_path,
                metadata=metadata if metadata is not None else {},
            )

        return _CloudBlob(
            data=None,
            mimetype=_mimetype,
            encoding=encoding,
            path=str(path),
            metadata=metadata if metadata is not None else {},
        )
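# A sketch of calling `from_path` directly, bypassing globbing (the URL is a
# hypothetical example). Cloud schemes return a lazily read _CloudBlob, while
# file:// and scheme-less paths return a plain Blob backed by the local file:
#
#     blob = CloudBlobLoader.from_path("gs://mybucket/data.json")
#     data = blob.as_bytes()  # fetched via cloudpathlib on first read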