ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

ray.remote actor function can erroneously return previously cached objects on subsequent calls when using multiple processes #5986

Closed damianj closed 4 years ago

damianj commented 4 years ago

System information

Describe the problem

I am trying to serve a pre-trained model using Ray and Sanic (a uvloop-based Python server that uses multiple workers, which is why I reached for Ray). I have it running, but I hit a strange issue where the prediction probabilities that come back are sometimes for past inputs: querying the same input can return different probabilities that were predicted for earlier inputs. It seems to happen randomly, and I can't tell whether there is a pattern to it. This doesn't happen when I run without Ray, but without Ray I also can't use multiple workers, because then I start getting "Unexpected CUDA error: initialization error", which Ray helped solve.

I tried to follow https://ray.readthedocs.io/en/latest/using-ray-with-tensorflow.html for the model initialization. The overall structure of my code is as follows:

def load_model():
    # Load the graph, create the session, and build the prediction op.
    # Returns the model, the session, and the output tensor, among other things.
    ...

@ray.remote(num_gpus=1)
class MyTrainedModel:
    def __init__(self):
        self.model, self.sess, self.y_pred = load_model()

    def query_model(self, input):
        # Do some pre-processing on the input. Some other stuff gets called,
        # but eventually the following is what gets the prediction.
        with self.sess.as_default():
            return self.sess.run(self.y_pred, feed_dict=input)

# Then in some other code I have something like:
class MyAvailableModels:
    def __init__(self):
        self.my_trained_model = MyTrainedModel.remote()

# This then gets called like
my_avail_models = MyAvailableModels()
res = ray.get(my_avail_models.my_trained_model.query_model.remote(input))

# Later I also tried the experimental async_api
# I need to explicitly init the async_api since I already have
# a loop (Sanic's uvloop) running by the time this gets called
if async_api.handler is None:
    await async_api._async_init()
future = async_api.as_future(my_avail_models.my_trained_model.query_model.remote(input))
result = await future

I've oversimplified it, since there are many more moving parts, but I think this is the relevant portion, at least in terms of how it integrates with Ray.

Source code / logs

Some info I got using ray.objects().
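
For context, the output below was produced with logging roughly like this around each call (a simplified sketch; the variable names are illustrative and the real code lives inside the Sanic handler):

# Rough sketch of the per-call logging that produced the output below.
object_id = my_avail_models.my_trained_model.query_model.remote(input_text)
result = ray.get(object_id)

print("Input: ", input_text)
print("Return value: ", result)
print("Returned object by remote func: ", object_id)
print("ray.objects(): ", ray.objects())
print("----End of Call---")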

Input:  This is a test.
Return value:  [0.00278279185295105]
Returned object by remote func:  ObjectID(7c3ddc20ab827d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

# This answer is wrong, using previously cached object
Input:  This is another test.
Return value:  [0.00278279185295105]
Returned object by remote func:  ObjectID(7c3ddc20ab827d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

Input:  This is another test.
Return value:  [0.0002527501783333719]
Returned object by remote func:  ObjectID(1a2a250119807d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

# This answer is wrong, using previously cached object
Input:  This is a different test.
Return value:  [0.00278279185295105]
Returned object by remote func:  ObjectID(7c3ddc20ab827d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

# This answer is wrong, using previously cached object
Input:  This is a different test.
Return value:  [0.0002527501783333719]
Returned object by remote func:  ObjectID(1a2a250119807d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

Input:  This is a different test.
Return value:  [0.0021787371952086687]
Returned object by remote func:  ObjectID(2223f7ffc3b47d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(2223f7ffc3b47d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

# This answer is wrong, using previously cached object
Input:  This is a different test.
Return value:  [0.0002527501783333719]
Returned object by remote func:  ObjectID(1a2a250119807d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(2223f7ffc3b47d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

Input:  This is a different test.
Return value:  [0.0021787371952086687]
Returned object by remote func:  ObjectID(2223f7ffc3b47d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(2223f7ffc3b47d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

Input:  This is a different test.
Return value:  [0.0021787371952086687]
Returned object by remote func:  ObjectID(1788606d1c6a7d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(2223f7ffc3b47d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1788606d1c6a7d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

Input:  This is a different test.
Return value:  [0.0021787371952086687]
Returned object by remote func:  ObjectID(54158c91583e7d58f4150100000000c001000000)
ray.objects():  {ObjectID(7c3ddc20ab827d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(2223f7ffc3b47d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(54158c91583e7d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1a2a250119807d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}, ObjectID(1788606d1c6a7d58f4150100000000c001000000): {'DataSize': 704, 'Manager': b'\x18\xfblJ\xcf\xee\x1f\xf3{\xf8hh\xf4\xdf\xebs\xa9\x92(4'}}
----End of Call---

As far as I can tell, when I get the wrong answer it's reusing cached objects, and when I get the right answer a new object is appended to the objects table. Is there a way to flush the objects table? I suppose the caching provides some speed-up, but the model prediction isn't slow, and I'd rather not have it if it compromises the integrity of the answers I get back. Maybe I'm misunderstanding the whole objects thing, though.

damianj commented 4 years ago

Any idea what I'm doing wrong? Am I misusing Ray, or is this perhaps a bug?

damianj commented 4 years ago

I didn't mean to close the issue; I was trying to comment, so here goes again.

I have a minimal example that can reproduce the issue:

from sanic import Sanic
from sanic.response import json
import ray
import os
import random

app = Sanic(__name__)

def load_outside_actor():
    import tensorflow as tf

    tf.compat.v1.disable_eager_execution()
    tf.compat.v1.disable_resource_variables()
    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
    os.environ['CUDA_VISIBLE_DEVICES'] = "0"

    config = tf.compat.v1.ConfigProto()
    config.gpu_options.allow_growth = True
    a = tf.compat.v1.constant(5.0)
    b = tf.compat.v1.constant(6.0)
    c = a * b

    sess = tf.compat.v1.Session(graph=c.graph, config=config)
    sess.run(tf.compat.v1.global_variables_initializer())

    return c, sess, tf

@ray.remote(num_gpus=1)
class Test:
    def __init__(self):
        self.c, self.sess, self.tf = load_outside_actor()

    @ray.method(num_return_vals=1)
    def test_it(self, rand_num=None):
        with self.sess.as_default():
            return float(self.sess.run(self.c * self.tf.compat.v1.to_float(rand_num)))

@app.route('/')
async def index(request):
    random_integer = random.randint(2, 1001)
    ray_result = ray.get(my_test.test_it.remote(random_integer))
    expected_result = 5.0 * 6.0 * random_integer

    if expected_result != ray_result:
        print("Error detected: ray_result != expected_result.")
    print("Output should be: ", expected_result)
    print("Ray method returned: ", ray_result, "\n")

    return json({'tf result': ray_result})

ray.init(ignore_reinit_error=True)
my_test = Test.remote()

if __name__ == "__main__":
    sanic_config = {
        "host": "127.0.0.1",
        "port": 5000,
        "workers": 3,
        "debug": False,
        "access_log": False
    }

    app.run(**sanic_config)

To reproduce the behavior I have described, start the application and go to http://127.0.0.1:5000/ to trigger the remote method. Then wait roughly 30 seconds and reload the page to trigger it again. Repeat the wait-and-reload cycle until the error occurs; it shouldn't take more than a handful of tries to surface.
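
If it's more convenient, the reload loop can also be scripted instead of done in the browser. A minimal sketch (assuming the requests library is installed, and keeping roughly the same 30-second gap between calls):

# Sketch: hit the endpoint repeatedly instead of reloading in the browser.
# Assumes the Sanic app above is already running on 127.0.0.1:5000.
import time
import requests  # assumption: any HTTP client would do

for attempt in range(20):
    response = requests.get("http://127.0.0.1:5000/")
    print(attempt, response.json())
    time.sleep(30)  # roughly the wait between manual reloads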

I'm also fairly certain this is a multiprocessing issue: when I set the number of workers to 1, I cannot reproduce the behavior.

EDIT: After some more digging, https://github.com/ray-project/ray/blob/releases/0.7.6/python/ray/worker.py#L2114 is where I can observe the wrong value being returned while debugging. The actual issue, I guess, is upstream in https://github.com/ray-project/ray/blob/releases/0.7.6/python/ray/actor.py#L520, since that is what seems to be setting the object_ids parameter of the function in the first link.

EDIT 2: I can verify that it's trying to use previously cached objects, because when I delete them after getting the result with ray.get() via:

ray_global_worker = ray.worker.get_global_worker()
ray_global_worker.core_worker.free_objects([ray_object], False, True)

then an error gets thrown whenever it tries to use the object it previously cached. When it doesn't try to use cached objects, things proceed normally.
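
Put together with the minimal example above, that check looks roughly like this (a sketch; the free_objects arguments are the same ones as in the snippet above, and ray_object there is just the ObjectID returned by the remote call):

# Sketch: fetch the result, then free the object so a later reuse of the
# same ObjectID fails loudly instead of silently returning a stale value.
object_id = my_test.test_it.remote(random_integer)
ray_result = ray.get(object_id)

ray_global_worker = ray.worker.get_global_worker()
ray_global_worker.core_worker.free_objects([object_id], False, True)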

EDIT 3: I've also tried instantiating an actor in each worker (i.e. each process) and setting num_gpus=1.0/number_of_workers, but the issue persists. Roughly, that variant looked like the sketch below.
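
Simplified sketch of the per-worker variant (not my exact code; the before_server_start listener is one way to give each Sanic worker process its own actor, and NUMBER_OF_WORKERS mirrors the "workers" value in sanic_config):

NUMBER_OF_WORKERS = 3  # matches "workers" in sanic_config

@ray.remote(num_gpus=1.0 / NUMBER_OF_WORKERS)
class Test:
    ...  # same body as the Test actor above

@app.listener('before_server_start')
async def create_actor(app, loop):
    # Runs once per Sanic worker process, so each worker gets its own actor.
    app.my_test = Test.remote()

# The route handler then uses request.app.my_test instead of the module-level my_test.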