intel-analytics / analytics-zoo

Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray
https://analytics-zoo.readthedocs.io/
Apache License 2.0

Orca on yarn cannot use numpy to preprocess image data #518

Closed GitEasonXu closed 3 years ago

GitEasonXu commented 3 years ago

Problem Description:

Code segment
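(Note: tensorflow as tf, numpy as np, the augmentation pipeline seq, image_shape, train_images and train_labels are assumed to be defined earlier in the notebook and are not shown here.)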

def train_preprocess(x, y):
    #Color augmentation
    x = tf.image.random_hue(x, 0.3)
    x = tf.image.random_saturation(x, 0.6, 1.6)
    x = tf.image.random_brightness(x, 0.3)
    x = tf.image.random_contrast(x, 0.7, 1.3)
    #Flip
    x = tf.image.random_flip_left_right(x)
    x = tf.image.random_flip_up_down(x)
    #Random crop
    x = tf.image.random_crop(x,size=[int(image_shape[0] * 0.9), int(image_shape[1] * 0.9), 3])
    x = tf.cast(tf.image.resize(x, [image_shape[0], image_shape[1]]), dtype=tf.float32) / 255.0
    return x, y
def image_aug(image):
    result = seq(image=image.numpy())
    result = result.astype(np.float32) / 255.
    return result
def train_preprocess_aug(x, y):
    x = tf.py_function(image_aug, [x], tf.float32)
    return x, y

def test_preprocess(x, y):
    x = tf.cast(tf.image.resize(x, [image_shape[0], image_shape[1]]), dtype=tf.float32) / 255.0
    return x, y

def train_data_creator(config):
    global train_images, train_labels
    dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
    dataset = dataset.repeat()
    dataset = dataset.map(train_preprocess)
    dataset = dataset.shuffle(1000)
    dataset = dataset.batch(config["batch_size"])

    return dataset

Training fails when dataset.map uses train_preprocess_aug inside train_data_creator, but it works fine when dataset.map uses train_preprocess.
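That is, the failing run differs only in the map call inside train_data_creator:

dataset = dataset.map(train_preprocess)      # works
dataset = dataset.map(train_preprocess_aug)  # fails with the error below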

Error details when dataset.map uses train_preprocess_aug:

(pid=26239, ip=10.180.210.172) WARNING:tensorflow:From /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
(pid=26239, ip=10.180.210.172) Instructions for updating:
(pid=26239, ip=10.180.210.172) Use `tf.data.Iterator.get_next_as_optional()` instead.
---------------------------------------------------------------------------
RayTaskError(ValueError)                  Traceback (most recent call last)
<ipython-input-11-2cae64a93224> in <module>
      6                       init_ray_on_spark=True, memory="6g", driver_memory="6g",hadoop_user_name='hdfs', hadoop_conf="/etc/hadoop/3.0.1.0-187/0/")
      7 
----> 8 main(5)

<ipython-input-10-c30b9df2ca11> in main(max_epoch)
     11                     steps_per_epoch=60000 // batch_size,
     12                     validation_data_creator=val_data_creator,
---> 13                     validation_steps=10000 // batch_size)
     14 
     15     print(stats)

~/anaconda3/envs/zoo-tf2.3/lib/python3.6/site-packages/zoo/orca/learn/tf2/tf_ray_estimator.py in fit(self, data_creator, epochs, verbose, callbacks, validation_data_creator, class_weight, steps_per_epoch, validation_steps, validation_freq, data_config, feature_cols, label_cols)
    257 
    258             worker_stats = ray.get([self.remote_workers[i].step.remote(**params_list[i])
--> 259                                     for i in range(self.num_workers)])
    260             worker_stats = list(itertools.chain.from_iterable(worker_stats))
    261         stats = worker_stats[0].copy()

~/anaconda3/envs/zoo-tf2.3/lib/python3.6/site-packages/ray/worker.py in get(object_ids, timeout)
   1511                     worker.core_worker.dump_object_store_memory_usage()
   1512                 if isinstance(value, RayTaskError):
-> 1513                     raise value.as_instanceof_cause()
   1514                 else:
   1515                     raise value

RayTaskError(ValueError): ray::TFRunner.step() (pid=26240, ip=10.180.210.172)
  File "python/ray/_raylet.pyx", line 452, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 407, in ray._raylet.execute_task.function_executor
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/zoo/orca/learn/tf2/tf_runner.py", line 365, in step
    validation_freq=validation_freq)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py", line 117, in _method_wrapper
    mode=dc.CoordinatorMode.INDEPENDENT_WORKER)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_coordinator.py", line 860, in run_distribute_coordinator
    task_id, session_config, rpc_layer)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_coordinator.py", line 360, in _run_single_worker
    return worker_fn(strategy)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py", line 115, in <lambda>
    lambda _: method(self, *args, **kwargs),
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py", line 1098, in fit
    tmp_logs = train_function(iterator)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/def_function.py", line 780, in __call__
    result = self._call(*args, **kwds)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/def_function.py", line 823, in _call
    self._initialize(args, kwds, add_initializers_to=initializers)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/def_function.py", line 697, in _initialize
    *args, **kwds))
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 2855, in _get_concrete_function_internal_garbage_collected
    graph_function, _, _ = self._maybe_define_function(args, kwargs)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 3213, in _maybe_define_function
    graph_function = self._create_graph_function(args, kwargs)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 3075, in _create_graph_function
    capture_by_value=self._capture_by_value),
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/framework/func_graph.py", line 986, in func_graph_from_py_func
    func_outputs = python_func(*func_args, **func_kwargs)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/eager/def_function.py", line 600, in wrapped_fn
    return weak_wrapped_fn().__wrapped__(*args, **kwds)
  File "/hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/framework/func_graph.py", line 973, in wrapper
    raise e.ag_error_metadata.to_exception(e)
ValueError: in user code:

    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:806 train_function  *
        return step_function(self, iterator)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:796 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_lib.py:1211 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_lib.py:2585 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/mirrored_strategy.py:585 _call_for_each_replica
        self._container_strategy(), fn, args, kwargs)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/mirrored_run.py:96 call_for_each_replica
        return _call_for_each_replica(strategy, fn, args, kwargs)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/mirrored_run.py:237 _call_for_each_replica
        coord.join(threads)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/training/coordinator.py:389 join
        six.reraise(*self._exc_info_to_raise)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/six.py:703 reraise
        raise value
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/training/coordinator.py:297 stop_on_exception
        yield
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/distribute/mirrored_run.py:323 run
        self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:789 run_step  **
        outputs = model.train_step(data)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:747 train_step
        y_pred = self(x, training=True)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py:976 __call__
        self.name)
    /hadoop/yarn/local/usercache/hdfs/appcache/application_1607477474145_0021/container_1607477474145_0021_01_000003/python_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/input_spec.py:168 assert_input_compatibility
        layer_name + ' is incompatible with the layer: '

    ValueError: Input 0 of layer sequential is incompatible with the layer: its rank is undefined, but the layer requires a defined rank.

The code can be found here

hkvision commented 3 years ago

@yangw1234 Please take a look.

yangw1234 commented 3 years ago

This looks like a TensorFlow issue.

According to the error message "ValueError: Input 0 of layer sequential is incompatible with the layer: its rank is undefined, but the layer requires a defined rank.", it seems your model cannot determine the shape of the output of your py_function. Does your dataset work without using Orca?

I think you can also try adding a tf.reshape op after your tf.py_function.
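For example, a minimal sketch (assuming the augmented image still has the original image_shape) that restores the static shape with set_shape instead of reshape:

def train_preprocess_aug(x, y):
    x = tf.py_function(image_aug, [x], tf.float32)
    # tf.py_function returns a tensor with an unknown static shape,
    # so declare it explicitly for the downstream Keras model
    x.set_shape([image_shape[0], image_shape[1], 3])
    return x, y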

GitEasonXu commented 3 years ago

@yangw1234 You are right. After adding a tf.reshape op, the problem is gone.

def train_preprocess_aug(x, y):
    x = tf.py_function(image_aug, [x], tf.float32)
    x = tf.cast(tf.reshape(x, image_shape), dtype=tf.float32) / 255.0
    return x, y
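Presumably this works because tf.py_function produces a tensor with an undefined rank, and the explicit tf.reshape (or an equivalent set_shape call) gives it the defined static shape that the model's input check requires.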