fsspec / s3fs

S3 Filesystem
http://s3fs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Intermittent 'PermissionError: Access Denied' when trying to read S3 file from AWS Lambda #218

Closed: lavinia-k closed this issue 4 years ago

lavinia-k commented 5 years ago

I'm running a Python 3.7 script in AWS Lambda, which runs queries against AWS Athena and tries to download the CSV results file that Athena stores on S3 once the query execution has completed.

Any ideas why I'd be running into the error below intermittently?

s3_query_result_path = f's3://{bucket}/{results_key}'

[ERROR] PermissionError: Access Denied
Traceback (most recent call last):
  File "/var/task/lambdas/process_file/lambda_function.py", line 91, in lambda_handler
    df = pd.read_csv(s3_query_result_path)
  File "/var/task/pandas/io/parsers.py", line 685, in parser_f
    return _read(filepath_or_buffer, kwds)
  File "/var/task/pandas/io/parsers.py", line 440, in _read
    filepath_or_buffer, encoding, compression
  File "/var/task/pandas/io/common.py", line 207, in get_filepath_or_buffer
    filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
  File "/var/task/pandas/io/s3.py", line 36, in get_filepath_or_buffer
    filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
  File "/var/task/fsspec/spec.py", line 669, in open
    autocommit=ac, **kwargs)
  File "/var/task/s3fs/core.py", line 303, in _open
    autocommit=autocommit)
  File "/var/task/s3fs/core.py", line 920, in __init__
    cache_type=cache_type)
  File "/var/task/fsspec/spec.py", line 864, in __init__
    self.details = fs.info(path)
  File "/var/task/s3fs/core.py", line 479, in info
    return super().info(path)
  File "/var/task/fsspec/spec.py", line 477, in info
    out = self.ls(self._parent(path), detail=True, **kwargs)
  File "/var/task/s3fs/core.py", line 497, in ls
    files = self._ls(path, refresh=refresh)
  File "/var/task/s3fs/core.py", line 430, in _ls
    return self._lsdir(path, refresh)
  File "/var/task/s3fs/core.py", line 336, in _lsdir
    raise translate_boto_error(e)

As you can see above, I'm using the pandas library, which then uses s3fs under the hood.

The Lambda works about 80% of the time, and I can't figure out anything unique about the times it fails.

Feel free to let me know if I should be posting this question in pandas or elsewhere instead - thanks for your help!

martindurant commented 5 years ago

An intermittent problem is very hard to diagnose! You can set the logger level of s3fs.core.logger to DEBUG and see if you get any useful output (you will need to run logging.basicConfig(), if you don't already have logging set up). An "access denied" message probably has no more information contained, but you may want to check the AWS console for alerts, such as API quota overruns. This is probably the right place to log this issue, unless you want to check if others have been seeing the same in the lower level libraries, boto3 and botocore.
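In a plain script (or directly inside the Lambda handler), that amounts to something like the following minimal sketch:

import logging

import s3fs

logging.basicConfig()                     # make sure a root handler exists
s3fs.core.logger.setLevel(logging.DEBUG)  # turn on s3fs debug output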

birdsarah commented 5 years ago

As discussed with @martindurant on gitter, I appear to be having the same error.

@martindurant suggested the following logging code:

def init_s3fs_logging():
    import logging
    import s3fs
    logging.basicConfig()  # not sure if you need this
    s3fs.core.logger.setLevel("DEBUG")

_ = client.run(init_s3fs_logging)  # client is the distributed.Client; runs on every worker

But it does not seem to be generating data in the worker logs (accessed via client.get_worker_logs()). All I can see is the following error:

distributed.worker - WARNING - Compute Failed Function: execute_task args: ((<function apply at 0x7f3a470c8d90>, <function FastParquetEngine.write_partition at 0x7f3a1a0836a8>, [ index ... submission_date_s3 0 0 ... 20190719 1 1 ... 20190719 2 2 ... 20190719 3 3 ... 20190719 4 4 ... 20190719 ... ... ... ... 139995 139995 ... 20190719 139996 139996 ... 20190719 139997 139997 ... 20190719 139998 139998 ... 20190719 139999 139999 ... 20190719 [140000 rows x 22 columns], '..../20190719.parquet', <s3fs.core.S3FileSystem object at 0x7f39fa4ec358>, 'part.3061.parquet', [], True], (<class 'dict'>, [['fmd', <class 'fastparquet.parquet_thrift.parquet.ttypes.FileMetaData'> column_orders: None created_by: fastparquet-python version 1.0.0 (build 111) key_value_metadata: [<class 'fas kwargs: {} Exception: PermissionError('Access Denied') 

My full stacktrace is:

PermissionError                           Traceback (most recent call last)
<timed eval> in <module>

/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   3618         from .io import to_parquet
   3619 
-> 3620         return to_parquet(self, path, *args, **kwargs)
   3621 
   3622     @derived_from(pd.DataFrame)

/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, write_metadata_file, compute, **kwargs)
    477 
    478     if compute:
--> 479         out = out.compute()
    480     return out
    481 

/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2509                     should_rejoin = False
   2510             try:
-> 2511                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2512             finally:
   2513                 for f in futures.values():
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1809                 direct=direct,
   1810                 local_worker=local_worker,
-> 1811                 asynchronous=asynchronous,
   1812             )
   1813 
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    750         else:
    751             return sync(
--> 752                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    753             )
    754 
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    325             e.wait(10)
    326     if error[0]:
--> 327         six.reraise(*error[0])
    328     else:
    329         return result[0]
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py in f()
    310             if callback_timeout is not None:
    311                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 312             result[0] = yield future
    313         except Exception as exc:
    314             error[0] = sys.exc_info()
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1665                             exc = CancelledError(key)
   1666                         else:
-> 1667                             six.reraise(type(exception), exception, traceback)
   1668                         raise exc
   1669                     if errors == "skip":
/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:
/mnt/yarn/usercache/hadoop/appcache/application_1524886193677_198077/container_1524886193677_198077_01_000008/environment/lib/python3.7/site-packages/dask/compatibility.py in apply()
/mnt/yarn/usercache/hadoop/appcache/application_1524886193677_198077/container_1524886193677_198077_01_000008/environment/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in write_partition()
/mnt/yarn/usercache/hadoop/appcache/application_1524886193677_198077/container_1524886193677_198077_01_000008/environment/lib/python3.7/site-packages/fsspec/spec.py in __exit__()
/mnt/yarn/usercache/hadoop/appcache/application_1524886193677_198077/container_1524886193677_198077_01_000008/environment/lib/python3.7/site-packages/fsspec/spec.py in close()
/mnt/yarn/usercache/hadoop/appcache/application_1524886193677_198077/container_1524886193677_198077_01_000008/environment/lib/python3.7/site-packages/fsspec/spec.py in flush()
/mnt/yarn/usercache/hadoop/appcache/application_1524886193677_198077/container_1524886193677_198077_01_000008/environment/lib/python3.7/site-packages/s3fs/core.py in _initiate_upload()
PermissionError: Access Denied
birdsarah commented 5 years ago

Running client.run(s3fs.S3FileSystem.clear_instance_cache) periodically currently doesn't seem to stop the issue.

My current theory is that there's some kind of race condition being hit occasionally where a write is attempted just before the new permission has kicked in.

birdsarah commented 5 years ago

The last time I did long/large uploads was in May, so I'm not able to give much insight into when this might have started.

birdsarah commented 5 years ago

Small informational update. I am trying with s3fs==0.3.0 and boto3==1.9.188 (released at roughly the same time) and am having the same issue. (I can't try older as I need the capabilities of dask 2.2.0)

jakirkham commented 5 years ago

There just was a new release of Dask and Distributed. IDK if any changes in there help. Though if it is easy for you to try, it might be worthwhile 🙂

birdsarah commented 5 years ago

I am running dask and distributed 2.3.0.

briandegenhart commented 5 years ago

We came across a similar problem beginning with s3fs==0.3.0. In our case it was a FileNotFoundError rather than a PermissionError, but it was also unpredictably intermittent (roughly 15-20% of the time) and occurred inside a lambda triggered by the arrival of a file in an S3 bucket. The issue occurred with files as small as 18 KB. Unfortunately, I can't reproduce it outside of our production environment, where we've stuck with s3fs==0.2.2. This is the stack trace from the last time we hit the issue, which was with s3fs==0.3.1:

Traceback (most recent call last):
  File "/var/task/src/filemetadata.py", line 29, in _read_file_data_definition
    2) as reader:
  File "/var/task/src/StreamReader.py", line 29, in __enter__
    self.__opened_file = self.__fs.open(self.__bucket_url, 'r')
  File "/var/task/fsspec/spec.py", line 666, in open
    return io.TextIOWrapper(self.open(path, mode, block_size, **kwargs))
  File "/var/task/fsspec/spec.py", line 670, in open
    autocommit=ac, **kwargs)
  File "/var/task/s3fs/core.py", line 302, in _open
    s3_additional_kwargs=kw, cache_type=cache_type)
  File "/var/task/s3fs/core.py", line 904, in __init__
    cache_type=cache_type)
  File "/var/task/fsspec/spec.py", line 865, in __init__
    self.details = fs.info(path)
  File "/var/task/s3fs/core.py", line 469, in info
    return super().info(path)
  File "/var/task/fsspec/spec.py", line 482, in info
    raise FileNotFoundError(path)
FileNotFoundError: adobe-campaign-hub-input/directmail/NAE_CHQ_146501861_245774912_2019-07-22_121116.txt
birdsarah commented 5 years ago

As an update: I was experiencing issues with parquet datasets of 2000-3000 partitions.

I downgraded to s3fs==0.2.0 (as I knew it had worked well for me previously) and pushed 42,000 partitions without seeing an error. (Sadly that was a 43,000-partition dataset, and it still failed with a boto error.)

birdsarah commented 5 years ago

Just to confirm: I'm doing a fresh run right now, similar to what I was doing last week, and am not hitting the issue with 0.2.0.

mrocklin commented 5 years ago

Hooray?


birdsarah commented 5 years ago

I did hit the boto issue again. Posting here in case it's useful, but it may be noise, so feel free to delete:

08/21/2019 08:39:32 PM UTC Read timeout on endpoint URL: "None"
Traceback (most recent call last):
  File "<ipython-input-17-ac8c03188521>", line 11, in <module>
    storage_options={'s3_additional_kwargs': sse_args},
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/core.py", line 3618, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 1496, in to_parquet
    out.compute()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py", line 175, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py", line 446, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 2527, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1823, in gather
    asynchronous=asynchronous,
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 763, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 332, in sync
    six.reraise(*error[0])
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
    result[0] = yield future
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1680, in _gather
    six.reraise(type(exception), exception, traceback)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 617, in _read_parquet_row_group
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/fastparquet/core.py", line 305, in read_row_group_file
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/fastparquet/core.py", line 354, in read_row_group
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/fastparquet/core.py", line 331, in read_row_group_arrays
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/fastparquet/core.py", line 245, in read_col
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/fastparquet/core.py", line 99, in read_data_page
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/fastparquet/core.py", line 21, in _read_page
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/s3fs/core.py", line 1333, in read
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/s3fs/core.py", line 1314, in _fetch
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/s3fs/core.py", line 1520, in _fetch_range
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1565902877864_2735/container_1565902877864_2735_01_000023/environment/lib/python3.7/site-packages/botocore/response.py", line 81, in read
botocore.exceptions.ReadTimeoutError: Read timeout on endpoint URL: "None"

This botocore error only happened once while writing 13 TB of data spread over 21 parquet files of 600-3000 partitions each. For comparison, I hit the PermissionError for roughly 1/3 of those 21 parquet files and just re-ran the subset of failed files over and over until all 21 passed.

birdsarah commented 5 years ago

One more thing: I am experiencing this issue while writing data to S3, on AWS EMR, which is a different setup from the original reporter's.

TomAugspurger commented 5 years ago

Thanks for the reports. I assume this is only happening with non-public files that require credentials?

I'm getting set up with AWS creds now, hoping to look into this today.

TomAugspurger commented 5 years ago

No luck reproducing so far :/ Based on the original post, the error was coming from a simple fs.open

`filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)`
birdsarah commented 5 years ago

@TomAugspurger I was writing multiple very large ~1 TB parquet files with dask. I don't know what order of magnitude you need to be at to experience this issue, but I would start with something like that.

I would typically get this error after a couple of hours of processing.

birdsarah commented 5 years ago

FWIW, I was also using server-side encryption; I don't know if that's relevant.

TomAugspurger commented 5 years ago

@lavinia-k @birdsarah @briandegenhart how are you handling authentication for s3? Credentials file, environment variable, ...?

birdsarah commented 5 years ago

My cluster was set up by an administrator, so it's not clear where or how the credentials are stored. I can read from S3 without providing additional credentials, e.g. s3.ls(path/to/bucket), but this bucket is definitely not public, so access is credentialed. There is an amazon-ssm-agent running.

However, our bucket uses server-side encryption, so I am passing storage_options={'s3_additional_kwargs': {sse stuff}} to my df.to_parquet call.
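For illustration, the shape of that call is roughly as follows; the SSE keyword arguments and paths below are placeholders, not the real ones used here:

import dask.dataframe as dd

# Placeholder SSE settings - the real sse_args are not shown in this thread.
sse_args = {
    'ServerSideEncryption': 'aws:kms',
    'SSEKMSKeyId': 'alias/my-key',          # hypothetical key alias
}

df = dd.read_parquet('s3://my-bucket/in.parquet')        # hypothetical input
df.to_parquet(
    's3://my-bucket/out.parquet',                        # hypothetical output
    storage_options={'s3_additional_kwargs': sse_args},  # forwarded to s3fs -> boto3 calls
)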

birdsarah commented 5 years ago

I have upgraded to the latest version of fsspec (0.4.3), which also came with a boto upgrade (although I think that happens almost daily). I am experiencing a new error, but at a similar frequency (i.e. infrequently, on large dataset writes):

Traceback (most recent call last):
  File "<ipython-input-12-1e58597ad448>", line 42, in <module>
    storage_options={'s3_additional_kwargs': sse_args},
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/core.py", line 3629, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 483, in to_parquet
    out = out.compute()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py", line 175, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py", line 446, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 2510, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1812, in gather
    asynchronous=asynchronous,
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 753, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 337, in sync
    six.reraise(*error[0])
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 322, in f
    result[0] = yield future
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1668, in _gather
    six.reraise(type(exception), exception, traceback)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 301, in read_parquet_part
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 340, in read_partition
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/api.py", line 438, in to_pandas
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/api.py", line 241, in read_row_group_file
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/core.py", line 305, in read_row_group_file
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/core.py", line 354, in read_row_group
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/core.py", line 331, in read_row_group_arrays
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/core.py", line 200, in read_col
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/thrift_structures.py", line 25, in read_thrift
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fastparquet/parquet_thrift/parquet/ttypes.py", line 975, in read
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/thrift/transport/TTransport.py", line 189, in cstringio_refill
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/thrift/transport/TTransport.py", line 334, in read
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fsspec/spec.py", line 1060, in read
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/fsspec/core.py", line 501, in _fetch
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/s3fs/core.py", line 1023, in _fetch_range
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/s3fs/core.py", line 1115, in _fetch_range
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/botocore/response.py", line 78, in read
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/urllib3/response.py", line 496, in read
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/contextlib.py", line 130, in __exit__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1566335097751_3421/container_1566335097751_3421_01_000029/environment/lib/python3.7/site-packages/urllib3/response.py", line 415, in _error_catcher
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))
birdsarah commented 5 years ago

To be clear, I am also still experiencing the Access Denied error. Here's a fresh one:

Traceback (most recent call last):
  File "<ipython-input-13-1e58597ad448>", line 42, in <module>
    storage_options={'s3_additional_kwargs': sse_args},
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/core.py", line 3629, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 483, in to_parquet
    out = out.compute()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py", line 175, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/dask/base.py", line 446, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 2510, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1812, in gather
    asynchronous=asynchronous,
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 753, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 337, in sync
    six.reraise(*error[0])
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 322, in f
    result[0] = yield future
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1668, in _gather
    six.reraise(type(exception), exception, traceback)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_5779/container_1565902877864_5779_01_000018/environment/lib/python3.7/site-packages/dask/compatibility.py", line 107, in apply
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_5779/container_1565902877864_5779_01_000018/environment/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 462, in write_partition
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_5779/container_1565902877864_5779_01_000018/environment/lib/python3.7/site-packages/fsspec/spec.py", line 1177, in __exit__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_5779/container_1565902877864_5779_01_000018/environment/lib/python3.7/site-packages/fsspec/spec.py", line 1143, in close
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_5779/container_1565902877864_5779_01_000018/environment/lib/python3.7/site-packages/fsspec/spec.py", line 1015, in flush
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_5779/container_1565902877864_5779_01_000018/environment/lib/python3.7/site-packages/s3fs/core.py", line 966, in _initiate_upload
TomAugspurger commented 5 years ago

Possibly helpful for debugging:

In [22]: def init():
    ...:     import boto3
    ...:     boto3.set_stream_logger('')
    ...:

In [23]: client.run(init)
Out[23]:
{'tcp://127.0.0.1:64789': None,
 'tcp://127.0.0.1:64790': None,
 'tcp://127.0.0.1:64791': None,
 'tcp://127.0.0.1:64795': None}

That'll be very noisy. A single upload generates:

2019-08-28 14:09:08,839 s3fs [DEBUG] Initiate upload for <File-like object S3FileSystem, it-3906/test>
2019-08-28 14:09:08,839 s3fs [DEBUG] CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x117944198>> - ({},) - {'Bucket': 'it-3906', 'Key': 'test', 'ACL': ''}
2019-08-28 14:09:08,842 urllib3.util.retry [DEBUG] Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
2019-08-28 14:09:08,842 urllib3.connectionpool [DEBUG] Starting new HTTPS connection (1): it-3906.s3.amazonaws.com:443
2019-08-28 14:09:09,543 urllib3.connectionpool [DEBUG] https://it-3906.s3.amazonaws.com:443 "POST /test?uploads HTTP/1.1" 200 None
2019-08-28 14:09:09,544 s3fs [DEBUG] Upload chunk <File-like object S3FileSystem, it-3906/test>, 1
2019-08-28 14:09:09,544 s3fs [DEBUG] CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x117944198>> - ({},) - {'Bucket': 'it-3906', 'PartNumber': 1, 'UploadId': 'a27FnlBMf.1hGGUPWw2KFy7tc7mcpqsHBEWi6t4rNZ9k_0FvR908t_eNrkC4..Ks_kmVV5y0tGPGOFDiUrrBPWCk6E8dMiROK6MxKzErYSlV_Gf9y5gfdxpaedQiwHDU', 'Key': 'test', 'ContentMD5': 'ICy5YqxZB1uWSwcVLSNLcA=='}
2019-08-28 14:09:09,546 urllib3.util.retry [DEBUG] Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
2019-08-28 14:09:09,823 urllib3.connectionpool [DEBUG] https://it-3906.s3.amazonaws.com:443 "PUT /test?partNumber=1&uploadId=a27FnlBMf.1hGGUPWw2KFy7tc7mcpqsHBEWi6t4rNZ9k_0FvR908t_eNrkC4..Ks_kmVV5y0tGPGOFDiUrrBPWCk6E8dMiROK6MxKzErYSlV_Gf9y5gfdxpaedQiwHDU HTTP/1.1" 200 0
2019-08-28 14:09:09,824 s3fs [DEBUG] Commit <File-like object S3FileSystem, it-3906/test>
2019-08-28 14:09:09,824 s3fs [DEBUG] CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x117944198>> - ({},) - {'Bucket': 'it-3906', 'Key': 'test', 'UploadId': 'a27FnlBMf.1hGGUPWw2KFy7tc7mcpqsHBEWi6t4rNZ9k_0FvR908t_eNrkC4..Ks_kmVV5y0tGPGOFDiUrrBPWCk6E8dMiROK6MxKzErYSlV_Gf9y5gfdxpaedQiwHDU', 'MultipartUpload': {'Parts': [{'PartNumber': 1, 'ETag': '"202cb962ac59075b964b07152d234b70"'}]}}
2019-08-28 14:09:09,826 urllib3.util.retry [DEBUG] Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
2019-08-28 14:09:09,957 urllib3.connectionpool [DEBUG] https://it-3906.s3.amazonaws.com:443 "POST /test?uploadId=a27FnlBMf.1hGGUPWw2KFy7tc7mcpqsHBEWi6t4rNZ9k_0FvR908t_eNrkC4..Ks_kmVV5y0tGPGOFDiUrrBPWCk6E8dMiROK6MxKzErYSlV_Gf9y5gfdxpaedQiwHDU HTTP/1.1" 200 None
birdsarah commented 5 years ago

Major progress! I have just re-run my ETL job with the following conda env (https://gist.github.com/birdsarah/fe9a9a7575bff041b154a35566e1f7f8):

I had no errors! This hasn't happened once since I started on this work about 2 weeks ago.

The main difference I can see is that compression is properly working again with my patch, and my data is now coming out at 3 TB instead of 13 TB.

This information may be relevant to https://github.com/dask/dask/issues/5284 too.

mrocklin commented 5 years ago

Woo! Celebration!


birdsarah commented 5 years ago

Started pushing much smaller data and got the Access Denied error again. Environment was similar:

I am now trying with yesterday's version of boto.

birdsarah commented 5 years ago

I thought that was a little mad. Downgrading boto to yesterday's version didn't help; I'm back to experiencing the intermittent error.

birdsarah commented 5 years ago

@TomAugspurger the logging function you posted isn't working for me. I see in my worker logs that it ran

distributed.worker - INFO - Run out-of-band function 'init' 

But there's nothing else in the worker logs. Anything else to know?

Update: Hmm, now the worker logs page is 500'ing. That was a red herring; I never got anything in my worker logs.

antenuccit commented 5 years ago

@TomAugspurger @birdsarah @lavinia-k

Just wanted to add some additional info! I've been having the issue originally reported at the top of the thread with my lambda function for a few days now.

My function is triggered by files dropped in an S3 bucket, and I've noticed that when I only drop one file in the bucket, AWS will start up an instance of my lambda function and that file will be processed correctly. AWS will then keep the execution context of my lambda function open for another few minutes, and if I try to drop more files into the bucket, they'll error out with the exact same error the original poster included.

If I wait a few minutes before dropping in another file (long enough that the old execution context is killed), I can reliably process one file in the new execution context that gets started up, before any subsequent runs in that context end in the same error. I'm currently using the most recent versions of boto3, s3fs, and pandas.

Hope this helps!
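One workaround sketch that follows from this observation (untested, and the path below is just a placeholder) is to clear s3fs's cached filesystem instance at the top of each invocation, using the same clear_instance_cache call mentioned earlier in the thread:

import pandas as pd
import s3fs

def lambda_handler(event, context):
    # Drop any S3FileSystem cached from a previous invocation of this
    # (re-used) Lambda execution context before reading.
    s3fs.S3FileSystem.clear_instance_cache()
    df = pd.read_csv('s3://my-bucket/results.csv')  # placeholder path
    return {'rows': len(df)}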

birdsarah commented 5 years ago

I don't know why I'm getting errors more frequently, but they now happen often enough that I can't use dask's to_parquet function; I'm using map_partitions and writing to S3 manually, which means I'm getting no metadata.

martindurant commented 5 years ago

And you never see the problem via map_partitions? Can you post the code? Have you tried the extra debugging I made available?

birdsarah commented 5 years ago

I have not yet seen the problem with map_partitions. It's early days though; I'll keep you posted. This is the code I'm using:

    # (BytesIO, boto3 and uuid are imported elsewhere; BUCKET, FOLDER, date
    # and sse_args are defined in the surrounding scope.)
    file_buffer = BytesIO()
    df.to_parquet(
        file_buffer,
        #engine='fastparquet',
        engine='pyarrow',
        compression='snappy',
        index=True,
        #times='int96',
        use_deprecated_int96_timestamps=True
    )

    # Upload the serialized partition directly with boto3 rather than s3fs.
    s3_resource = boto3.resource('s3')
    s3_resource.Object(
        BUCKET, f'{FOLDER}/{date}.parquet/{uuid.uuid4()}.part.snappy.parquet'
    ).put(
        Body=file_buffer.getvalue(), **sse_args
    )

I've had to switch to pyarrow to get this to work as fastparquet complained about not liking a buffer.

The extra debugging isn't in released code, and dask fails with a version error if you try to pip install from master. I posted on the PR about this yesterday.

birdsarah commented 5 years ago

(I welcome any thoughts on the above code; I adapted it from Stack Overflow.)

birdsarah commented 5 years ago

I have no idea if this is useful, but I got my first error using the above code:

09/07/2019 07:29:16 PM UTC 'endpoint_resolver'
Traceback (most recent call last):
  File "<ipython-input-18-d7511c0d930a>", line 20, in <module>
    client.compute(tasks, sync=True)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 2700, in compute
    result = self.gather(futures)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1812, in gather
    asynchronous=asynchronous,
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 753, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 337, in sync
    six.reraise(*error[0])
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/utils.py", line 322, in f
    result[0] = yield future
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/distributed/client.py", line 1668, in _gather
    six.reraise(type(exception), exception, traceback)
  File "/mnt/miniconda/envs/jestr-etl/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/dask/optimization.py", line 1059, in __call__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/dask/core.py", line 149, in get
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/dask/core.py", line 119, in _execute_task
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/dask/compatibility.py", line 107, in apply
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/dask/dataframe/core.py", line 4841, in apply_and_enforce
  File "<ipython-input-14-a817f62a7bcf>", line 68, in get_base_df_for_partition
    s3_resource = boto3.resource('s3')
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/boto3/__init__.py", line 100, in resource
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/boto3/session.py", line 389, in resource
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/boto3/session.py", line 263, in client
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/botocore/session.py", line 828, in create_client
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/botocore/session.py", line 695, in _get_internal_component
  File "/mnt/yarn/usercache/hadoop/appcache/application_1565902877864_10537/container_1565902877864_10537_01_000002/environment/lib/python3.7/site-packages/botocore/session.py", line 907, in get_component
KeyError: 'endpoint_resolver'

Relatedly, yesterday I was messing around with debugging and commented out lines (https://github.com/dask/s3fs/blob/master/s3fs/core.py#L995-L996) so that I would actually see the full error and not just Access Denied.

This gave me errors with KeyError: 'credential_provider'. I didn't save the logs because I was changing a few things, e.g. trying to force the file to refresh its credentials before trying to write.

However, I found this issue (https://github.com/boto/boto3/issues/1592) which also mentioned the endpoint_resolver issue which I'm now seeing above.

I have also tried passing a retries argument (see below) to boto via dask, to no effect.

from botocore.config import Config

config = Config(
    retries=dict(
        max_attempts=10
    )
)
birdsarah commented 5 years ago

Related: https://github.com/boto/botocore/issues/1246

birdsarah commented 5 years ago

@martindurant @TomAugspurger if the problem is boto sessions/resources not being threadsafe, are either of you able to provide me with some sample / example code to spin up a session appropriately in a map_partitions call so I can at least test this theory.
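Something like the following is what I have in mind, if it's on the right track (an untested sketch; the names are placeholders):

import uuid
from io import BytesIO

import boto3

def write_partition(df, bucket, folder, date, sse_args):
    # Build a fresh boto3 Session (and S3 resource) inside the function so
    # nothing boto-related is shared across dask worker threads.
    session = boto3.session.Session()
    s3 = session.resource('s3')

    buf = BytesIO()
    df.to_parquet(buf, engine='pyarrow', compression='snappy', index=True)

    key = f'{folder}/{date}.parquet/{uuid.uuid4()}.part.snappy.parquet'
    s3.Object(bucket, key).put(Body=buf.getvalue(), **sse_args)
    return 0  # map_partitions wants a return value per partition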

martindurant commented 5 years ago

https://github.com/dask/s3fs/issues/236 suggests that rate limiting on the S3 API can surface as a permission-denied error. The retries in s3fs should maybe implement exponential backoff, i.e., a sleep which becomes longer after each failed attempt?
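To be concrete about what I mean by backoff, here is the generic pattern (just a sketch, not actual s3fs code):

import random
import time

def call_with_backoff(func, *args, retries=5, base_delay=0.5, **kwargs):
    # Retry the call, sleeping longer after each failure (exponential
    # backoff with a little jitter); re-raise once attempts are exhausted.
    for attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except PermissionError:
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * 2 ** attempt + random.random() / 10)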

martindurant commented 5 years ago

(by the way, sessions were always shared between threads even before fsspec; we could look into creating thread-local sessions and/or s3fs instances, but that stuff is tricky)

martindurant commented 5 years ago

https://github.com/dask/s3fs/pull/237 does the backoff. It is not applied to direct API calls like ls, only to read/write.

birdsarah commented 5 years ago

Alright, some data. Backoff did not help, but I did run with the new logging and have some info. It has been redacted in places but hopefully has what you need.

2019-09-18 01:02:32,323 - s3fs - DEBUG - Get directory listing page for in-file/_metadata
2019-09-18 01:02:32,339 - s3fs - DEBUG - Get directory listing page for in-file/_metadata/_metadata
2019-09-18 01:02:32,358 - s3fs - DEBUG - Fetch: in-file/_metadata, 9865021-9865029
2019-09-18 01:02:32,393 - s3fs - DEBUG - Fetch: in-file/_metadata, 4-5308420
distributed.utils_perf - WARNING - full garbage collections took 32% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 32% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 32% CPU time recently (threshold: 10%)
2019-09-18 01:02:33,456 - s3fs - DEBUG - Fetch: in-file/_metadata, 5308420-9865029
distributed.utils_perf - WARNING - full garbage collections took 34% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 34% CPU time recently (threshold: 10%)
2019-09-18 01:02:34,577 - s3fs - DEBUG - Fetch: in-file/part.1548.parquet, 4-1716731
2019-09-18 01:02:35,166 - s3fs - DEBUG - Initiate upload for <File-like object S3FileSystem, out-file/part.403.parquet>
2019-09-18 01:02:35,167 - s3fs - DEBUG - CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x7f989f6ec5c0>> - ({},) - {'Bucket': 'my-bucket', 'Key': 'out-file', 'ACL': ''}
2019-09-18 01:02:35,176 - s3fs - DEBUG - Initiate upload for <File-like object S3FileSystem, out-file/part.403.parquet>
2019-09-18 01:02:35,176 - s3fs - DEBUG - CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x7f989f6ec5c0>> - ({},) - {'Bucket': 'my-bucket', 'Key': 'out-file', 'ACL': ''}
2019-09-18 01:02:35,181 - s3fs - DEBUG - Initiate upload for <File-like object S3FileSystem, out-file/part.403.parquet>
2019-09-18 01:02:35,181 - s3fs - DEBUG - CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x7f989f6ec5c0>> - ({},) - {'Bucket': 'my-bucket', 'Key': 'out-file', 'ACL': ''}
distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f9911fa6730>, <function FastParquetEngine.write_partition at 0x7f98f1f0a1e0>, [        my-data

[620508 rows x 34 columns], 'net
kwargs:    {}
Exception: PermissionError('Access Denied')

2019-09-18 01:02:35,378 - s3fs - DEBUG - Setting up s3fs instance
2019-09-18 01:02:35,414 - s3fs - DEBUG - Get directory listing page for in-file's-directory
2019-09-18 01:02:35,499 - s3fs - DEBUG - Get directory listing page for in-file
2019-09-18 01:02:36,490 - s3fs - DEBUG - Get directory listing page for in-file/_metadata
2019-09-18 01:02:36,507 - s3fs - DEBUG - Get directory listing page for in-file/_metadata/_metadata
2019-09-18 01:02:36,535 - s3fs - DEBUG - Fetch: in-file/_metadata, 9865021-9865029
2019-09-18 01:02:36,562 - s3fs - DEBUG - Fetch: in-file/_metadata, 4-5308420
2019-09-18 01:02:36,859 - s3fs - DEBUG - Initiate upload for <File-like object S3FileSystem, out-file/part.403.parquet>
2019-09-18 01:02:36,860 - s3fs - DEBUG - CALL: <bound method ClientCreator._create_api_method.<locals>._api_call of <botocore.client.S3 object at 0x7f98c6bf7470>> - ({},) - {'Bucket': 'a-bucket', 'Key': 'out-file/part.403.parquet', 'ACL': ''}
Exception ignored in: <function AbstractBufferedFile.__del__ at 0x7f98cae29488>
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1566335572905_12867/container_1566335572905_12867_01_000072/environment/lib/python3.7/site-packages/fsspec/spec.py", line 1127, in __del__
    self.close()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1566335572905_12867/container_1566335572905_12867_01_000072/environment/lib/python3.7/site-packages/fsspec/spec.py", line 1104, in close
    self.flush(force=True)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1566335572905_12867/container_1566335572905_12867_01_000072/environment/lib/python3.7/site-packages/fsspec/spec.py", line 976, in flush
    self._initiate_upload()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1566335572905_12867/container_1566335572905_12867_01_000072/environment/lib/python3.7/site-packages/s3fs/core.py", line 997, in _initiate_upload
    raise translate_boto_error(e)
PermissionError: Access Denied
distributed.utils_perf - WARNING - full garbage collections took 32% CPU time recently (threshold: 10%)
distributed.utils_perf - INFO - full garbage collection released 335.89 MB from 9588 reference cycles (threshold: 10.00 MB)
distributed.utils_perf - WARNING - full garbage collections took 32% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 31% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 34% CPU time recently (threshold: 10%)
2019-09-18 01:02:37,999 - s3fs - DEBUG - Fetch: in-file/_metadata, 5308420-9865029
distributed.utils_perf - WARNING - full garbage collections took 34% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 34% CPU time recently (threshold: 10%)
2019-09-18 01:02:39,126 - s3fs - DEBUG - Fetch: in-file/part.1260.parquet, 4-1884889
distributed.worker - INFO - Stopping worker at tcp://...
distributed.nanny - INFO - Worker closed
End of LogType:dask.worker.log
birdsarah commented 5 years ago

Update: I originally wrote here that I hadn't tested backoff correctly and would report back (sorry for the noise), but I did test it correctly: I installed s3fs master, which has #237 merged. It did not help.

martindurant commented 5 years ago

I notice from the log that you are both reading and writing - is this correct? Are you appending to files, or is it binary (parquet) replacements? I do not see any Dask retry, which surprises me. Note that a permissions error should not, in the normal sense, be retriable, but in this case perhaps it should be.

How many threads are running here? I wonder if you would be amenable to running with one thread per worker? I am working with a hypothesis that the boto session, which is known to have some thread-crossing difficulties, is hitting some race corruption. I have a fix I am trying on HTTP ( PR ) which could be applied to s3fs and might fix that.
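To sketch the kind of fix I mean (the pattern only, not the actual PR): keep one boto client per thread rather than sharing a single session-bound client across all of them.

import threading

import boto3

_local = threading.local()

def get_s3_client():
    # Lazily create one boto3 client per thread, so no client (or the
    # session behind it) is ever touched from two threads at once.
    if not hasattr(_local, 's3'):
        _local.s3 = boto3.session.Session().client('s3')
    return _local.s3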

birdsarah commented 5 years ago

I notice from the log that you are both reading and writing - is this correct?

Yes

Are you appending to files, or is it binary (parquet) replacements?

Neither. It's something like:

df = dd.read_parquet(in_path)
df = df.map_partitions(process_data)
df.to_parquet(out_path)

How many threads are running here? I wonder if you would be amenable to running with one thread per worker?

Almost always I'm working with 1. The error presents either way.

My dask setup is variants on:

profile = {
    'worker_vcores': 1,
    'worker_memory': '5.5GB',
    'n_workers': 40
}
cluster = YarnCluster(
    worker_vcores=profile['worker_vcores'],
    worker_memory=profile['worker_memory'],
    deploy_mode='local',  # Where the dashboard / scheduler live 
    environment='etl.tar.gz',  # Environment to ship
    name='DaskYarn',  # Shows up in "yarn application -list"
)

I do not see any Dask retry, which surprises me.

Dask retries have previously caused me to lose data (I have a separate issue open about that), so I'm not submitting to the client with retries.

birdsarah commented 5 years ago

Also, just to restate the obvious, this problem doesn't occur with s3fs 0.2.0 (which is what I'm using now) so whatever you were doing before was working just fine.

jimmywan commented 5 years ago

this problem doesn't occur with s3fs 0.2.0

Same. We've resorted to pinning all of our code to use an older s3fs.

martindurant commented 5 years ago

@birdsarah @jimmywan @antenuccit, if you are game for more experiments, try this change in fsspec:

--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -872,7 +872,7 @@ class AbstractBufferedFile(io.IOBase):
             )
         else:
             self.buffer = io.BytesIO()
-            self.offset = 0
+            self.offset = None
             self.forced = False
             self.location = None

@@ -990,12 +990,13 @@ class AbstractBufferedFile(io.IOBase):
             # no data in the buffer to write
             return

-        if not self.offset:
+        if self.offset is None:
             if not force and self.buffer.tell() < self.blocksize:
                 # Defer write on small block
                 return
             else:
                 # Initialize a multipart upload
+                self.offset = 0
                 self._initiate_upload()

I suggest this because _initiate_upload() is being called on the same file more than once (without any chunks appearing), which I supposed was due to multiple threads, but apparently not. I'm not yet sure how this comes about.

martindurant commented 5 years ago

I would appreciate it if you tried https://github.com/intake/filesystem_spec/pull/146 and https://github.com/dask/s3fs/pull/240 together. The main change is to restore the previous route for writing a file, where data smaller than a blocksize is handled with a PUT rather than as a single part of a multipart upload.
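For anyone following along, the routing those PRs restore is roughly this (a sketch of the idea using plain boto3 calls, not the fsspec/s3fs code itself):

def write_object(client, bucket, key, data, blocksize):
    # Small payloads go out as a single PUT; only data of at least one
    # block goes through the multipart-upload path.
    if len(data) < blocksize:
        client.put_object(Bucket=bucket, Key=key, Body=data)
        return
    mpu = client.create_multipart_upload(Bucket=bucket, Key=key)
    part = client.upload_part(Bucket=bucket, Key=key, PartNumber=1,
                              UploadId=mpu['UploadId'], Body=data)
    client.complete_multipart_upload(
        Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
        MultipartUpload={'Parts': [{'PartNumber': 1, 'ETag': part['ETag']}]},
    )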

tomjonshea commented 5 years ago

Relevant AWS forum post

martindurant commented 5 years ago

Thanks for posting that. Please copy here if you find anything out. For the time being, you could propose that a permissions error is something that could be retriable in s3fs, although I imagine you'll see why it hasn't been so far.

tomjonshea commented 5 years ago

I can tentatively confirm that pinning to s3fs==0.2.0 has not reproduced the Access Denied error across >1000 API calls in my production system, whereas using the most recent s3fs version generates the error within the first ~100 calls.

Current dependencies:

aws-psycopg2==1.1.1
boto3==1.9.238
botocore==1.12.238
certifi==2019.9.11
docutils==0.15.2
fsspec==0.4.1
jmespath==0.9.4
numpy==1.17.0
pandas==0.25.0
python-dateutil==2.8.0
pytz==2019.2
s3fs==0.2.0
s3transfer==0.2.1
sentry-sdk==0.12.2
six==1.12.0
urllib3==1.25.6
martindurant commented 5 years ago

As of the current fsspec and s3fs master, reauthentication should happen much less frequently. In case this was the problem, it would be worth trying again! @birdsarah do you have any appetite?