Closed Tixxx closed 2 years ago
@richardliaw
import sys
sys.path.append('.') # adds local folder
import math
import horovod.tensorflow.keras as hvd
import horovod.spark.keras as hvd_keras
import functools
from functools import partial
import tensorflow as tf
import horovod.tensorflow.keras as hvd
import os
import glob
import tensorflow.keras.backend as K
import ray
import argparse
class LocalRayContext():
'''
Keep this non-client ray context here for two reasons:
1. in certain circumstance, this way to access ray is faster than ray client
2. without client layer, ray access is simpler and less port conflict risk, easier debug
'''
def __init__(self):
# Start the Ray cluster or attach to an existing Ray cluster
# will generate such error if shutdown before new ray init.
# (pid=raylet) [2021-02-02 04:00:06,190 E 9518 9518] process.cc:498:
# Failed to kill process 9577 with error system:3: No such process
# ray.shutdown()
# ray.init()
# therefore instead we use ignore_reinit_error to avoid shutdown
try:
ray.init(address='auto', ignore_reinit_error=True)
except Exception as e:
print(f'LocalRayContext __init__ ray.init address=auto failed with error: {e}')
try:
ray.init(ignore_reinit_error=True)
except Exception as e2:
print(f'LocalRayContext __init__ ray.init failed again with error: {e2}')
def run(self, fn, args=[], kwargs={}, env=None):
@ray.remote(max_calls=1)
def run_ray_remote(func):
return func(None)
ans = [run_ray_remote.remote(lambda w: fn(*args, **kwargs))]
return ray.get(ans)
def shutdown(self):
ray.shutdown()
class LocalRayDLContext(LocalRayContext):
def __init__(self, num_workers=1, optimizer='horovod'):
super().__init__()
if optimizer != 'horovod':
raise Exception(
'At the moment, synchronous SGD based on horovod '
'is the only optimizer supported'
)
from horovod.ray import RayExecutor
self.num_workers, self.cpus_per_worker = num_workers, 1
self.ray_executor = RayExecutor(
RayExecutor.create_settings(timeout_s=30),
num_workers=self.num_workers,
cpus_per_worker=self.cpus_per_worker,
use_gpu=False,
gpus_per_worker=0,
)
self.ray_executor.start()
print(
f'LocalRayDLContext initialized with 1 host and {self.num_workers} slot(s)')
def num_processes(self):
return self.num_workers * self.cpus_per_worker
def run(self, fn, args=[], kwargs={}, env=None):
"""Executes the provided function on all workers.
Args:
fn: Target function that can be executed with arbitrary
args and keyword arguments.
args: List of arguments to be passed into the target function.
kwargs: Dictionary of keyword arguments to be
passed into the target function.
Returns:
Deserialized return values from the target function.
"""
print(f'env is {env}, which will not be used in local ray context')
return self.ray_executor.run(fn, args, kwargs)
def shutdown(self):
self.ray_executor.shutdown()
super().shutdown()
print('local ray context shutdown')
# ================ SUBCLASS NETWORK FOR TRAINING =====================
import horovod.spark.keras.util as keras_util
worker_count=4
def remote_trainer():
def keras_fn():
def fn():
import tensorflow.keras as tf_keras
return tf_keras
return fn
def horovod_fn():
def fn():
import horovod.tensorflow.keras as hvd
return hvd
return fn
pin_gpu = hvd_keras.remote._pin_gpu_fn()
get_keras = keras_fn()
get_horovod = horovod_fn()
def train_function(input):
k = get_keras()
float_type = tf.keras.backend.floatx()
#float_type = K.floatx() -> this works
print(f"horovod.spark.keras.util: using keras in tensorflow=={tf.__version__}")
print(float_type)
hvd = get_horovod()
hvd.init()
pin_gpu(hvd, tf, k)
y_true = tf.random.uniform([2, 3])
y_pred = tf.random.uniform([2, 3])
ce = K.binary_crossentropy(y_true, y_pred, from_logits=False)
print(ce)
return 0
return train_function
################Use ray################
def get_local_ray_ctx():
print("use not ray client based horovod ray executor")
return LocalRayDLContext(worker_count)
backend = get_local_ray_ctx()
trainer = remote_trainer()
handle = backend.run(trainer, args=([2]))
print("######finish training")
I can't reproduce the error, it works well for me.
@Tixxx can you try making this change:
get_keras = keras_fn()
get_horovod = horovod_fn()
def train_function(input):
k = get_keras()
+ import tensorflow as tf
float_type = tf.keras.backend.floatx()
#float_type = K.floatx() -> this works
It also fails for me with the same error. The only way to make it work for me is to import tensorflow.keras beforehand. keras module in tensorflow is lazily loaded, does that impat serialization in any way?
ah yeah that should work. i suspect theres some hidden global state that keras manipulates.
We got another occurrence where it's now complaining about verion attribute missing from tensorflow, it repros for me on both linux and mac with this train_function:
def train_function(input):
print(f"using keras in tensorflow=={tf.__version__}")
y_true = tf.random.uniform([2, 3])
y_pred = tf.random.uniform([2, 3])
ce = tf.keras.losses.binary_crossentropy(y_true, y_pred, from_logits=False)
print(ce)
return 0
stack trace:
File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/horovod/ray/runner.py", line 507, in run
return ray.get(self.run_remote(fn, args, kwargs))
File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, *kwargs)
File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/ray/worker.py", line 1624, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::BaseHorovodWorker.execute() (pid=12749, ip=127.0.0.1, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x7fb8e84457f0>)
File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/horovod/ray/runner.py", line 529, in
I think we really need to get a repro to have a proper root cause since this happens randomly every time we add new layers or losses. Could you try this on the uber laptop to see if it repros? Package versions are still the same as the ones in the dependency section.
TJ, I think this is a known issue with Ray.
The way to fix it is to make sure tensorflow is always imported in the train_function.
For example, try “import tensorflow as tf” INSIDE train_function before the tf.version check.
Please let me know if that works.
On Wed, Feb 9, 2022 at 10:25 AM TJ Xu @.***> wrote:
We got another occurrence where it's now complaining about verion attribute missing from tensorflow, it repros for me on both linux and mac with this train_function: def train_function(input): print(f"using keras in tensorflow=={tf.version}") ytrue = tf.random.uniform([2, 3]) ypred = tf.random.uniform([2, 3]) ce = tf_.keras.losses.binary_crossentropy(y_true, y_pred, from_logits=False) print(ce) return 0
stack trace: File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/horovod/ray/runner.py", line 507, in run return ray.get(self.run_remote(fn, args, kwargs)) File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper return func(*args, *kwargs) File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/ray/worker.py", line 1624, in get raise value.as_instanceof_cause() ray.exceptions.RayTaskError(AttributeError): ray::BaseHorovodWorker.execute() (pid=12749, ip=127.0.0.1, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x7fb8e84457f0>) File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/horovod/ray/runner.py", line 529, in worker.execute.remote(lambda w: fn(args, *kwargs)) File "ray_repro.py", line 107, in train_function print(f"using keras in tensorflow=={tf.version}") File "/Users/tix/Uber/ml-code/env/py369/lib/python3.6/site-packages/tensorflow_core/python/util/module_wrapper.py", line 193, in getattr attr = getattr(self._tfmw_wrapped_module, name) AttributeError: module 'tensorflow' has no attribute 'version*'
I think we really need to get a repro to have a proper root cause since this happens randomly every time we add new layers or losses. Could you try this on the uber laptop to see if it repros? Package versions are still the same as the ones in the dependency section.
— Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/22195#issuecomment-1034064595, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABCRZZISTHFCF5UCGOV3MHDU2KWQHANCNFSM5NY6ZOOQ . You are receiving this because you were mentioned.Message ID: @.***>
Yea this works for me. It's just a bit difficult to enforce other users to do the same when they develop models on our platform. Do you know if this known issue with tf is tracked anywhere?
Search before asking
Ray Component
Ray Core
What happened + What you expected to happen
Symptoms: Running model training using remote ray executors fails with strange errors like "KeyError" or "AttributeError" when the object indeed has the key or attribute. Example stack trace:
Expected behavior: Running the repro script should pass without any error.
Versions / Dependencies
python: 3.6.9 OS: Ubuntu 16
Reproduction script
repro
Anything else
No response
Are you willing to submit a PR?