I am trying to read a large data set and store it in another format. The data set that I am reading is stored in many gzipped JSONL files. The error below occurred because one of the input files was empty.
I would expect Ray Data to ignore empty files so that batch processing can continue. I had to manually search for the input file that produced the error in order to find out that the error occurred because the file was empty. Also, I think this scenario is quite common, especially when filters are applied to data sets before they are stored.
This is the full traceback:
Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 49, in handle_trace
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 427, in execute_to_iterator
bundle_iter = itertools.chain([next(gen)], gen)
^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
bundle = self._base_iterator.get_next(output_split_idx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next
item = self._outer._output_node.get_output_blocking(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 284, in get_output_blocking
raise self._exception
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 222, in run
continue_sched = self._scheduling_loop_step(self._topology)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 277, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 457, in process_completed_tasks
raise e from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 424, in process_completed_tasks
bytes_read = task.on_data_ready(
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
raise ex from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
ray.get(block_ref)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 2659, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 871, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=124743, ip=10.23.196.252)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
for block in blocks:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
for data in iter:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
yield from self._block_fn(input, ctx)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 92, in do_read
yield from call_with_retry(
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 197, in __call__
yield from result
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 253, in read_task_fn
yield from read_files(read_paths)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 219, in read_files
for block in read_stream(f, read_path):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 135, in _read_stream
yield from self._read_with_python_json(buffer)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 104, in _read_with_python_json
parsed_json = json.load(BytesIO(buffer))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 293, in load
return loads(fp.read(),
^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
ray.data.exceptions.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/tmp/ray/session_2024-07-23_10-55-18_047193_3/runtime_resources/working_dir_files/_ray_pkg_942603d6e56f38f5/thesis_schneg/read_aql_error.py", line 175, in <module>
print(ds_aql.schema())
^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 2583, in schema
base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 348, in schema
iter_refbundles, _, _ = self.execute_to_iterator()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 89, in handle_trace
raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=124743, ip=10.23.196.252)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
for block in blocks:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
for data in iter:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
yield from self._block_fn(input, ctx)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 92, in do_read
yield from call_with_retry(
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 197, in __call__
yield from result
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 253, in read_task_fn
yield from read_files(read_paths)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 219, in read_files
for block in read_stream(f, read_path):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 135, in _read_stream
yield from self._read_with_python_json(buffer)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 104, in _read_with_python_json
parsed_json = json.load(BytesIO(buffer))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 293, in load
return loads(fp.read(),
^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
(ReadJSON->SplitBlocks(6) pid=124743, ip=10.23.196.252) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=124743, ip=10.23.196.252) Empty JSON file
Versions / Dependencies
ray version 2.33.0
python version 3.11.0rc1
Ubuntu 22.04.4 LTS (WSL)
Reproduction script
The following script does exactly the same as the original script, except that I made some adjustments (e.g. different and fewer column names) to make it easier to follow. In my case, the input folder contains 1000 files, resulting in a data set of over a billion rows.
from ray import init
import pyarrow as pa
from pyarrow.json import ParseOptions
from ray.data import read_json
What happened + What you expected to happen
I am trying to read a large data set and store it in another format. The data set that I am reading is stored in many gzipped JSONL files. The error below occurred because one of the input files was empty. I would expect Ray Data to ignore empty files so that batch processing can continue. I had to manually search for the input file that produced the error in order to find out that the error occurred because the file was empty. Also, I think this scenario is quite common, especially when filters are applied to data sets before they are stored.
This is the full traceback:
Traceback (most recent call last): File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 49, in handle_trace return fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 427, in execute_to_iterator bundle_iter = itertools.chain([next(gen)], gen) ^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__ return self.get_next() ^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next bundle = self._base_iterator.get_next(output_split_idx) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next item = self._outer._output_node.get_output_blocking( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 284, in get_output_blocking raise self._exception File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 222, in run continue_sched = self._scheduling_loop_step(self._topology) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 277, in _scheduling_loop_step num_errored_blocks = process_completed_tasks( ^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 457, in process_completed_tasks raise e from None File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 424, in process_completed_tasks bytes_read = task.on_data_ready( ^^^^^^^^^^^^^^^^^^^ File 
"/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready raise ex from None File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready ray.get(block_ref) File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper return fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 2659, in get values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 871, in get_objects raise value.as_instanceof_cause() ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=124743, ip=10.23.196.252) for b_out in map_transformer.apply_transform(iter(blocks), ctx): File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__ for block in blocks: File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__ for data in iter: File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__ yield from self._block_fn(input, ctx) File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 92, in do_read yield from call_with_retry( File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 197, in __call__ yield from result File 
"/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 253, in read_task_fn yield from read_files(read_paths) File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 219, in read_files for block in read_stream(f, read_path): File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 135, in _read_stream yield from self._read_with_python_json(buffer) File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 104, in _read_with_python_json parsed_json = json.load(BytesIO(buffer)) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 293, in load return loads(fp.read(), ^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 346, in loads return _default_decoder.decode(s) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 337, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 355, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) ray.data.exceptions.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last): File "/tmp/ray/session_2024-07-23_10-55-18_047193_3/runtime_resources/working_dir_files/_ray_pkg_942603d6e56f38f5/thesis_schneg/read_aql_error.py", line 175, in <module>
print(ds_aql.schema())
^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 2583, in schema
base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 348, in schema
iter_refbundles, _, _ = self.execute_to_iterator()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 89, in handle_trace
raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=124743, ip=10.23.196.252)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
for block in blocks:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
for data in iter:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
yield from self._block_fn(input, ctx)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 92, in do_read
yield from call_with_retry(
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 197, in __call__
yield from result
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 253, in read_task_fn
yield from read_files(read_paths)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 219, in read_files
for block in read_stream(f, read_path):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 135, in _read_stream
yield from self._read_with_python_json(buffer)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 104, in _read_with_python_json
parsed_json = json.load(BytesIO(buffer))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 293, in load
return loads(fp.read(),
^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
(ReadJSON->SplitBlocks(6) pid=124743, ip=10.23.196.252) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=124743, ip=10.23.196.252) Empty JSON file
Versions / Dependencies
Reproduction script
The following script does exactly the same as the original script, except that I made some adjustments (e.g. different and fewer column names) to make it easier to follow. In my case, the input folder contains 1000 files, resulting in a data set of over a billion rows.
"""Reproduce the Ray Data read failure on an empty gzipped JSONL input file.

The script reads every ``.gz`` file under ``input_paths`` with
``ray.data.read_json`` (gzip-decompressing each stream). It then drops
``col1`` and writes the remaining columns to ``output_path`` as Parquet.

As reported, if any one of the input files is empty, the read task fails with
``json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)``.
This happens after PyArrow reports "Empty JSON file" and Ray falls back to
``json.load``.
"""
from ray import init
import pyarrow as pa
from pyarrow.json import ParseOptions
from ray.data import read_json

# Initialize Ray (and connect to cluster).
init()

schema = pa.schema(
    [
        pa.field("col1", pa.string(), nullable=True),
        pa.field("col2", pa.string(), nullable=True),
        pa.field("col3", pa.string(), nullable=True),
    ]
)

input_paths = "path/to/files"

# NOTE(review): parse_options is constructed here but never passed to
# read_json below, so the explicit schema is not applied to the read.
# Confirm whether `parse_options=parse_options` was intended.
parse_options = ParseOptions(explicit_schema=schema)

ds = read_json(
    paths=input_paths,
    # Inputs are gzip-compressed JSONL files with a ".gz" extension.
    arrow_open_stream_args={"compression": "gzip"},
    file_extensions=['gz'],
    concurrency=5,
)
ds = ds.drop_columns(cols=["col1"], concurrency=5)

output_path = "path/to/output"
ds.write_parquet(output_path, concurrency=5, num_rows_per_file=500000)
Issue Severity
Medium: It is a significant difficulty but I can work around it.