boto / boto3

AWS SDK for Python
https://aws.amazon.com/sdk-for-python/
Apache License 2.0

on_done not called when token has expired #4145

Open · daveisfera opened 1 month ago

daveisfera commented 1 month ago

Describe the bug

If the token has expired, the file isn't uploaded (expected), but on_done is never called, so there is no way to be notified of the failure.

(refiling https://github.com/boto/s3transfer/issues/304 here since I use s3transfer through boto3)

Expected Behavior

An error would be reported

Current Behavior

The upload silently fails

Reproduction Steps

  1. Create a token using aws-vault
  2. Do an upload
  3. Verify success
  4. Wait for token to expire
  5. Do an upload
  6. Notice that no error is reported and on_done is never called

Possible Solution

Report the error through on_done, the same way errors are reported before the token expires.

Additional Information/Context

No response

SDK version used

1.34.97

Environment details (OS name and version, etc.)

Debian Bookworm with Python 3.12.3

tim-finnigan commented 3 weeks ago

Thanks for reaching out. Unfortunately we cannot guarantee compatibility with third-party libraries like aws-vault. To investigate a boto3 issue here we would need a code snippet to reproduce the issue, in addition to debug logs (with sensitive info redacted) which you can get by adding boto3.set_stream_logger('') to your script.

daveisfera commented 3 weeks ago

aws-vault is just a convenient way to generate the environment variables needed to authorize with boto3, and the same problem can be reproduced with any token that expires. I'll grab logs of it happening.

daveisfera commented 3 weeks ago

Here's a minimal reproducer (I believe the simplest possible example of using BaseSubscriber and on_done), and I've attached the logs from running it with an expired token:

import logging
from io import BytesIO

from boto3 import set_stream_logger
from boto3.s3.transfer import BaseSubscriber
from boto3.session import Session
from s3transfer.manager import TransferConfig, TransferManager

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

set_stream_logger("")

class CheckDone(BaseSubscriber):
    def on_done(self, future, **kwargs) -> None:
        try:
            res = future.result()  # re-raises the transfer error, if any
            logger.info("Upload worked: %s", res)
        except Exception as e:
            logger.error("Upload failed: %s", e)

def _main() -> None:
    session = Session()
    s3_tm = TransferManager(session.client("s3"), TransferConfig(max_request_concurrency=3))
    s3_tm.upload(
        BytesIO(b"testing"),
        "test_bucket",
        "s3transfer_test.txt",
        subscribers=[CheckDone()],
    )
    s3_tm.shutdown()
    logger.info("Done")

if __name__ == "__main__":
    _main()

s3transfer_test.log

tim-finnigan commented 1 week ago

Thanks for following up. We don't recommend using s3transfer directly, as noted in the README:

This project is not currently GA. If you are planning to use this code in production, make sure to lock to a minor version as interfaces may break from minor version to minor version. For a basic, stable interface of s3transfer, try the interfaces exposed in boto3

Have you tried using the Boto3 upload methods for S3? You can handle any errors as documented here or handle events like after-call-error.
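
For illustration, a minimal sketch of that recommended path (bucket and key names are placeholders carried over from the snippets in this thread); upload_fileobj raises on failure, so an expired token surfaces as a catchable exception rather than a silent no-op:

import logging
from io import BytesIO

from boto3.session import Session
from botocore.exceptions import BotoCoreError, ClientError

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def _main() -> None:
    client = Session().client("s3")
    try:
        # upload_fileobj raises instead of failing silently
        client.upload_fileobj(BytesIO(b"testing"), "test_bucket", "s3transfer_test.txt")
        logger.info("Upload worked")
    except (BotoCoreError, ClientError) as e:
        # an expired token shows up here, e.g. as an ExpiredToken ClientError
        logger.error("Upload failed: %s", e)

if __name__ == "__main__":
    _main()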

daveisfera commented 1 week ago

Sorry, I pulled from the wrong import when putting together that minimal reproducer. Here it is with the import from boto3:

import logging
from io import BytesIO

from boto3 import set_stream_logger
from boto3.s3.transfer import BaseSubscriber, TransferConfig, TransferManager
from boto3.session import Session

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

set_stream_logger("")

class CheckDone(BaseSubscriber):
    def on_done(self, future, **kwargs) -> None:
        try:
            res = future.result()  # re-raises the transfer error, if any
            logger.info("Upload worked: %s", res)
        except Exception as e:
            logger.error("Upload failed: %s", e)

def _main() -> None:
    session = Session()
    s3_tm = TransferManager(session.client("s3"), TransferConfig(max_concurrency=3))
    s3_tm.upload(
        BytesIO(b"testing"),
        "test_bucket",
        "s3transfer_test.txt",
        subscribers=[CheckDone()],
    )
    s3_tm.shutdown()
    logger.info("Done")

if __name__ == "__main__":
    _main()

tim-finnigan commented 4 days ago

That snippet is still using the TransferManager directly, which isn't recommended:

Thanks for following up. We don't recommend using s3transfer directly, as noted in the README:

This project is not currently GA. If you are planning to use this code in production, make sure to lock to a minor version as interfaces may break from minor version to minor version. For a basic, stable interface of s3transfer, try the interfaces exposed in boto3

Have you tried using the Boto3 upload methods for S3? You can handle any errors as documented here or handle events like after-call-error.

daveisfera commented 4 days ago

Then what is the recommended way to upload multiple files in parallel in a separate thread using boto3?

This is the only information I could find about uploading in parallel and it looks like I'm doing a simpler version of what that code shows, so is there a better way?

tim-finnigan commented 3 days ago

Hi @daveisfera — we recommend using upload_file or upload_fileobj. And here is the documentation on multithreading/multiprocessing. Although the S3Transfer code you shared is public, it's still recommended to use Boto3 directly.
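
For what it's worth, a sketch of parallel uploads along those lines: one client shared across a ThreadPoolExecutor (boto3 clients are documented as thread-safe; sessions and resources are not), with each future's result() playing the role of on_done. Bucket and key names are placeholders:

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO

from boto3.session import Session
from botocore.exceptions import BotoCoreError, ClientError

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def _upload(client, key: str) -> str:
    # upload_fileobj raises on failure, so the error propagates to the future
    client.upload_fileobj(BytesIO(b"testing"), "test_bucket", key)
    return key

def _main() -> None:
    client = Session().client("s3")  # one client, shared across threads
    keys = [f"s3transfer_test_{i}.txt" for i in range(3)]
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = {pool.submit(_upload, client, key): key for key in keys}
        for future in as_completed(futures):
            try:
                logger.info("Upload worked: %s", future.result())
            except (BotoCoreError, ClientError) as e:
                logger.error("Upload failed: %s: %s", futures[future], e)

if __name__ == "__main__":
    _main()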

Also looking back at the logs you shared, it looks like your on_done function is being called and the exception is logged (ERROR:__main__:Upload failed:...).

daveisfera commented 2 days ago

Hi @daveisfera — we recommend using upload_file or upload_fileobj. And here is the documentation on multithreading/multiprocessing. Although the S3Transfer code you shared is public, it's still recommended to use Boto3 directly.

I'm glad to switch, but that documentation doesn't seem to explain how to actually perform the uploads in another thread. It also mentions events, but I don't see how to receive an event notification like on_done, so is there more detailed information on how to use them?
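
For reference, a sketch of what subscribing to that event might look like, assuming botocore's event system passes the raised exception to after-call-error handlers (the handler signature here is an assumption, so treat it as a starting point rather than the documented interface):

from boto3.session import Session

def _on_error(exception, **kwargs):
    # assumed handler signature: botocore passes the raised exception as a keyword
    print(f"S3 call failed: {exception}")

client = Session().client("s3")
# "after-call-error.s3" should match error events for all S3 operations;
# a more specific name like "after-call-error.s3.PutObject" narrows it down
client.meta.events.register("after-call-error.s3", _on_error)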

Also looking back at the logs you shared, it looks like your on_done function is being called and the exception is logged (ERROR:__main__:Upload failed:...).

You are correct. I'll check and see if I can get logs from a run where the callback doesn't happen.