ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] Overflowing Pyarrow buffer when using `read_binary` to read files > 2Gb #48419

Closed tmbdev closed 1 week ago

tmbdev commented 1 month ago

Description

I have 1100 shards of 3 Gbytes of data each that I read with read_text. I cannot find any way to process them with ray.data.

I consistently get these errors:

pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

The problem seems to be that pyarrow internally is limited to 2 Gbyte offsets. This seems like a pretty serious limitation given modern processors and datasets. It wouldn't be quite so serious if the ray.data.read_... routines consistently had a chunk_size or block_size argument that would permit on the fly splitting of files into multiple blocks, but they don't.

I don't even seem to be able to handle this by using read_binary_files with flat_map, since even the output of read_binary_files is represented as a PyArrow table and can't hold the raw binary data.

  File "pyarrow/array.pxi", line 358, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 85, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2626945332
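
To make the numbers in this error concrete, here is a small arithmetic sketch. It assumes, as the description above suggests, that the limit comes from signed 32-bit offsets. The helper name `fits_reported_limit` is made up for illustration and is not part of PyArrow or Ray.

```python
# Limit and payload size copied from the ArrowCapacityError message above.
ARROW_LIMIT_BYTES = 2_147_483_646
PAYLOAD_BYTES = 2_626_945_332

# Largest value a signed 32-bit integer can hold.
INT32_MAX = 2**31 - 1


def fits_reported_limit(num_bytes: int, limit: int = ARROW_LIMIT_BYTES) -> bool:
    """Hypothetical helper: True if a payload stays within the reported limit."""
    return num_bytes <= limit


if __name__ == "__main__":
    # Gap between the int32 maximum and the limit reported in the error.
    print(INT32_MAX - ARROW_LIMIT_BYTES)
    # How many bytes the failing payload is over the limit.
    print(PAYLOAD_BYTES - ARROW_LIMIT_BYTES)
    print(fits_reported_limit(PAYLOAD_BYTES))
```

The reported limit sits one byte below the int32 maximum, and the failing payload exceeds it by roughly 480 MB, so no single array of this kind can hold the decompressed shard.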

My suggestions would be:

(1) add optional block_size arguments to all readers
(2) consider supporting a different or additional internal format besides PyArrow, one that is not limited by 32-bit offsets

Somewhat related:

(3) instead of dealing with data partitioning in terms of total number of blocks (which is hard to do for datasets of unknown size), handle data partitioning in terms of block sizes.
(4) implement a streaming_repartition(block_size)
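
As a rough illustration of what a block_size argument could mean for line-oriented files, the sketch below groups whole lines into blocks of bounded size. It uses only the Python standard library. The function `iter_line_blocks` and its `block_size` parameter are hypothetical and do not correspond to any existing Ray Data API.

```python
import io
from typing import BinaryIO, Iterator


def iter_line_blocks(stream: BinaryIO, block_size: int) -> Iterator[bytes]:
    """Yield blocks made of whole lines, each at most `block_size` bytes.

    A single line longer than `block_size` is yielded as its own block,
    so lines are never cut in the middle.
    """
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    lines, size = [], 0
    for line in stream:  # binary file objects iterate line by line
        # Flush the current block before it would grow past block_size.
        if lines and size + len(line) > block_size:
            yield b"".join(lines)
            lines, size = [], 0
        lines.append(line)
        size += len(line)
    if lines:
        yield b"".join(lines)


if __name__ == "__main__":
    data = io.BytesIO(b'{"a": 1}\n{"a": 2}\n{"a": 3}\n')
    for block in iter_line_blocks(data, block_size=18):
        print(block)
```

Because each block ends on a line boundary, every block is still valid JSONL and stays under whatever size limit the downstream format imposes, as long as no single line exceeds it.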

Use case

I'm trying to read a sharded JSONL dataset whose shards happen to expand into data larger than 2GB when decompressed. Processing this dataset with ray.data seems impossible, since it is impossible even to read it with read_text.

pcmoritz commented 1 month ago

It seems to me that this is the same problem as https://github.com/ray-project/ray/issues/48236 cc @alexeykudinkin

tmbdev commented 1 month ago

Thanks. Yes, that's pretty much the same problem.

But it's more widespread than JSONL or JSON. You can't work around this problem with read_text or even read_binary_files since they all suffer from the same problem on the same dataset. Meaning, if it's too big to read with one of them it's usually too big to read with any of them.

Also, when reading a single file, it's not that hard to split. But it becomes an even bigger problem for terabyte-to-petabyte scale datasets, where resplitting the data really isn't an option anymore, both because it takes a long time and because you end up with a ridiculous number of shards.
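
To give a feel for the shard counts involved, here is a back-of-the-envelope sketch. The 2 GB cap per shard and the decimal units are assumptions chosen for illustration, and `shards_after_resplit` is a hypothetical helper. The result is a lower bound, since it pretends data can be redistributed freely across shards.

```python
GB = 10**9   # decimal gigabyte, an assumption for this illustration
PB = 10**15  # decimal petabyte


def shards_after_resplit(total_bytes: int, max_shard_bytes: int) -> int:
    """Hypothetical helper: minimum shard count if no shard may exceed the cap."""
    if max_shard_bytes <= 0:
        raise ValueError("max_shard_bytes must be positive")
    return -(-total_bytes // max_shard_bytes)  # ceiling division on integers


if __name__ == "__main__":
    # The dataset from this issue: 1100 shards of about 3 GB each.
    print(shards_after_resplit(1100 * 3 * GB, 2 * GB))  # 1650
    # A petabyte-scale dataset under the same cap.
    print(shards_after_resplit(PB, 2 * GB))  # 500000
```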

As I was saying, a short term fix would be to add optional block_size arguments to the read_text, read_json, etc., something that is independently useful when dealing with large shards/blocks.

Also, independently, making PyArrow compiled with 64bit offsets the default for the Python distributions would probably be a good idea at this point. (To me, the 1100 shard problem above is a fairly small data processing task.)

alexeykudinkin commented 1 month ago

@tmbdev can you please paste the whole stacktrace so that we can verify that it's indeed the same issue?

tmbdev commented 1 month ago

dataset = ray.data.read_text(files, ray_remote_args={"num_cpus": 45})
dataset = dataset.map_batches(convert_batch, batch_size=1000)
dataset.show(1)

2024-10-30 01:23:07,902 ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "ReadText". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
2024-10-30 01:23:07,906 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2024-10-30 01:23:07,907 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/plan.py", line 428, in execute_to_iterator
    bundle_iter = itertools.chain([next(gen)], gen)
                                   ^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
           ^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
    item = self._outer._output_node.get_output_blocking(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 296, in get_output_blocking
    raise self._exception
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor.py", line 232, in run
    continue_sched = self._scheduling_loop_step(self._topology)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 470, in process_completed_tasks
    raise e from None
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 437, in process_completed_tasks
    bytes_read = task.on_data_ready(
                 ^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/worker.py", line 2691, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::ReadText() (pid=1937773, ip=10.128.0.4)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 395, in __call__
    yield output_buffer.next()
          ^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/output_buffer.py", line 94, in next
    block_remainder = block.slice(
                      ^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/arrow_block.py", line 288, in slice
    view = _copy_table(view)
           ^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/arrow_block.py", line 731, in _copy_table
    return transform_pyarrow.combine_chunks(table)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 339, in combine_chunks
    arr = col.combine_chunks()
          ^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 754, in pyarrow.lib.ChunkedArray.combine_chunks
  File "pyarrow/array.pxi", line 4579, in pyarrow.lib.concat_arrays
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

ds = ray.data.read_binary_files(files, ray_remote_args={"num_cpus": 45})
ds = ds.flat_map(process_chunk)
ds.show(1)

2024-10-29 21:41:46,290 ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "ReadBinary". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
2024-10-29 21:41:46,297 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2024-10-29 21:41:46,298 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/plan.py", line 428, in execute_to_iterator
    bundle_iter = itertools.chain([next(gen)], gen)
                                   ^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
           ^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
    item = self._outer._output_node.get_output_blocking(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 296, in get_output_blocking
    raise self._exception
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor.py", line 232, in run
    continue_sched = self._scheduling_loop_step(self._topology)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 470, in process_completed_tasks
    raise e from None
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 437, in process_completed_tasks
    bytes_read = task.on_data_ready(
                 ^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/worker.py", line 2691, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowConversionError): ray::ReadBinary() (pid=1938227, ip=10.128.0.4)
           ^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 358, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 85, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2626945332

The above exception was the direct cause of the following exception:

ray::ReadBinary() (pid=1938227, ip=10.128.0.4)
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 461, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
    for data in iter:
                ^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/planner/plan_read_op.py", line 95, in do_read
    yield from read_task()
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
    yield from result
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
    yield from read_files(read_paths)
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
    for block in read_stream(f, read_path):
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/datasource/binary_datasource.py", line 21, in _read_stream
    yield builder.build()
          ^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/table_block.py", line 128, in build
    tables = [self._table_from_pydict(columns)]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/data/_internal/arrow_block.py", line 166, in _table_from_pydict
    columns[col_name] = convert_list_to_pyarrow_array(col, columns)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/work/anaconda3/envs/working_env/lib/python3.12/site-packages/ray/air/util/tensor_extensions/arrow.py", line 94, in convert_list_to_pyarrow_array
    raise ArrowConversionError(str(enclosing_dict)) from e
ray.air.util.tensor_extensions.arrow.ArrowConversionError: Error converting data to Arrow: {'bytes': array([b'{"aliases":{"ast":[{"language":"ast","value":"\xf0\x9f\x87\xa7\xf0\x9f\x87\xaa"}],"av":[{"language":"av","value":"\xd0\x91\xd0\xb5\xd0\xbb\xd0\xb3\xd0\xb5\xd0\xbd"}],"ban":[{"langua.

alexeykudinkin commented 1 month ago

@tmbdev this is a different issue -- currently convert_list_to_pyarrow_array converts to pa.array, while your array is clearly exceeding its 2 GB limit
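
For illustration only, the sketch below shows the general idea of keeping each element under a size limit by reading a binary stream in fixed-size chunks instead of as one value. It uses only the Python standard library. `iter_chunks` is a hypothetical helper, not something Ray Data provides, and it assumes the payload may be cut at arbitrary byte boundaries, which is not true for every format.

```python
import io
from typing import BinaryIO, Iterator


def iter_chunks(stream: BinaryIO, chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive chunks of at most `chunk_size` bytes from a stream."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # an empty read means end of stream
            break
        yield chunk


if __name__ == "__main__":
    payload = io.BytesIO(b"abcdefg")
    for chunk in iter_chunks(payload, chunk_size=3):
        print(chunk)
```

With a chunk size below the limit reported in the error, no single element would reach the size that makes the array conversion fail. The cost is that downstream code has to reassemble or parse records that span chunk boundaries.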

pcmoritz commented 1 month ago

It looks like the first problem is the same problem, and the second one is a different one, right?

alexeykudinkin commented 1 month ago

Oh, you're right @pcmoritz. I've been scrolling to the bottom and actually missed that there are 2 issues reported.

tmbdev commented 1 month ago

The bug isn't the crashes in those specific functions; the bug is that there is no way of processing these JSONL files with Ray (as far as I know).

_As I was saying, a short term fix would be to add optional block_size arguments to the read_text, read_json, etc., something that is independently useful when dealing with large shards/blocks._

Also, independently, making PyArrow compiled with 64bit offsets the default for the Python distributions would probably be a good idea at this point. (To me, the 1100 shard problem above is a fairly small data processing task.)

Neither of those fixes requires changes to the functions where the crashes currently occur.

alexeykudinkin commented 1 month ago

@tmbdev now that https://github.com/ray-project/ray/pull/48266 has landed on master, you should be able to use read_text to read your dataset (you can install it from the nightly build)

I'm still working on a fix so that read_binary can read single files > 2 GB.