ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io

[Data] Ray Data doesn't work with TensorFlow versions 2.13-2.15 #44274

Open bveeramani opened 4 months ago

bveeramani commented 4 months ago

What happened + What you expected to happen

Ran batch inference with a TensorFlow model. Got a cloudpickle error:

2024-03-25 13:15:46.882449: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-03-25 13:15:46.934668: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-03-25 13:15:47.771973: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
2024-03-25 13:15:49,636 INFO worker.py:1567 -- Connecting to existing Ray cluster at address: 10.0.51.27:6379...
2024-03-25 13:15:49,643 INFO worker.py:1743 -- Connected to Ray cluster. View the dashboard at https://session-y6mjq2qbh924efi9s4eunpawn4.i.anyscaleuserdata.com 
2024-03-25 13:15:49,645 INFO packaging.py:358 -- Pushing file package 'gcs://_ray_pkg_5741547d523d70bbf271197e3855e93f9ecfbd0a.zip' (0.00MiB) to Ray cluster...
2024-03-25 13:15:49,645 INFO packaging.py:371 -- Successfully pushed file package 'gcs://_ray_pkg_5741547d523d70bbf271197e3855e93f9ecfbd0a.zip'.
2024-03-25 13:15:50,866 INFO dataset.py:2368 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-03-25 13:15:50,869 INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-03-25_12-11-14_377773_2919/logs/ray-data.log
2024-03-25 13:15:50,869 INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(TFPredictor)] -> LimitOperator[limit=1]

2024-03-25 13:15:50,919 ERROR exceptions.py:69 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ray/default/1.py", line 34, in <module>
    predictions.show(limit=1)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/dataset.py", line 2451, in show
    for row in self.take(limit):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/dataset.py", line 2375, in take
    for row in limited_ds.iter_rows():
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/iterator.py", line 241, in _wrapped_iterator
    for batch in batch_iterable:
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/iterator.py", line 162, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/exceptions.py", line 83, in handle_trace
    raise e.with_traceback(None) from SystemException()
TypeError: Could not serialize the put value <ray.data._internal.execution.operators.map_transformer.MapTransformer object at 0x7f0f5472fa00>:
================================================================================
Checking Serializability of <ray.data._internal.execution.operators.map_transformer.MapTransformer object at 0x7f0f5472fa00>
================================================================================
!!! FAIL serialization: cannot pickle 'LazyLoader' object
    Serializing '_init_fn' <function _parse_op_fn.<locals>.init_fn at 0x7f0f547209d0>...
    !!! FAIL serialization: cannot pickle 'LazyLoader' object
    Detected 1 global variables. Checking serializability...
        Serializing 'ray' <module 'ray' from '/home/ray/anaconda3/lib/python3.9/site-packages/ray/__init__.py'>...
    Detected 3 nonlocal variables. Checking serializability...
        Serializing 'fn_constructor_args' ()...
        Serializing 'fn_constructor_kwargs' {}...
        Serializing 'op_fn' <class 'ray.data._internal.execution.util.make_callable_class_concurrent.<locals>._Wrapper'>...
        !!! FAIL serialization: cannot pickle 'LazyLoader' object
            Serializing '__call__' <function make_callable_class_concurrent.<locals>._Wrapper.__call__ at 0x7f0f54720940>...
            !!! FAIL serialization: cannot pickle 'LazyLoader' object
    Serializing '_init_fn' <function _parse_op_fn.<locals>.init_fn at 0x7f0f547209d0>...
    !!! FAIL serialization: cannot pickle 'LazyLoader' object
    Detected 1 global variables. Checking serializability...
        Serializing 'ray' <module 'ray' from '/home/ray/anaconda3/lib/python3.9/site-packages/ray/__init__.py'>...
    Detected 3 nonlocal variables. Checking serializability...
        Serializing 'fn_constructor_args' ()...
        Serializing 'fn_constructor_kwargs' {}...
        Serializing 'op_fn' <class 'ray.data._internal.execution.util.make_callable_class_concurrent.<locals>._Wrapper'>...
        !!! FAIL serialization: cannot pickle 'LazyLoader' object
            Serializing '__call__' <function make_callable_class_concurrent.<locals>._Wrapper.__call__ at 0x7f0f54720940>...
            !!! FAIL serialization: cannot pickle 'LazyLoader' object
================================================================================
Variable: 

        FailTuple(__call__ [obj=<function make_callable_class_concurrent.<locals>._Wrapper.__call__ at 0x7f0f54720940>, parent=<class 'ray.data._internal.execution.util.make_callable_class_concurrent.<locals>._Wrapper'>])
FailTuple(__call__ [obj=<function make_callable_class_concurrent.<locals>._Wrapper.__call__ at 0x7f0f54720940>, parent=<class 'ray.data._internal.execution.util.make_callable_class_concurrent.<locals>._Wrapper'>])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable. 
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class. 
================================================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
================================================================================

Versions / Dependencies

Ray: 77ee044a7e3c4d583f94956d7334fb3d6d7ce014
TensorFlow: 2.13-2.15 (2.12 and 2.16 work)

Reproduction script


from typing import Dict
import numpy as np
import tensorflow as tf
from tensorflow import keras

import ray

ds = ray.data.from_numpy(np.ones((1, 100)))

class TFPredictor:
    def __init__(self):
        # Move the neural network to GPU by specifying the GPU device.
        with tf.device("GPU:0"):
            input_layer = keras.Input(shape=(100,))
            output_layer = keras.layers.Dense(1, activation="sigmoid")
            self.model = keras.Sequential([input_layer, output_layer])

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Move the input batch to GPU by specifying GPU device.
        with tf.device("GPU:0"):
            return {"output": self.model(batch["data"]).numpy()}

# Use 1 actor; the actor uses 1 GPU.
predictions = ds.map_batches(
    TFPredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    concurrency=1,
)
predictions.show(limit=1)

Issue Severity

Medium: It is a significant difficulty but I can work around it.

c21 commented 4 months ago

Hi @bveeramani - to better understand whether this is a Ray Data-only issue, can you check if the same code works with a Ray Core actor? If it does not work with Ray Core, it's a more fundamental issue with TensorFlow serializability. Otherwise, I want to understand how easy it would be to fix on the Ray Data side to make it work, thanks.

bveeramani commented 4 months ago

Serialization works if you directly use an actor.

@ray.remote
class TFPredictor:
    ...

TFPredictor.remote()

Interestingly, serialization also works when you write a more minimal reproduction. I expected this to fail based on the traceback.

import cloudpickle

from ray.data._internal.planner.plan_udf_map_op import _parse_op_fn
from ray.data._internal.logical.operators.map_operator import AbstractUDFMap

fn, init_fn = _parse_op_fn(AbstractUDFMap("spam", None, TFPredictor))
cloudpickle.dumps(fn)
cloudpickle.dumps(init_fn)

Something weird is going on, and it'll probably take more time to identify the root cause.
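
One way to dig further is to run Ray's serializability inspector (the same machinery that produced the report in the error above) on the class after it has been wrapped the way map_batches wraps it. A sketch, assuming make_callable_class_concurrent (named in the traceback) is the relevant wrapper; whether this reproduces the failure outside the full Ray Data pipeline isn't verified here.

from ray.util import inspect_serializability
from ray.data._internal.execution.util import make_callable_class_concurrent

# Wrap TFPredictor the way the map_batches path appears to (per the traceback),
# then print the same style of serializability report shown in the error output.
wrapped_cls = make_callable_class_concurrent(TFPredictor)
inspect_serializability(wrapped_cls, name="wrapped TFPredictor")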

bveeramani commented 4 months ago

If you move the TensorFlow imports inside the class's methods, the program doesn't error.

class TFPredictor:
    def __init__(self):
        import tensorflow as tf
        from tensorflow import keras

        # Move the neural network to GPU by specifying the GPU device.
        with tf.device("GPU:0"):
            input_layer = keras.Input(shape=(100,))
            output_layer = keras.layers.Dense(1, activation="sigmoid")
            self.model = keras.Sequential([input_layer, output_layer])

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        import tensorflow as tf

        # Move the input batch to GPU by specifying GPU device.
        with tf.device("GPU:0"):
            return {"output": self.model(batch["data"]).numpy()}
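
Presumably the deferred imports keep TensorFlow's lazily loaded module objects out of the pickled closure. A quick, hedged diagnostic (the LazyLoader hypothesis is only an assumption based on the error message, not a confirmed root cause):

import tensorflow as tf
from tensorflow import keras

# On the affected versions (2.13-2.15) this may report a lazy-loader class rather
# than a plain module, which would line up with the "cannot pickle 'LazyLoader'
# object" failure above; on 2.12 and 2.16 it should be an ordinary module.
print(type(keras))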