ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Ray Dataset: Unable to merge: field <> has incompatible types when reading slightly different schemas #36051

Open CrashLaker opened 1 year ago

CrashLaker commented 1 year ago

What happened + What you expected to happen

Hi all,

I'm trying to read a folder whose contents have slight schema variations.

expected to work

import ray

ds = ray.data.read_json("./dataset")

print(ds.show())
print(ds.schema())

in ./dataset/data{0..N}.json

data0 = {
    "Records": {
        "a": 1,
        "b": 23423,
    }
}

data1 = {
    "Records": {
        "a": 1,
        "b": "234234",
    }
}

I'm getting an error of the sort: (DoRead pid=44337) pyarrow.lib.ArrowInvalid: Unable to merge: Field Records has incompatible types: struct<a: int64, b: string> vs struct<a: int64, b: int64> [repeated 5x across cluster]

I also can't seem to force the schema with explicit_schema; that gives the error: (DoRead pid=45091) pyarrow.lib.ArrowInvalid: JSON parse error: Column(/Records/b) changed from string to number in row 0

Is there any workaround for this, or a way to cast after reading?

Thanks.

Regards, C.

Versions / Dependencies

uname -a
Linux ip-172-31-33-24.sa-east-1.compute.internal 6.1.27-43.48.amzn2023.x86_64 #1 SMP PREEMPT_DYNAMIC Tue May 2 04:53:36 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

python3 --version
Python 3.9.16

pip3 freeze | grep -E "ray|pandas|pyarrow"
pandas==2.0.2
pyarrow==12.0.0
ray==2.4.0

Reproduction script

gen dataset

import os
import json

if not os.path.exists("./dataset"):
    os.mkdir("./dataset")

data0 = {
    "Records": {
        "a": 1,
        "b": 23423,
    }
}

data1 = {
    "Records": {
        "a": 1,
        "b": "234234",
    }
}

for i in range(20):
    data = data0 if i % 2 == 0 else data1
    with open(f"./dataset/data{i}.json", "w") as f:
        f.write(json.dumps(data, indent=4))

read folder

import ray

ds = ray.data.read_json("./dataset")

print(ds.show())
print(ds.schema())
(DoRead pid=46227) 2023-06-03 03:26:03,221      INFO worker.py:844 -- Task failed with retryable exception: TaskID(73d5a5979f92972cffffffffffffffffffffffff01000000). [repeated 11x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)
(DoRead pid=46227) Traceback (most recent call last): [repeated 11x across cluster]
(DoRead pid=46227)   File "python/ray/_raylet.pyx", line 653, in ray._raylet.execute_dynamic_generator_and_store_task_outputs [repeated 11x across cluster]
(DoRead pid=46227)   File "python/ray/_raylet.pyx", line 2537, in ray._raylet.CoreWorker.store_task_outputs [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 375, in _map_task [repeated 11x across cluster]
(DoRead pid=46227)     for b_out in fn(iter(blocks), ctx): [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py", line 219, in do_read [repeated 11x across cluster]
(DoRead pid=46227)     yield from read_task() [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/datasource/datasource.py", line 216, in __call__ [repeated 11x across cluster]
(DoRead pid=46227)     for block in result: [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/datasource/file_based_datasource.py", line 500, in read_files [repeated 11x across cluster]
(DoRead pid=46227)     yield output_buffer.next() [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/output_buffer.py", line 74, in next [repeated 11x across cluster]
(DoRead pid=46227)     block = self._buffer.build() [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/table_block.py", line 118, in build [repeated 22x across cluster]
(DoRead pid=46227)     return self._builder.build() [repeated 11x across cluster]
(DoRead pid=46227)     return self._concat_tables(tables) [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 127, in _concat_tables [repeated 11x across cluster]
(DoRead pid=46227)     return transform_pyarrow.concat(tables) [repeated 11x across cluster]
(DoRead pid=46227)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 255, in concat [repeated 11x across cluster]
(DoRead pid=46227)     table = pyarrow.concat_tables(blocks, promote=True) [repeated 11x across cluster]
(DoRead pid=46227)   File "pyarrow/table.pxi", line 5371, in pyarrow.lib.concat_tables [repeated 11x across cluster]
(DoRead pid=46227)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status [repeated 11x across cluster]
(DoRead pid=46227)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status [repeated 11x across cluster]
(DoRead pid=46228) pyarrow.lib.ArrowInvalid: Unable to merge: Field Records has incompatible types: struct<a: int64, b: int64> vs struct<a: int64, b: string> [repeated 5x across cluster]
(DoRead pid=46227) pyarrow.lib.ArrowInvalid: Unable to merge: Field Records has incompatible types: struct<a: int64, b: string> vs struct<a: int64, b: int64> [repeated 5x across cluster]

read folder with explicit_schema

import ray
import pyarrow as pa
import pyarrow.json

schema = pa.schema({
    "Records": pa.struct([
        pa.field("a", pa.int64()),
        pa.field("b", pa.string()),
    ])
})

ds = ray.data.read_json(
    "./dataset",
    parse_options=pa.json.ParseOptions(
        explicit_schema=schema,
    )
)

print(ds.show())
print(ds.schema())
(DoRead pid=45735) Traceback (most recent call last):
(DoRead pid=45735)   File "python/ray/_raylet.pyx", line 653, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(DoRead pid=45735)   File "python/ray/_raylet.pyx", line 2537, in ray._raylet.CoreWorker.store_task_outputs
(DoRead pid=45735)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 375, in _map_task
(DoRead pid=45735)     for b_out in fn(iter(blocks), ctx):
(DoRead pid=45735)   File "/usr/local/lib64/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py", line 219, in do_read
(DoRead pid=45735)     yield from read_task()
(DoRead pid=45735)   File "/usr/local/lib64/python3.9/site-packages/ray/data/datasource/datasource.py", line 216, in __call__
(DoRead pid=45735)     for block in result:
(DoRead pid=45735)   File "/usr/local/lib64/python3.9/site-packages/ray/data/datasource/file_based_datasource.py", line 490, in read_files
(DoRead pid=45735)     for data in read_stream(f, read_path, **reader_args):
(DoRead pid=45735)   File "/usr/local/lib64/python3.9/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(DoRead pid=45735)     yield self._read_file(f, path, **reader_args)
(DoRead pid=45735)   File "/usr/local/lib64/python3.9/site-packages/ray/data/datasource/json_datasource.py", line 36, in _read_file
(DoRead pid=45735)     return json.read_json(f, read_options=read_options, **reader_args)
(DoRead pid=45735)   File "pyarrow/_json.pyx", line 258, in pyarrow._json.read_json
(DoRead pid=45735)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(DoRead pid=45735)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
(DoRead pid=45735) pyarrow.lib.ArrowInvalid: JSON parse error: Column(/Records/b) changed from string to number in row 0

Issue Severity

High: It blocks me from completing my task.

scottjlee commented 1 year ago

We are not currently planning to support multiple schemas as input to read API methods. One possible workaround is to first read the files grouped by schema, use map functions to transform the datasets so they share the same schema, and then use union to combine them into a single Dataset (see the sketch below).
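
A minimal, untested sketch of that workaround, assuming the files can be grouped by schema up front (here, the even/odd file naming from the reproduction script) and that the only difference is the type of Records.b; the helper cast_b_to_string is made up for this example:

import ray
import pyarrow as pa
import pyarrow.compute as pc

# Assumed grouping: even-numbered files have Records.b as int64,
# odd-numbered files have Records.b as string (as in the repro script).
int_files = [f"./dataset/data{i}.json" for i in range(0, 20, 2)]
str_files = [f"./dataset/data{i}.json" for i in range(1, 20, 2)]

def cast_b_to_string(batch: pa.Table) -> pa.Table:
    # Rebuild the Records struct with b cast to string so both groups
    # end up with struct<a: int64, b: string>.
    records = batch["Records"].combine_chunks()
    new_records = pa.StructArray.from_arrays(
        [records.field("a"), pc.cast(records.field("b"), pa.string())],
        names=["a", "b"],
    )
    return batch.set_column(
        batch.schema.get_field_index("Records"), "Records", new_records
    )

ds_int = ray.data.read_json(int_files).map_batches(
    cast_b_to_string, batch_format="pyarrow"
)
ds_str = ray.data.read_json(str_files)

# Both datasets now share the same schema, so union works.
ds = ds_int.union(ds_str)
print(ds.schema())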