ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
34.06k stars 5.78k forks source link

[Ray Data] Need advice on loading 0.3 to 2TB for model training using ray.data.read_parquet() #39548

Open meprem opened 1 year ago

meprem commented 1 year ago

What happened + What you expected to happen

Hi Team,

I am relatively new to Ray Data and trying to move away from Petastorm. I have 0.3-2TB of training data and it's created correctly before model training step. But ray.data.read_parquet() function fails at random whenever there is more than 100GB of data on this single machine. But when I read the data using dask/dask_cudf, it works fine.

I have tried to control parallelism but that only worked for 100GB data or so. Could you please point me to the documentations/configurations that I can use?

Ray Configuration:

import psutil
num_cpus = psutil.cpu_count(logical=False)
print(f'Number of CPU cores: {num_cpus}')
%%time
ray.init(num_cpus=num_cpus)
...
ray_train_dataset = ray.data.read_parquet(train_files, ray_remote_args={"num_cpus": 0.2}, parallelism=5)

Error Message:

(_get_read_tasks pid=3908) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:241: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=3908)   pq_ds.pieces, **prefetch_remote_args
(_get_read_tasks pid=3908) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:344: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=3908)   num_files = len(self._pq_ds.pieces)
(_get_read_tasks pid=3908) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:357: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=3908)   self._pq_ds.pieces[idx]
(_get_read_tasks pid=3908) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=3864, ip=172.31.28.148)                                                                            
(_get_read_tasks pid=3908)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=3908)     batch = next(batches)
(_get_read_tasks pid=3908)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=3908)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=3908)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=3908)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=3908) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=3908) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=3869, ip=172.31.28.148)
(_get_read_tasks pid=3908)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=3908)     batch = next(batches)
(_get_read_tasks pid=3908)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=3908)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=3908)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=3908)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=3908) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=3908) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=10185, ip=172.31.28.148)
(_get_read_tasks pid=3908)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=3908)     batch = next(batches)
(_get_read_tasks pid=3908)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=3908)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=3908)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=3908)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=3908) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
---------------------------------------------------------------------------
RayTaskError(ArrowNotImplementedError)    Traceback (most recent call last)
File <timed exec>:1

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py:588, in read_parquet(paths, filesystem, columns, parallelism, ray_remote_args, tensor_column_schema, meta_provider, **arrow_parquet_args)
    516 """Create an Arrow dataset from parquet files.
    517 
    518 Examples:
   (...)
    582     Dataset producing Arrow records read from the specified paths.
    583 """
    584 arrow_parquet_args = _resolve_parquet_args(
    585     tensor_column_schema,
    586     **arrow_parquet_args,
    587 )
--> 588 return read_datasource(
    589     ParquetDatasource(),
    590     parallelism=parallelism,
    591     paths=paths,
    592     filesystem=filesystem,
    593     columns=columns,
    594     ray_remote_args=ray_remote_args,
    595     meta_provider=meta_provider,
    596     **arrow_parquet_args,
    597 )

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py:344, in read_datasource(datasource, parallelism, ray_remote_args, **read_args)
    331     scheduling_strategy = NodeAffinitySchedulingStrategy(
    332         ray.get_runtime_context().get_node_id(),
    333         soft=False,
    334     )
    335     get_read_tasks = cached_remote_fn(
    336         _get_read_tasks, retry_exceptions=False, num_cpus=0
    337     ).options(scheduling_strategy=scheduling_strategy)
    339     (
    340         requested_parallelism,
    341         min_safe_parallelism,
    342         inmemory_size,
    343         read_tasks,
--> 344     ) = ray.get(
    345         get_read_tasks.remote(
    346             datasource,
    347             ctx,
    348             cur_pg,
    349             parallelism,
    350             local_uri,
    351             _wrap_arrow_serialization_workaround(read_args),
    352         )
    353     )
    355 # Compute the number of blocks the read will return. If the number of blocks is
    356 # expected to be less than the requested parallelism, boost the number of blocks
    357 # by adding an additional split into `k` pieces to each read task.
    358 if read_tasks:

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    102         return getattr(ray, func.__name__)(*args, **kwargs)
--> 103 return func(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/worker.py:2493, in get(object_refs, timeout)
   2491     worker.core_worker.dump_object_store_memory_usage()
   2492 if isinstance(value, RayTaskError):
-> 2493     raise value.as_instanceof_cause()
   2494 else:
   2495     raise value

RayTaskError(ArrowNotImplementedError): ray::_get_read_tasks() (pid=3908, ip=172.31.28.148)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py", line 1928, in _get_read_tasks
    reader = ds.create_reader(**kwargs)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 170, in create_reader
    return _ParquetDatasourceReader(**kwargs)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 254, in __init__
    self._encoding_ratio = self._estimate_files_encoding_ratio()
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 378, in _estimate_files_encoding_ratio
    sample_ratios = sample_bar.fetch_until_complete(futures)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/_internal/progress_bar.py", line 93, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
ray.exceptions.RayTaskError(ArrowNotImplementedError): ray::_sample_piece() (pid=3848, ip=172.31.28.148)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
    batch = next(batches)
  File "pyarrow/_dataset.pyx", line 3603, in _iterator
  File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null

Versions / Dependencies

Dataset size:

(base) [ec2-user@ip- target_encoding-nn]$ du -sh *
4.0K    data_attributes.json
24K test-dataset
307G    train-dataset
102G    valid-dataset
(base) [ec2-user@ip- target_encoding-nn]$

RAM:

(base) [ec2-user@ip- ~]$ lshw -short -C memory
H/W path    Device   Class          Description
===============================================
/0/1                 memory         1140GiB System memory
(base) [ec2-user@ip- ~]$

SSD:

(base) [ec2-user@ip- ~]$ df -m --block-size=TB
Filesystem     1TB-blocks  Used Available Use% Mounted on
devtmpfs              1TB   0TB       1TB   0% /dev
tmpfs                 1TB   1TB       1TB   1% /dev/shm
tmpfs                 1TB   1TB       1TB   1% /run
tmpfs                 1TB   0TB       1TB   0% /sys/fs/cgroup
/dev/nvme0n1p1        9TB   2TB       8TB  19% /
tmpfs                 1TB   0TB       1TB   0% /run/user/1000
(base) [ec2-user@ip-~]$

GPU configuration:

(base) [ec2-user@ip- ~]$ nvidia-smi
Mon Sep 11 16:26:25 2023
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.182.03   Driver Version: 470.182.03   CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  NVIDIA A100-SXM...  Off  | 00000000:10:1C.0 Off |                    0 |
| N/A   44C    P0    60W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA A100-SXM...  Off  | 00000000:10:1D.0 Off |                    0 |
| N/A   41C    P0    53W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   2  NVIDIA A100-SXM...  Off  | 00000000:20:1C.0 Off |                    0 |
| N/A   42C    P0    56W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   3  NVIDIA A100-SXM...  Off  | 00000000:20:1D.0 Off |                    0 |
| N/A   40C    P0    51W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   4  NVIDIA A100-SXM...  Off  | 00000000:90:1C.0 Off |                    0 |
| N/A   44C    P0    59W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   5  NVIDIA A100-SXM...  Off  | 00000000:90:1D.0 Off |                    0 |
| N/A   42C    P0    63W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   6  NVIDIA A100-SXM...  Off  | 00000000:A0:1C.0 Off |                    0 |
| N/A   48C    P0    67W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   7  NVIDIA A100-SXM...  Off  | 00000000:A0:1D.0 Off |                    0 |
| N/A   44C    P0    64W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+
(base) [ec2-user@ip~]$

Reproduction script

I am using production dataset and will be able to provide script to generate similar dataset, if required.

Issue Severity

High: It blocks me from completing my task.

stephanie-wang commented 1 year ago

I think you may be bumping into a performance bug related to picking too low of a parallelism value. Ideally, it'd be best if you can use the default parameters. What happens if you do not specify any additional arguments to read_parquet?

ray_train_dataset = ray.data.read_parquet(train_files)
meprem commented 1 year ago

Thank you for quick reply. I get the same error.

(base) [ec2-user@ip- train-dataset]$ ls | wc -l
1113
(base) [ec2-user@ip-train-dataset]$

ray_train_dataset = ray.data.read_parquet(train_files)

Error:

(_get_read_tasks pid=44471) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:241: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=44471)   pq_ds.pieces, **prefetch_remote_args                                                                                                                                                                 
(_get_read_tasks pid=44471) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:344: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=44471)   num_files = len(self._pq_ds.pieces)
(_get_read_tasks pid=44471) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:357: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=44471)   self._pq_ds.pieces[idx]
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=45516, ip=172.31.28.148)                                                                          
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=44448, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=50639, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=50643, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=50401, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
---------------------------------------------------------------------------
RayTaskError(ArrowNotImplementedError)    Traceback (most recent call last)
File <timed exec>:1

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py:588, in read_parquet(paths, filesystem, columns, parallelism, ray_remote_args, tensor_column_schema, meta_provider, **arrow_parquet_args)
    516 """Create an Arrow dataset from parquet files.
    517 
    518 Examples:
   (...)
    582     Dataset producing Arrow records read from the specified paths.
    583 """
    584 arrow_parquet_args = _resolve_parquet_args(
    585     tensor_column_schema,
    586     **arrow_parquet_args,
    587 )
--> 588 return read_datasource(
    589     ParquetDatasource(),
    590     parallelism=parallelism,
    591     paths=paths,
    592     filesystem=filesystem,
    593     columns=columns,
    594     ray_remote_args=ray_remote_args,
    595     meta_provider=meta_provider,
    596     **arrow_parquet_args,
    597 )

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py:344, in read_datasource(datasource, parallelism, ray_remote_args, **read_args)
    331     scheduling_strategy = NodeAffinitySchedulingStrategy(
    332         ray.get_runtime_context().get_node_id(),
    333         soft=False,
    334     )
    335     get_read_tasks = cached_remote_fn(
    336         _get_read_tasks, retry_exceptions=False, num_cpus=0
    337     ).options(scheduling_strategy=scheduling_strategy)
    339     (
    340         requested_parallelism,
    341         min_safe_parallelism,
    342         inmemory_size,
    343         read_tasks,
--> 344     ) = ray.get(
    345         get_read_tasks.remote(
    346             datasource,
    347             ctx,
    348             cur_pg,
    349             parallelism,
    350             local_uri,
    351             _wrap_arrow_serialization_workaround(read_args),
    352         )
    353     )
    355 # Compute the number of blocks the read will return. If the number of blocks is
    356 # expected to be less than the requested parallelism, boost the number of blocks
    357 # by adding an additional split into `k` pieces to each read task.
    358 if read_tasks:

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    102         return getattr(ray, func.__name__)(*args, **kwargs)
--> 103 return func(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/worker.py:2493, in get(object_refs, timeout)
   2491     worker.core_worker.dump_object_store_memory_usage()
   2492 if isinstance(value, RayTaskError):
-> 2493     raise value.as_instanceof_cause()
   2494 else:
   2495     raise value

RayTaskError(ArrowNotImplementedError): ray::_get_read_tasks() (pid=44471, ip=172.31.28.148)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py", line 1928, in _get_read_tasks
    reader = ds.create_reader(**kwargs)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 170, in create_reader
    return _ParquetDatasourceReader(**kwargs)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 254, in __init__
    self._encoding_ratio = self._estimate_files_encoding_ratio()
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 378, in _estimate_files_encoding_ratio
    sample_ratios = sample_bar.fetch_until_complete(futures)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/_internal/progress_bar.py", line 93, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
ray.exceptions.RayTaskError(ArrowNotImplementedError): ray::_sample_piece() (pid=44406, ip=172.31.28.148)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
    batch = next(batches)
  File "pyarrow/_dataset.pyx", line 3603, in _iterator
  File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=45571, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=50716, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
(_get_read_tasks pid=44471) Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_sample_piece() (pid=50401, ip=172.31.28.148)
(_get_read_tasks pid=44471)   File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
(_get_read_tasks pid=44471)     batch = next(batches)
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3603, in _iterator
(_get_read_tasks pid=44471)   File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_get_read_tasks pid=44471)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(_get_read_tasks pid=44471) pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
stephanie-wang commented 1 year ago

Hmm the error message is unfortunately not very informative but I think it may be due to not being able to read one of the files properly, could be that the data was corrupted somehow during creation. Is it possible for you to provide a copy of the data or some synthetic copy? Otherwise, you could try reading each file manually with pyarrow and check that you are able to do so without error.

meprem commented 1 year ago

When I read file one-by-one with ray it seems fine.

%%time
chunks_of_1 = (train_files[i:i+1] for i in range(0,len(train_files),1))
for chunk_files in chunks_of_1:
    #try:
    ray_train_dataset = ray.data.read_parquet(chunk_files)
    #except:
    #print(f'An exception occurred while processing file: {chunk_files}')

Successful:

CPU times: user 39.5 s, sys: 4.2 s, total: 43.7 s
Wall time: 35min 7s

But when I try two files at a time from same directory. It's failing with same error:

I have some columns with nested data, do you think metadata may be the issue?

%%time
chunks_of_2 = (train_files[i:i+2] for i in range(0,len(train_files),2))
for chunk_files in chunks_of_2:
    #try:
    ray_train_dataset = ray.data.read_parquet(chunk_files)
    break
    #except:
    #    print(f'An exception occurred while processing file: {chunk_files}')

Error:

(_get_read_tasks pid=43663) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:241: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=43663)   pq_ds.pieces, **prefetch_remote_args
(_get_read_tasks pid=43663) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:344: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=43663)   num_files = len(self._pq_ds.pieces)
(_get_read_tasks pid=43663) /home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py:357: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Use the '.fragments' attribute instead
(_get_read_tasks pid=43663)   self._pq_ds.pieces[idx]

---------------------------------------------------------------------------
RayTaskError(ArrowNotImplementedError)    Traceback (most recent call last)
File <timed exec>:4

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py:588, in read_parquet(paths, filesystem, columns, parallelism, ray_remote_args, tensor_column_schema, meta_provider, **arrow_parquet_args)
    516 """Create an Arrow dataset from parquet files.
    517 
    518 Examples:
   (...)
    582     Dataset producing Arrow records read from the specified paths.
    583 """
    584 arrow_parquet_args = _resolve_parquet_args(
    585     tensor_column_schema,
    586     **arrow_parquet_args,
    587 )
--> 588 return read_datasource(
    589     ParquetDatasource(),
    590     parallelism=parallelism,
    591     paths=paths,
    592     filesystem=filesystem,
    593     columns=columns,
    594     ray_remote_args=ray_remote_args,
    595     meta_provider=meta_provider,
    596     **arrow_parquet_args,
    597 )

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py:344, in read_datasource(datasource, parallelism, ray_remote_args, **read_args)
    331     scheduling_strategy = NodeAffinitySchedulingStrategy(
    332         ray.get_runtime_context().get_node_id(),
    333         soft=False,
    334     )
    335     get_read_tasks = cached_remote_fn(
    336         _get_read_tasks, retry_exceptions=False, num_cpus=0
    337     ).options(scheduling_strategy=scheduling_strategy)
    339     (
    340         requested_parallelism,
    341         min_safe_parallelism,
    342         inmemory_size,
    343         read_tasks,
--> 344     ) = ray.get(
    345         get_read_tasks.remote(
    346             datasource,
    347             ctx,
    348             cur_pg,
    349             parallelism,
    350             local_uri,
    351             _wrap_arrow_serialization_workaround(read_args),
    352         )
    353     )
    355 # Compute the number of blocks the read will return. If the number of blocks is
    356 # expected to be less than the requested parallelism, boost the number of blocks
    357 # by adding an additional split into `k` pieces to each read task.
    358 if read_tasks:

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    102         return getattr(ray, func.__name__)(*args, **kwargs)
--> 103 return func(*args, **kwargs)

File ~/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/_private/worker.py:2493, in get(object_refs, timeout)
   2491     worker.core_worker.dump_object_store_memory_usage()
   2492 if isinstance(value, RayTaskError):
-> 2493     raise value.as_instanceof_cause()
   2494 else:
   2495     raise value

RayTaskError(ArrowNotImplementedError): ray::_get_read_tasks() (pid=43663, ip=172.31.28.148)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/read_api.py", line 1928, in _get_read_tasks
    reader = ds.create_reader(**kwargs)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 170, in create_reader
    return _ParquetDatasourceReader(**kwargs)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 254, in __init__
    self._encoding_ratio = self._estimate_files_encoding_ratio()
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 378, in _estimate_files_encoding_ratio
    sample_ratios = sample_bar.fetch_until_complete(futures)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/_internal/progress_bar.py", line 93, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
ray.exceptions.RayTaskError(ArrowNotImplementedError): ray::_sample_piece() (pid=43685, ip=172.31.28.148)
  File "/home/ec2-user/miniconda3/envs/pytorch_ploomber/lib/python3.8/site-packages/ray/data/datasource/parquet_datasource.py", line 492, in _sample_piece
    batch = next(batches)
  File "pyarrow/_dataset.pyx", line 3603, in _iterator
  File "pyarrow/_dataset.pyx", line 3221, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Unsupported cast from double to null using function cast_null
meprem commented 1 year ago

I can't share the production data. But here are some more details:

Please let me know, if you need anything else. Thank you.

scottjlee commented 1 year ago

@meprem looping back on this, do you have some synthetic data you can share with us which is similar?

do each of the files have the same underlying schema/data types? from looking at the stack trace, that seems to be my first guess

meprem commented 1 year ago

@scottjlee Yes, all files have same underlying schema/data types. As mentioned above, I can't share data but generating following dataset, with random float32 values, should help reproduce the issue. Please let me know, if you need more clarification.

Full dataset:

scottjlee commented 1 year ago

remaining 300 are nested columns of size 1024

can you clarify what this nested column structure looks like? does this mean in each record, this column contains an array of float32s?

can this nested column be null, or its contents be null?

meprem commented 1 year ago

It's array of float32s ([...]) since it's embedding values. There are no nulls anywhere. If embedding value was null then it's already replaced with len([0,0,0...,0]) = 1024 in earlier steps.

Speaking of null values in nested columns, I would have preferred to keep it that way or either []. But I ran into other issues, so replaced it with len([0,0,0...,0]) = 1024 in earlier steps. However, it would be nice if Ray Data can gracefully handle nulls in nested columns and it can be expanded at runtime in data loader. This will help reduce disk space greatly, at least in my use-case.

scottjlee commented 1 year ago

can you confirm that the files can be read directly with the pyarrow API? e.g. ParquetDataset:

pq.ParquetDataset(paths, filesystem)

we want to see if there is an issue with reading (1) single file, and (2) multiple files at once. we are trying to possibly rule out the issue to the metadata fetching as you mentioned above, and to see if the issue is present for one or multiple files.

another thing we noticed is that in the code from this comment, the read_parquet(...) is not being materialized, so this doesn't actually read in the full data (only does some metadata fetching to do some file size estimates). could you try materializing with .materialize() to see if the data can be read in properly one file/multiple files at a time?

anyscalesam commented 10 months ago

@meprem can you advise here please?