NVIDIA-Merlin / NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems.

[BUG] `regenerate_dataset()` method yielding `RuntimeError: cuDF failure at: ... /data_sink.cpp:37` when using `gcs` protocol #1701

Open · njelmert opened this issue 1 year ago

njelmert commented 1 year ago

Describe the bug

Using the `regenerate_dataset()` method on a `merlin.io.dataset.Dataset` results in the error `RuntimeError: cuDF failure at: /workspace/.conda-bld/work/cpp/src/io/utilities/data_sink.cpp:37: Cannot open output file` when using the `gcs` protocol.

Full traceback is:

/opt/conda/miniconda3/lib/python3.8/site-packages/merlin/io/parquet.py:751: UserWarning: General-metadata information not detected! Please pass lists for `cats`, `conts`, and `labels` asarguments to `regenerate_dataset` to ensure a complete and correct _metadata.json file.
  warnings.warn(
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
/tmp/ipykernel_27088/1285145988.py in <cell line: 3>()
      1 import gcsfs
      2 
----> 3 nvt_test_ds_regen = nvt_ds.regenerate_dataset(
      4     output_path=os.path.join("gs://", BUCKET, 'tmp/data_dir/data_regen/'),
      5     part_size=128_000_000,

/opt/conda/miniconda3/lib/python3.8/site-packages/merlin/io/dataset.py in regenerate_dataset(self, output_path, columns, output_format, compute, **kwargs)
   1112         result = ParquetDatasetEngine.regenerate_dataset(self, output_path, columns=None, **kwargs)
   1113         if compute:
-> 1114             return result.compute()
   1115         else:
   1116             return result

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    313         dask.base.compute
    314         """
--> 315         (result,) = compute(self, traverse=False, **kwargs)
    316         return result
    317 

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    598         postcomputes.append(x.__dask_postcompute__())
    599 
--> 600     results = schedule(dsk, keys, **kwargs)
    601     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    602 

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/threaded.py in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87             pool = MultiprocessingPoolExecutor(pool)
     88 
---> 89     results = get_async(
     90         pool.submit,
     91         pool._max_workers,

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    509                             _execute_task(task, data)  # Re-execute locally
    510                         else:
--> 511                             raise_exception(exc, tb)
    512                     res, worker_id = loads(res_info)
    513                     state["cache"][key] = res

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/local.py in reraise(exc, tb)
    317     if exc.__traceback__ is not tb:
    318         raise exc.with_traceback(tb)
--> 319     raise exc
    320 
    321 

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222     try:
    223         task, data = loads(task_info)
--> 224         result = _execute_task(task, data)
    225         id = get_id()
    226         result = dumps((result, id))

/opt/conda/miniconda3/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

/opt/conda/miniconda3/lib/python3.8/site-packages/merlin/io/parquet.py in _write_data(data_list, output_path, fs, fn)
    922     for data in data_list:
    923         rows += len(data)
--> 924         writer.write_table(cudf.from_pandas(data))
    925 
    926     # Return metadata and row-count in dict

parquet.pyx in cudf._lib.parquet.ParquetWriter.write_table()

parquet.pyx in cudf._lib.parquet.ParquetWriter._initialize_chunked_state()

RuntimeError: cuDF failure at: /workspace/.conda-bld/work/cpp/src/io/utilities/data_sink.cpp:37: Cannot open output file

Steps/Code to reproduce bug

import os

import cudf
import nvtabular as nvt

BUCKET = "my-bucket"  # placeholder: replace with the target GCS bucket

df = cudf.DataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20))
                    })

nvt_ds = nvt.Dataset(
        path_or_source=df,
        cpu=False,
)

nvt_ds_regen = nvt_ds.regenerate_dataset(
    output_path=os.path.join("gs://", BUCKET, 'tmp/data_dir/data_regen/'),
    part_size=128_000_000,
    file_size=128_000_000 * 10,
    compute=True,
)

Expected behavior

Expect to see Parquet files in the specified GCS bucket, such as:

gs://{BUCKET}/tmp/data_dir/data_regen/part.0.parquet
gs://{BUCKET}/tmp/data_dir/data_regen/part.1.parquet
...

Environment details (please complete the following information):

Versions of relevant libraries:

fsspec=='2022.5.0'
gcsfs=='2022.5.0'

CUDA_VERSION=11.7
CUDF_VERSION=22.10.0
SPARK_RAPIDS_VERSION=22.10.0
DASK_RAPIDS_VERSION=22.10
DASK_VERSION=2022.9.2
DASK_CUDA_VERSION=22.10.00
NVTABULAR_VERSION=1.5.0
MERLIN_CORE_VERSION=0.8.0
CUDA_PYTHON_VERSION=11.7.0
CUPY_VERSION=9.6.0

RAPIDS_RUNTIME=DASK
DASK_RUNTIME=standalone
INSTALL_GPU_AGENT=true
DASK_CUDA_WORKER_ON_MASTER=true

GPU_TYPE=nvidia-tesla-a100
N_GPUS=1
MASTER_MACHINE_TYPE=a2-highgpu-1g

Additional context

Looking at the source code here, I was trying to see whether this was a permissions issue with the fsspec GCSFileSystem, but I can see that my credentials, access, and tokens appear fine in this step:

import os

from fsspec.core import get_fs_token_paths

storage_options = {
        'anon': True,
        'use_ssl': False,
    }

fs, fs_fs_token, paths = get_fs_token_paths(
    urlpath=os.path.join("gs://", BUCKET, 'tmp/data_dir/data_regen/'),
    mode="wb",
    storage_options=storage_options)

>> fs.credentials.credentials.scopes
['https://www.googleapis.com/auth/devstorage.full_control']

I can also see that a token is generated and that the project ID is recognized. I have no problem writing to this bucket from other frameworks (e.g. PySpark, pandas, pyarrow).
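
For reference, a minimal direct-write check along those lines might look like the following sketch. The bucket name is a placeholder, and it assumes gcsfs, pandas, and pyarrow are installed, with ambient GCP credentials available:

# Sanity check (illustrative only): confirm that gcsfs can create and write
# an object in the bucket directly, outside of NVTabular/cuDF.
import gcsfs
import pandas as pd

BUCKET = "my-bucket"  # placeholder: replace with the target GCS bucket

fs = gcsfs.GCSFileSystem()

# Write a small text object through gcsfs directly.
with fs.open(f"gs://{BUCKET}/tmp/data_dir/write_test.txt", mode="wb") as f:
    f.write(b"gcsfs write test")

# Write a small Parquet file with pandas, which also routes through gcsfs.
pd.DataFrame({"a": [1, 2, 3]}).to_parquet(f"gs://{BUCKET}/tmp/data_dir/write_test.parquet")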

I dug around and this issue stuck out to me (GCSFileSystem hanging when called from multiple processes), but I am not sure if this is the right direction.

rjzamora commented 1 year ago

Thanks for raising an issue @njelmert !

I don't think to_parquet/regenerate_dataset were designed to handle remote storage yet, but I could be wrong. Either way, we should have the necessary tools in cudf/fsspec to do this now, so I'll try to figure out what needs to change.
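
For illustration only, and not necessarily how a fix would land, one way to route cuDF Parquet output to GCS with the existing fsspec machinery might look like the sketch below. It assumes that `cudf.DataFrame.to_parquet` accepts an open file-like object, that gcsfs is installed and authenticated, and that the bucket name is a placeholder:

# Illustrative sketch: open the remote destination with fsspec and hand the
# open file object to cuDF's Parquet writer.
import cudf
import fsspec

BUCKET = "my-bucket"  # placeholder: replace with the target GCS bucket

df = cudf.DataFrame({"a": list(range(20)), "b": list(reversed(range(20)))})

with fsspec.open(f"gs://{BUCKET}/tmp/data_dir/data_regen/part.0.parquet", mode="wb") as f:
    df.to_parquet(f)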

njelmert commented 1 year ago

Thank you, @rjzamora! I hadn't considered that this might not be a current capability, so that's good to know. For now I have a workaround that temporarily dumps the Parquet files to local disk and copies them to remote storage with gsutil, as sketched below. Looking forward to the adaptation.
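
A minimal sketch of that workaround, assuming `nvt_ds` is the Dataset from the reproduction above, a placeholder bucket name, and that gsutil is installed and authenticated:

# Workaround sketch: write the regenerated dataset to local disk first, then
# copy the resulting Parquet files to the GCS bucket with gsutil.
import subprocess

BUCKET = "my-bucket"         # placeholder: replace with the target GCS bucket
local_dir = "/tmp/data_regen"

nvt_ds.regenerate_dataset(
    output_path=local_dir,
    part_size=128_000_000,
    file_size=128_000_000 * 10,
    compute=True,
)

# Recursively copy the local output directory to remote storage.
subprocess.run(
    ["gsutil", "-m", "cp", "-r", local_dir, f"gs://{BUCKET}/tmp/data_dir/"],
    check=True,
)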