class AzureBlobStorageFileLoader(BaseLoader):
    """Load from `Azure Blob Storage` files.

    A single blob is downloaded into a temporary directory and then parsed
    with ``UnstructuredFileLoader``.
    """

    def __init__(self, conn_str: str, container: str, blob_name: str):
        """Initialize with connection string, container and blob name.

        Args:
            conn_str: Connection string for Azure Blob Storage.
            container: Container name.
            blob_name: Blob name.
        """
        self.conn_str = conn_str
        """Connection string for Azure Blob Storage."""
        self.container = container
        """Container name."""
        self.blob = blob_name
        """Blob name."""

    def load(self) -> List[Document]:
        """Load documents.

        Downloads the configured blob to a file inside a temporary directory,
        runs ``UnstructuredFileLoader`` on that file and returns its result.
        The temporary directory is removed before this method returns.

        Returns:
            The documents produced by ``UnstructuredFileLoader`` for the
            downloaded file.

        Raises:
            ImportError: If the ``azure-storage-blob`` package is not
                installed.
        """
        # Imported lazily so the dependency is only required when this
        # loader is actually used.
        try:
            from azure.storage.blob import BlobClient
        except ImportError as exc:
            raise ImportError(
                "Could not import azure storage blob python package. "
                "Please install it with `pip install azure-storage-blob`."
            ) from exc

        # Use the client as a context manager so its underlying connection
        # is closed even when the download or the parsing step fails.
        with BlobClient.from_connection_string(
            conn_str=self.conn_str,
            container_name=self.container,
            blob_name=self.blob,
        ) as client:
            with tempfile.TemporaryDirectory() as temp_dir:
                # NOTE(review): the blob name is used verbatim as a relative
                # path; names containing ".." segments would resolve outside
                # temp_dir -- confirm whether such names can reach this code.
                file_path = f"{temp_dir}/{self.container}/{self.blob}"
                # The blob name may contain "/" separators, so create any
                # intermediate directories before opening the target file.
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, "wb") as file:
                    blob_data = client.download_blob()
                    blob_data.readinto(file)
                # Parse while the temporary file still exists.
                loader = UnstructuredFileLoader(file_path)
                return loader.load()