ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] Regression in handling list values when serializing to JSON due to use of Ray ExtensionDtypes #47327

Open marwan116 opened 3 weeks ago

marwan116 commented 3 weeks ago

What happened + What you expected to happen

I am running into a regression that looks like a bug to me, introduced by this code change in Ray 2.33.0 as far as I can tell.

The behavior of write_json has regressed: it used to serialize Python objects like lists correctly, but now it truncates each list to a single element.

See the reproduction script below.

Note the warning:

(MapBatches(<lambda>) pid=15155) Failed to interpret embeddings as multi-dimensional arrays. It will be pickled.

And note how the embeddings get truncated:

Out[8]: '{"embeddings":1}\n{"embeddings":2}\n{"embeddings":3}\n'
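For reference, this is the JSON Lines output the pipeline should produce for list-valued rows; a stdlib-only sketch (the rows mirror the DataFrame in the repro below, and the serialization step stands in for what write_json / to_json should emit):

```python
import json

# Rows matching the repro DataFrame: one list per row.
rows = [
    {"embeddings": [1, 2, 3]},
    {"embeddings": [4, 5, 6]},
    {"embeddings": [7, 8, 9]},
]

# JSON Lines: one compact JSON object per row, newline-terminated.
expected = "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in rows)
print(expected)
# '{"embeddings":[1,2,3]}\n{"embeddings":[4,5,6]}\n{"embeddings":[7,8,9]}\n'
```

The regressed output instead collapses each row's list to a single scalar, as shown above.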

Versions / Dependencies

In [14]: sys.version
Out[14]: '3.10.11 (main, Dec 12 2023, 16:25:48) [Clang 15.0.0 (clang-1500.0.40.1)]'

In [15]: ray.__version__
Out[15]: '2.34.0'

Reproduction script

In [1]: import pandas as pd

In [2]: import ray

In [3]: df = pd.DataFrame({"embeddings": [[1,2 ,3],  [4, 5, 6], [7, 8, 9]]})

In [4]: ds = ray.data.from_pandas(df)
2024-08-25 20:11:12,981 INFO worker.py:1781 -- Started a local Ray instance.

In [5]: ds.to_pandas().dtypes
Out[5]: 
embeddings    object
dtype: object

In [6]: ds.map_batches(lambda x: x).to_pandas().dtypes
2024-08-25 20:11:27,380 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-25_20-11-12_379326_14679/logs/ray-data
2024-08-25 20:11:27,380 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(<lambda>)]
[dataset]: Run `pip install tqdm` to enable progress reporting.
(MapBatches(<lambda>) pid=15155) Failed to interpret embeddings as multi-dimensional arrays. It will be pickled.
Out[6]: 
embeddings    python_object()
dtype: object

In [7]: ds.to_pandas().to_json(orient="records", lines=True)
Out[7]: '{"embeddings":[1,2,3]}\n{"embeddings":[4,5,6]}\n{"embeddings":[7,8,9]}\n'

In [8]: ds.map_batches(lambda x: x).to_pandas().to_json(orient="records", lines=True)
2024-08-25 20:11:54,752 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-25_20-11-12_379326_14679/logs/ray-data
2024-08-25 20:11:54,752 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(<lambda>)]
Out[8]: '{"embeddings":1}\n{"embeddings":2}\n{"embeddings":3}\n'

whereas with a version of ray <= 2.32.0 here is what I get:

In [1]: import ray

In [2]: import sys

In [3]: import pandas as pd

In [4]: df = pd.DataFrame({"embeddings": [[1,2 ,3],  [4, 5, 6], [7, 8, 9]]})

In [5]: ds = ray.data.from_pandas(df)
/Users/marwan/.pyenv/versions/3.10.11/envs/test-ray-data-regression/lib/python3.10/site-packages/numpy/_core/fromnumeric.py:57: FutureWarning: 'DataFrame.swapaxes' is deprecated and will be removed in a future version. Please use 'DataFrame.transpose' instead.
  return bound(*args, **kwds)
2024-08-25 20:14:56,409 INFO worker.py:1788 -- Started a local Ray instance.
In [6]: ds.map_batches(lambda x: x).to_pandas().to_json(orient="records", lines=True)
2024-08-25 20:15:00,712 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-25_20-14-54_479512_19153/logs/ray-data
2024-08-25 20:15:00,713 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(<lambda>)]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Out[6]: '{"embeddings":[1,2,3]}\n{"embeddings":[4,5,6]}\n{"embeddings":[7,8,9]}\n'

Issue Severity

Medium: It is a significant difficulty but I can work around it.
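One possible workaround (a sketch, not a fix): bypass the pandas extension-dtype path entirely by pulling rows out of the dataset as plain Python dicts and serializing them with the stdlib json module. The Ray call shown in the comment is an assumption about the pipeline (Dataset.take_all() returns a list of row dicts); the snippet itself only exercises the serialization step:

```python
import json

def rows_to_json_lines(rows):
    """Serialize a list of row dicts to JSON Lines, preserving list values."""
    return "".join(json.dumps(row, separators=(",", ":")) + "\n" for row in rows)

# In a Ray pipeline this would be something like:
#   rows = ds.map_batches(lambda x: x).take_all()
rows = [{"embeddings": [1, 2, 3]}, {"embeddings": [4, 5, 6]}]
print(rows_to_json_lines(rows))
```

This avoids the round-trip through a pandas object column (and hence the ExtensionDtype pickling path), at the cost of materializing the rows on the driver.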

Bye-legumes commented 2 weeks ago

I noticed a similar issue, and I think the bottleneck is _concat_same_type in the Arrow block: https://github.com/ray-project/ray/pull/45075. I also have a PR for this, but it requires upgrading the pyarrow version. This has been a long-standing issue, and we need to decide whether to upgrade pyarrow so we can have a better implementation of this conversion.