NVIDIA-Merlin / NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems.
Apache License 2.0

[BUG] Error when trying to use AWS S3 as dataset #486

Closed albert17 closed 3 years ago

albert17 commented 3 years ago

Describe the bug: Error when trying to use AWS S3 as a dataset.

Steps/Code to reproduce bug

Expected behavior

(rapids) root@ad0748ec2c77:/# python /nvtabular/examples/dask-nvtabular-criteo-benchmark.py -d 0 --data-path s3://merlin-datasets/crit_int_pq --out-path /tmp/ --freq-limit 0 --part-mem-frac 0.16 --device-limit-frac 0.8 --device-pool-frac 0.9
distributed.worker - WARNING -  Compute Failed
Function:  read_parquet_part
args:      (<function CudfEngine.read_partition at 0x7f7df32c3c20>, <s3fs.core.S3FileSystem object at 0x7f7dea2c2f50>, Empty DataFrame
Columns: [label, I1, I2, I3, I4, I5, I6, I7, I8, I9, I10, I11, I12, I13, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, C24, C25, C26]
Index: [], [('merlin-datasets/crit_int_pq/day_22.parquet', [70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104], [])], ['label', 'I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26'], None, {'partitions': <pyarrow.parquet.ParquetPartitions object at 0x7f7cdba56090>, 'categories': [], 'filters': None, 'schema': label: int64
  -- field metadata --
  PARQUET:field_id: '1'
I1: int64
  -- f
kwargs:    {}
Exception: OSError(22, 'Bad Request')

Traceback (most recent call last):
  File "/nvtabular/examples/dask-nvtabular-criteo-benchmark.py", line 373, in <module>
    main(parse_args())
  File "/nvtabular/examples/dask-nvtabular-criteo-benchmark.py", line 195, in main
    output_path=output_path,
  File "/nvtabular/nvtabular/workflow.py", line 876, in apply
    dtypes=dtypes,
  File "/nvtabular/nvtabular/workflow.py", line 975, in build_and_process_graph
    self.exec_phase(idx, record_stats=record_stats, update_ddf=(idx == (end - 1)))
  File "/nvtabular/nvtabular/workflow.py", line 782, in exec_phase
    computed_stats, op = r.result()
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py", line 225, in result
    raise exc.with_traceback(tb)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/parquet.py", line 60, in read_partition
    with fs.open(path, mode="rb") as f:
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 903, in open
    **kwargs
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 378, in _open
    autocommit=autocommit, requester_pays=requester_pays)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 1097, in __init__
    cache_type=cache_type)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 1224, in __init__
    self.details = fs.info(path)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 548, in info
    raise ee
OSError: [Errno 22] Bad Request


albert17 commented 3 years ago

I tried again and I got the same error. It works using local data. @benfred any idea about what may be going on?

(rapids) root@1f33b4c0ec68:/# python /nvtabular/examples/dask-nvtabular-criteo-benchmark.py -d 0 --data-path s3://merlin-datasets/crit_int_pq --out-path s3://merlin-datasets/crit_output --freq-limit 0 --part-mem-frac 0.1 --device-limit-frac 0.4 --device-pool-frac 0.5
distributed.worker - WARNING -  Compute Failed
Function:  read_parquet_part
args:      (<function CudfEngine.read_partition at 0x7fb89dabfc20>, <s3fs.core.S3FileSystem object at 0x7fb8486df710>, Empty DataFrame
Columns: [label, I1, I2, I3, I4, I5, I6, I7, I8, I9, I10, I11, I12, I13, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, C24, C25, C26]
Index: [], [('merlin-datasets/crit_int_pq/day_23.parquet', [216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242], [])], ['label', 'I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26'], None, {'partitions': <pyarrow.parquet.ParquetPartitions object at 0x7fb7e35542d0>, 'categories': [], 'filters': None, 'schema': label: int64
  -- field metadata --
  PARQUET:field_id: '1'
I1: int64
  -- field metad
kwargs:    {}
Exception: OSError(22, 'Bad Request')

Traceback (most recent call last):
  File "/nvtabular/examples/dask-nvtabular-criteo-benchmark.py", line 373, in <module>
    main(parse_args())
  File "/nvtabular/examples/dask-nvtabular-criteo-benchmark.py", line 195, in main
    output_path=output_path,
  File "/nvtabular/nvtabular/workflow.py", line 876, in apply
    dtypes=dtypes,
  File "/nvtabular/nvtabular/workflow.py", line 975, in build_and_process_graph
    self.exec_phase(idx, record_stats=record_stats, update_ddf=(idx == (end - 1)))
  File "/nvtabular/nvtabular/workflow.py", line 782, in exec_phase
    computed_stats, op = r.result()
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py", line 225, in result
    raise exc.with_traceback(tb)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/parquet.py", line 60, in read_partition
    with fs.open(path, mode="rb") as f:
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 903, in open
    **kwargs
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 378, in _open
    autocommit=autocommit, requester_pays=requester_pays)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 1097, in __init__
    cache_type=cache_type)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 1224, in __init__
    self.details = fs.info(path)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 548, in info
    raise ee
OSError: [Errno 22] Bad Request
quasiben commented 3 years ago

I have not seen that error before. I would suggest seeing if you can list the bucket with s3fs calls directly. I don't think this is a permissions issue, but it may be. Can you post the s3fs calls made in dask-nvtabular-criteo-benchmark.py?

albert17 commented 3 years ago

@quasiben

I had no problem listing the bucket and reading a single file using s3fs:

>>> import s3fs
>>> fs = s3fs.S3FileSystem()
>>> fs.ls('merlin-datasets')
['merlin-datasets/crit_int_pq', 'merlin-datasets/criteo_output', 'merlin-datasets/orig']
>>> f = fs.open('merlin-datasets/crit_int_pq/day_0.parquet')
>>> content = f.read()
>>> len(content)
12620273544

I don't understand this part: "Can you post the s3fs calls in dask-nvtabular-criteo-benchmark.py?"

quasiben commented 3 years ago

Where in nvtabular are the s3fs calls made? Is it implicit in something like read_csv, or do you make calls directly? Are credentials loaded onto the machine? Is IAM configured, when you launch the machine, to read from that bucket?

albert17 commented 3 years ago

It is implicit in read_parquet and read_csv.

Credentials are loaded onto the machine, and it is configured to be able to read from that bucket. In fact, I just ran this from the AWS VM I am using.

When I run the program against S3 storage, it seems to read data for a while (10 minutes or so), and then I get the error.

This works with Google Cloud Storage.

albert17 commented 3 years ago

Another experiment that I ran in the past, and which worked, was reading a single file with dask_cudf:

>>> import dask_cudf
>>> ddf = dask_cudf.read_parquet("s3://merlin-datasets/crit_int_pq/day_0.parquet")
>>> ddf.head(1)
   label  I1  I2  I3    I4  I5  I6  I7  ...         C19       C20         C21        C22        C23         C24         C25         C26
0      0  32   3   5  <NA>   1   0   0  ... -1793932789 -20981661 -1556988767 -924717482  391309800  1966410890 -1726799382 -1218975401
albert17 commented 3 years ago

@quasiben I was able to get a similar error using s3fs. I have created a program that reads all 23 files in a loop:

import os
import s3fs

input_path = "merlin-datasets/crit_int_pq"
fname = 'day_{}.parquet'
paths = [os.path.join(input_path, fname.format(day)) for day in range(23)]

fs = s3fs.S3FileSystem()

for path in paths:
    print(path)
    f = fs.open(path)
    content = f.read()
    print(len(content))
    f.close()

This is the output:

(rapids) root@76673da14b8f:~# python tests.py
merlin-datasets/crit_int_pq/day_0.parquet
12620273544
merlin-datasets/crit_int_pq/day_1.parquet
12888090031
merlin-datasets/crit_int_pq/day_2.parquet
12671897280
merlin-datasets/crit_int_pq/day_3.parquet
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 530, in info
    Key=key, **version_id_kw(version_id), **self.req_kw)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 200, in _call_s3
    return method(**additional_kwargs)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/client.py", line 676, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (400) when calling the HeadObject operation: Bad Request

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "tests.py", line 12, in <module>
    f = fs.open(path)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 903, in open
    **kwargs
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 378, in _open
    autocommit=autocommit, requester_pays=requester_pays)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 1097, in __init__
    cache_type=cache_type)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 1224, in __init__
    self.details = fs.info(path)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 548, in info
    raise ee
OSError: [Errno 22] Bad Request

It reads 3 files successfully but fails on the 4th with Bad Request. If I rerun the program, I immediately get the Bad Request error. So it seems to me that the credentials are expiring while the program runs.

quasiben commented 3 years ago

This issues looks relevant: https://github.com/dask/s3fs/issues/350#issuecomment-681980955

Can you try setting AWS_DEFAULT_REGION and see if the issue continues ?
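
A minimal sketch of that suggestion, where "us-east-1" is a placeholder for whatever region the bucket actually lives in:

```python
import os

# "us-east-1" is a placeholder -- substitute the region the bucket actually lives in
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

# Equivalently, the region can be passed straight to s3fs via botocore's client kwargs:
# import s3fs
# fs = s3fs.S3FileSystem(client_kwargs={"region_name": "us-east-1"})
```

The environment variable must be set before the workers start (or exported in the shell that launches the script) so botocore picks it up when the S3 client is created.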

albert17 commented 3 years ago

I have tried setting AWS_DEFAULT_REGION. I am no longer getting the Bad Request error, but I am getting a different error, which seems to stem from the same permissions problem.


(rapids) root@53209e76b346:/NVTA# python examples/dask-nvtabular-criteo-benchmark.py -d 0,1,2,3,4,5,6,7 --data-path s3://merlin-datasets/crit_int_pq --out-path s3://merlin-datasets/criteo_output --freq-limit 0 --part-mem-frac 0.1 --device-limit-frac 0.4 --device-pool-frac 0.5
distributed.worker - WARNING -  Compute Failed
Function:  read_parquet_part
args:      (<function CudfEngine.read_partition at 0x7f5b68489ef0>, <s3fs.core.S3FileSystem object at 0x7f5b58b71fd0>, Empty DataFrame
Columns: [C6, C25, I13, C20, I6, C22, label, C16, I10, C3, I8, I9, C11, C23, I11, C15, C9, I5, C13, C19, C21, I3, I2, C4, C5, C18, C8, C17, C12, C7, C1, C2, I1, I12, C14, I7, C26, C10, C24, I4]
Index: [], [('merlin-datasets/crit_int_pq/day_12.parquet', [81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107], [])], ['C6', 'C25', 'I13', 'C20', 'I6', 'C22', 'label', 'C16', 'I10', 'C3', 'I8', 'I9', 'C11', 'C23', 'I11', 'C15', 'C9', 'I5', 'C13', 'C19', 'C21', 'I3', 'I2', 'C4', 'C5', 'C18', 'C8', 'C17', 'C12', 'C7', 'C1', 'C2', 'I1', 'I12', 'C14', 'I7', 'C26', 'C10', 'C24', 'I4'], False, {'partitions': <pyarrow.parquet.ParquetPartitions object at 0x7f5ae5b38690>, 'categories': [], 'filters': None, 'schema': label: int64
  -- field metadata --
  PARQUET:field_id: '1'
I1: int64
  -- field metadata --
  PARQUET:f
kwargs:    {}
Exception: ProtocolError('Connection broken: IncompleteRead(6304441900 bytes read, 4939729300 more expected)', IncompleteRead(6304441900 bytes read, 4939729300 more expected))

Traceback (most recent call last):
  File "examples/dask-nvtabular-criteo-benchmark.py", line 370, in <module>
    main(parse_args())
  File "examples/dask-nvtabular-criteo-benchmark.py", line 177, in main
    processor.fit(dataset)
  File "/NVTA/nvtabular/workflow.py", line 129, in fit
    results = [r.result() for r in self.client.compute(stats)]
  File "/NVTA/nvtabular/workflow.py", line 129, in <listcomp>
    results = [r.result() for r in self.client.compute(stats)]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py", line 225, in result
    raise exc.with_traceback(tb)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/parquet.py", line 67, in read_partition
    **kwargs.get("read", {}),
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/io/parquet.py", line 205, in read_parquet
    path_or_data=source, compression=None, **kwargs
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/utils/ioutils.py", line 1037, in get_filepath_or_buffer
    path_or_data = BytesIO(path_or_data.read())
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py", line 1399, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/caching.py", line 333, in _fetch
    self.cache = self.fetcher(start, bend)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 1200, in _fetch_range
    return _fetch_range(self.fs.s3, self.bucket, self.key, self.version_id, start, end, req_kw=self.req_kw)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py", line 1337, in _fetch_range
    return resp['Body'].read()
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/response.py", line 77, in read
    chunk = self._raw_stream.read(amt)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/urllib3/response.py", line 540, in read
    raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
  File "/opt/conda/envs/rapids/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/urllib3/response.py", line 454, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(6304441900 bytes read, 4939729300 more expected)', IncompleteRead(6304441900 bytes read, 4939729300 more expected))
quasiben commented 3 years ago

@martindurant do you have any ideas about these s3fs issues? If you're busy, no worries.

martindurant commented 3 years ago

ProtocolError is a new one for me. Is it really a >11GB read in a single call? I am not surprised that that would trigger an overflow or timeout somewhere in the stack, even though the download is streaming in nature. Maybe it can be considered retriable. There could be code in s3fs to always split big calls like that into some maximum chunk size.

Anecdotally, BadRequest seems to happen if the auth expires during a call, or perhaps if another thread refreshes the auth before a running call returns.

The easiest way to log s3fs calls is to set the environment variable S3FS_LOGGING_LEVEL=DEBUG.
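
The same effect can be had in code by raising the level of s3fs's logger directly (a sketch; the environment-variable route has to be set before s3fs is imported):

```python
import logging

# Equivalent in effect to S3FS_LOGGING_LEVEL=DEBUG:
# surface every S3 request s3fs makes on the root handler.
logging.basicConfig(level=logging.INFO)
logging.getLogger("s3fs").setLevel(logging.DEBUG)
```

With this in place, each HeadObject/GetObject call shows up in the logs, which makes it much easier to see which request triggers the Bad Request.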

albert17 commented 3 years ago

So the problem was due to token expiration. I requested admin permissions for AWS, created an IAM policy that grants access to S3, and attached it to the VM; with that in place, it works successfully.
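
For reference, a policy document along these lines might look as follows (expressed here as a Python dict; the bucket name is the one from this thread, but the action list is illustrative, not the exact policy that was used):

```python
import json

# Hypothetical read-only S3 policy for the bucket in this thread.
# Attached to the VM via an instance profile, the role's credentials are
# rotated automatically, so no token can expire mid-run.
s3_read_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::merlin-datasets",
                "arn:aws:s3:::merlin-datasets/*",
            ],
        }
    ],
}

print(json.dumps(s3_read_policy, indent=2))
```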

Thank you very much for all your help @quasiben and @martindurant.