PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

download_folder_to_path fails when downloading a large folder #13027

Open hongbo-miao opened 11 months ago

hongbo-miao commented 11 months ago

Issue

I have a ~150 GB folder that I am trying to download from S3 to local disk. Here is my code:

from pathlib import Path, PurePosixPath

from prefect import task
from prefect_aws.s3 import S3Bucket

@task
async def download_dir_from_s3_to_local(
    s3_bucket: S3Bucket, s3_dir_path: PurePosixPath, local_raw_dir_path: Path
) -> None:
    await s3_bucket.download_folder_to_path(str(s3_dir_path), local_raw_dir_path)

After downloading for about 1 minute at roughly 500 MB/s, the task fails with the following error:

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 382, in _make_request
    self._validate_conn(conn)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 1012, in _validate_conn
    conn.connect()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 411, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/lib/python3.10/ssl.py", line 1071, in _create
    self.do_handshake()
  File "/usr/lib/python3.10/ssl.py", line 1342, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLEOFError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/botocore/httpsession.py", line 465, in send
    urllib_response = conn.urlopen(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 755, in urlopen
    retries = retries.increment(
  File "/usr/lib/python3/dist-packages/urllib3/util/retry.py", line 507, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/lib/python3/dist-packages/six.py", line 718, in reraise
    raise value.with_traceback(tb)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 382, in _make_request
    self._validate_conn(conn)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 1012, in _validate_conn
    conn.connect()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 411, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/lib/python3.10/ssl.py", line 1071, in _create
    self.do_handshake()
  File "/usr/lib/python3.10/ssl.py", line 1342, in do_handshake
    self._sslobj.do_handshake()
urllib3.exceptions.SSLError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 1733, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/tmp/tmpac9aabavprefect/src/amazon_s3/tasks/download_dir_from_s3_to_local.py", line 11, in download_dir_from_s3_to_local
    await s3_bucket.download_folder_to_path(str(s3_dir_path), local_raw_dir_path)
  File "/usr/local/lib/python3.10/dist-packages/prefect_aws/s3.py", line 791, in download_folder_to_path
    await asyncio.gather(*async_coros)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/dist-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/dist-packages/boto3/s3/inject.py", line 190, in download_file
    return transfer.download_file(
  File "/usr/local/lib/python3.10/dist-packages/boto3/s3/transfer.py", line 326, in download_file
    future.result()
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/tasks.py", line 139, in __call__
    return self._execute_main(kwargs)
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/tasks.py", line 162, in _execute_main
    return_value = self._main(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/download.py", line 569, in _main
    response = client.get_object(
  File "/usr/local/lib/python3.10/dist-packages/botocore/client.py", line 535, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/client.py", line 963, in _make_api_call
    http, parsed_response = self._make_request(
  File "/usr/local/lib/python3.10/dist-packages/botocore/client.py", line 986, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 119, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 202, in _send_request
    while self._needs_retry(
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 354, in _needs_retry
    responses = self._event_emitter.emit(
  File "/usr/local/lib/python3.10/dist-packages/botocore/hooks.py", line 412, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/hooks.py", line 256, in emit
    return self._emit(event_name, kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/hooks.py", line 239, in _emit
    response = handler(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 207, in __call__
    if self._checker(**checker_kwargs):
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 284, in __call__
    should_retry = self._should_retry(
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 320, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 363, in __call__
    checker_response = checker(
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 247, in __call__
    return self._check_caught_exception(
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 416, in _check_caught_exception
    raise caught_exception
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 281, in _do_get_response
    http_response = self._send(request)
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 377, in _send
    return self.http_session.send(request)
  File "/usr/local/lib/python3.10/dist-packages/botocore/httpsession.py", line 492, in send
    raise SSLError(endpoint_url=request.url, error=e)
botocore.exceptions.SSLError: SSL validation failed for https://my-bucket.s3.us-west-2.amazonaws.com/my-folder/records/fcc3/metadata [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)
11:32:48 PM
Download | s3://my-bucket/my-folder-0
prefect.task_runs
Finished in state Failed('Task run encountered an exception SSLError: SSL validation failed for https://my-bucket.s3.us-west-2.amazonaws.com/my-folder/records/fcc3/metadata [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)')
11:32:48 PM
Download | s3://my-bucket/my-folder-0
prefect.task_runs
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 382, in _make_request
    self._validate_conn(conn)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 1012, in _validate_conn
    conn.connect()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 411, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/lib/python3.10/ssl.py", line 1071, in _create
    self.do_handshake()
  File "/usr/lib/python3.10/ssl.py", line 1342, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLEOFError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/botocore/httpsession.py", line 465, in send
    urllib_response = conn.urlopen(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 755, in urlopen
    retries = retries.increment(
  File "/usr/lib/python3/dist-packages/urllib3/util/retry.py", line 507, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/lib/python3/dist-packages/six.py", line 718, in reraise
    raise value.with_traceback(tb)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 382, in _make_request
    self._validate_conn(conn)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 1012, in _validate_conn
    conn.connect()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 411, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(
  File "/usr/lib/python3/dist-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/lib/python3.10/ssl.py", line 1071, in _create
    self.do_handshake()
  File "/usr/lib/python3.10/ssl.py", line 1342, in do_handshake
    self._sslobj.do_handshake()
urllib3.exceptions.SSLError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 829, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/tmp/tmpac9aabavprefect/src/main.py", line 186, in itg_ingest_data
    glue_job_names = await download_process_upload_dir(
  File "/tmp/tmpac9aabavprefect/src/app/utils/download_process_upload_dir.py", line 41, in download_process_upload_dir
    await download_dir_from_s3_to_local.with_options(
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 1312, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 1733, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/tmp/tmpac9aabavprefect/src/amazon_s3/tasks/download_dir_from_s3_to_local.py", line 11, in download_dir_from_s3_to_local
    await s3_bucket.download_folder_to_path(str(s3_dir_path), local_raw_dir_path)
  File "/usr/local/lib/python3.10/dist-packages/prefect_aws/s3.py", line 791, in download_folder_to_path
    await asyncio.gather(*async_coros)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/dist-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/dist-packages/boto3/s3/inject.py", line 190, in download_file
    return transfer.download_file(
  File "/usr/local/lib/python3.10/dist-packages/boto3/s3/transfer.py", line 326, in download_file
    future.result()
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/tasks.py", line 139, in __call__
    return self._execute_main(kwargs)
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/tasks.py", line 162, in _execute_main
    return_value = self._main(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/s3transfer/download.py", line 569, in _main
    response = client.get_object(
  File "/usr/local/lib/python3.10/dist-packages/botocore/client.py", line 535, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/client.py", line 963, in _make_api_call
    http, parsed_response = self._make_request(
  File "/usr/local/lib/python3.10/dist-packages/botocore/client.py", line 986, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 119, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 202, in _send_request
    while self._needs_retry(
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 354, in _needs_retry
    responses = self._event_emitter.emit(
  File "/usr/local/lib/python3.10/dist-packages/botocore/hooks.py", line 412, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/hooks.py", line 256, in emit
    return self._emit(event_name, kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/hooks.py", line 239, in _emit
    response = handler(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 207, in __call__
    if self._checker(**checker_kwargs):
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 284, in __call__
    should_retry = self._should_retry(
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 320, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 363, in __call__
    checker_response = checker(
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 247, in __call__
    return self._check_caught_exception(
  File "/usr/local/lib/python3.10/dist-packages/botocore/retryhandler.py", line 416, in _check_caught_exception
    raise caught_exception
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 281, in _do_get_response
    http_response = self._send(request)
  File "/usr/local/lib/python3.10/dist-packages/botocore/endpoint.py", line 377, in _send
    return self.http_session.send(request)
  File "/usr/local/lib/python3.10/dist-packages/botocore/httpsession.py", line 492, in send
    raise SSLError(endpoint_url=request.url, error=e)
botocore.exceptions.SSLError: SSL validation failed for https://my-bucket.s3.us-west-2.amazonaws.com/my-folder/records/fcc3/metadata [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)
11:32:48 PM
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception. SSLError: SSL validation failed for https://my-bucket.s3.us-west-2.amazonaws.com/my-folder/records/fcc3/metadata [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)')

I suspect it is related to https://stackoverflow.com/a/46387660/2000548

Basically, if there are a lot of requests to S3 in a short time, it fails. Is there any way to download a big folder at once? Thanks! ☺️

urimandujano commented 11 months ago

Hi @hongbo-miao, how many files are in the bucket? I'm trying to understand if this is a case of "download many small files" or "download a few huge files".

If there are many small files, we have a couple of options. One is to use AWS's built-in retry mechanism to retry failures such as the one above. A minimal example would look like:

from prefect_aws import AwsClientParameters, AwsCredentials, S3Bucket

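# Passed through to the botocore client config: allow up to 10 attempts per request.
client_params = AwsClientParameters(config={"retries": {"max_attempts": 10}})
creds = AwsCredentials(aws_client_parameters=client_params, ...)
bucket = S3Bucket(credentials=creds, bucket_name="...", ...)
...
# from_folder is required; to_folder is the local destination.
bucket.download_folder_to_path(from_folder="...", to_folder="...")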
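To make "retry failures such as the one above" concrete, here is a minimal, library-agnostic sketch of retrying a flaky call with capped exponential backoff and full jitter. It is not botocore's implementation, and `retry_with_backoff` and its parameters are hypothetical names for illustration only. The default `(OSError,)` covers `ssl.SSLError`, which subclasses `OSError`; swap in whatever exception types your client actually raises.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...] = (OSError,),
    max_attempts: int = 10,
    base_delay: float = 0.5,
    max_delay: float = 20.0,
) -> T:
    """Call func, retrying retryable errors with capped exponential backoff (hypothetical helper)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error unchanged
            # Delay doubles each attempt, capped at max_delay; full jitter
            # picks a random sleep in [0, delay] so clients don't retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(random.uniform(0, delay))
    raise RuntimeError("unreachable")
```

Errors outside `retryable` propagate immediately, so only transient failures such as dropped TLS connections are retried.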

The other option is to try the bucket's S3Bucket.get_directory method. Internally, this method downloads one file at a time (as opposed to S3Bucket.download_folder_to_path, which downloads all files concurrently). This option will be a bit slower but won't overwhelm the bucket.
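A middle ground between one file at a time (get_directory) and all files at once (download_folder_to_path) is to cap how many downloads run concurrently. The sketch below illustrates that idea with the standard library's `asyncio.Semaphore`; it is not part of prefect_aws, and `download_with_limit` and the `download_one` callback are hypothetical names, assuming you already have the list of object keys and a coroutine that downloads a single key.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def download_with_limit(
    keys: Iterable[str],
    download_one: Callable[[str], Awaitable[None]],
    max_concurrency: int = 8,
) -> None:
    """Run download_one for every key, with at most max_concurrency in flight (hypothetical helper)."""
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(key: str) -> None:
        # Wait here while max_concurrency downloads are already running.
        async with semaphore:
            await download_one(key)

    # All coroutines are created up front, but only max_concurrency
    # of them get past the semaphore at any one time.
    await asyncio.gather(*(guarded(key) for key in keys))
```

With a blocking client such as boto3, `download_one` could wrap its `download_file` call in `asyncio.to_thread`. For very large listings, a fixed pool of worker tasks pulling from an `asyncio.Queue` avoids creating one coroutine per key up front.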

If the bucket has a couple of big files, we'll need to try something else. Let me know if either of those helps.