drivendataorg / cloudpathlib

Python pathlib-style classes for cloud storage services such as Amazon S3, Azure Blob Storage, and Google Cloud Storage.
https://cloudpathlib.drivendata.org
MIT License

Add download/upload progress callback support #488

Open MHendricks opened 4 days ago

MHendricks commented 4 days ago

I've started using cloudpathlib recently and it's made what I'm working on so much easier. I would like to report download progress to users whenever any of my cloud files need to be downloaded.

I'm using S3 and got it working for my needs in my personal PR https://github.com/blurstudio-forks/cloudpathlib/pull/1. It's fairly easy to configure: pass a callable to the new transfer_callable argument of S3Client. The callable must accept the arguments (direction, state, cloud_path, bytes_sent). Because the callback is set on the S3Client, it applies to any CloudPath that uses that client whenever it triggers a download or upload.

The first 3 arguments allow your callable to understand what is happening and update your UI as needed, including keeping track of multiple transfers happening at the same time. The callback also lets you know when a transfer starts and stops, so you can show or hide your UI.

A simple implementation example:

```py
from itertools import islice
from cloudpathlib import CloudPath, S3Client

def progress_callback(direction, state, cloud_path, bytes_sent):
    print("Progress", direction, state, cloud_path, bytes_sent)

client = S3Client(transfer_callable=progress_callback, no_sign_request=True)
ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
a_file = next(islice(ladi.iterdir(), 1))
# If the file needs to be downloaded, this will call `progress_callback`
# to report start, stop and download progress
print(a_file.fspath)
```
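To illustrate how the first three arguments can be used to keep track of several transfers at once, here is a rough sketch of a callable that needs no UI library. It assumes the state values "start" and "stop" used elsewhere in this issue, treats any other state as a progress update, and assumes bytes_sent is the number of bytes transferred since the previous call. The `TransferTracker` class, the "download" direction string, the "progress" state string, and the example paths are hypothetical, and the calls are simulated by hand rather than made by S3Client.

```py
import threading


class TransferTracker:
    """Hypothetical helper that records bytes transferred per (direction, path)."""

    def __init__(self):
        # Transfers may report from several threads, so guard the dicts with a lock.
        self._lock = threading.Lock()
        self.active = {}    # (direction, cloud_path) -> bytes transferred so far
        self.finished = {}  # (direction, cloud_path) -> total bytes transferred

    def __call__(self, direction, state, cloud_path, bytes_sent):
        key = (direction, cloud_path)
        with self._lock:
            if state == "start":
                self.active[key] = 0
            elif state == "stop":
                self.finished[key] = self.active.pop(key, 0)
            else:
                # Assumption: any other state is a progress update and
                # bytes_sent is the increment since the last call.
                self.active[key] = self.active.get(key, 0) + (bytes_sent or 0)


tracker = TransferTracker()

# Simulated callback invocations for two overlapping transfers.
tracker("download", "start", "s3://bucket/a.bin", 0)
tracker("download", "start", "s3://bucket/b.bin", 0)
tracker("download", "progress", "s3://bucket/a.bin", 1024)
tracker("download", "progress", "s3://bucket/b.bin", 512)
tracker("download", "progress", "s3://bucket/a.bin", 2048)
tracker("download", "stop", "s3://bucket/a.bin", 0)

print("finished:", tracker.finished)
print("still active:", tracker.active)
```

With the proposed argument, an instance like this would presumably be passed as `S3Client(transfer_callable=tracker)`, in the same way as `progress_callback` above.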

I have only worked with S3, but I hope the other services also provide a transfer progress reporting system. Is this something you would like to implement or should I develop my code enough to create a pull request?

A more complex example using the rich library to show download progress of several files downloaded at once.

```py
import concurrent.futures
from functools import partial
from itertools import islice

from cloudpathlib import CloudPath, S3Client
from rich.progress import BarColumn, DownloadColumn, Progress, TransferSpeedColumn, TextColumn

callback_state: dict[CloudPath, dict] = {}


def progress_callback(progress, direction, state, cloud_path, bytes_sent):
    if state == "start":
        progress.start()
        size = cloud_path.stat().st_size
        task_id = progress.add_task("Downloading", total=size, filename=cloud_path.name)
        callback_state[cloud_path] = dict(task_id=task_id, size=size)
    elif state == "stop":
        if cloud_path in callback_state:
            del callback_state[cloud_path]
    else:
        info = callback_state[cloud_path]
        progress.update(info["task_id"], advance=bytes_sent)


def download_url(cloud_path):
    # Trigger download of non-cached files
    cloud_path.fspath


def download(urls) -> None:
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {}
        for url in urls:
            future_to_url[executor.submit(download_url, url)] = url
        for future in concurrent.futures.as_completed(future_to_url):
            try:
                future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (future_to_url[future], exc))
    # `progress` is the module-level Progress instance created below
    progress.stop()


if __name__ == "__main__":
    # Set up the progress bar and attach the callback to the S3Client
    progress = Progress(
        TextColumn("[blue]{task.fields[filename]}"),
        BarColumn(),
        DownloadColumn(),
        TransferSpeedColumn(),
    )
    client = S3Client(
        transfer_callable=partial(progress_callback, progress),
        no_sign_request=True,
    )

    # Download the first 5 files from this resource.
    ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
    download(islice(ladi.iterdir(), 5))
```