ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
34.02k stars 5.78k forks

Ray starts too many workers (and may crash) when using nested remote functions. #3644

Closed robertnishihara closed 5 years ago

robertnishihara commented 5 years ago

This is very similar to the earlier issue #231. One proposed solution was implemented by @stephanie-wang in https://github.com/ray-project/ray/pull/425.

Users sometimes encounter variants of the following bug and have no idea what is going wrong.

Running the following workload requires about 500 workers to be started (to execute all of the g tasks, which are blocked in the call to ray.get, before the f tasks start getting executed).

import ray
ray.init()

@ray.remote
def f():
    return 1

@ray.remote
def g():
    return sum(ray.get([f.remote() for _ in range(10)]))

ray.get([g.remote() for _ in range(500)])

Workarounds:

  • Start fewer g tasks
  • Divide g into two parts, e.g.,

    @ray.remote
    def g_part_a():
        return [f.remote() for _ in range(10)]

    @ray.remote
    def g_part_b(*results):
        return sum(results)

    intermediate = ray.get([g_part_a.remote() for _ in range(500)])
    ray.get([g_part_b.remote(*ids) for ids in intermediate])

  • Use custom resources to constrain the number of g tasks running concurrently (suggested by @ericl).

Potential Solutions:

  • Make the scheduler prioritize the f tasks over the g tasks (e.g., the strategy in #425 or some sort of LIFO policy).

In the meantime, we can easily detect that we've started way too many workers and push a warning to the user with a link to some possible workaround.

cc @stephanie-wang @ericl

PMende commented 5 years ago

I'm looking into using Ray to simplify parallelizing the training/prediction of a multilevel model in Python. I've encountered this warning in what seems like a fairly simple example, shown below:

import ray

ray.init(num_cpus=6)

@ray.remote
class Foo:
    def __init__(self, x):
        self.x = x

    def calc(self, y):
        return self.x + y

    def adjust_x(self, new_x):
        self.x = new_x

class MultiFoo:
    def __init__(self, num):
        self.foos = [Foo.remote(i) for i in range(num)]

    def calc(self, y):
        return ray.get([foo.calc.remote(y) for foo in self.foos])

    def adjust_foos(self, new_xs):
        results = [
          foo.adjust_x.remote(new_x)
          for foo, new_x in zip(self.foos, new_xs)
        ]
        ray.get(results)

Now, if I instantiate some instances of MultiFoo, I can get the warning mentioned above:

In [4]: multifoos = MultiFoo(6)

In [5]: multifoos = MultiFoo(5)

In [6]: multifoos = MultiFoo(8)

2019-02-18 16:49:57,841 ERROR worker.py:1632 -- WARNING: 18 workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

However, I can sometimes instantiate many (e.g., 10) such MultiFoo(18) objects in quick succession, binding them all to the same variable, without any such warning popping up.

As this is the issue linked in the warning, I'm posting my question here. Am I doing something incorrectly? Is this something I can safely ignore?

robertnishihara commented 5 years ago

@PMende, the reason you might not be seeing the error when you create many MultiFoo objects and bind them to the same variable is that the previous MultiFoo objects are going out of scope and destructing the Foo actors that they created (the destructed actor processes are killed and so there are never too many workers started at one time).

When all of the MultiFoo objects are still in scope, then all of the actors must be kept alive simultaneously. Each actor is a separate process, and there are limits to how many processes you can start simultaneously. For example, if you check `ulimit -n`, that is one upper bound. For this reason, you can't start an arbitrarily large number of actors on a single machine. So if your script is running fine, then yes, you can ignore this warning, but if things crash, it will probably be because too many processes were started simultaneously.
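
A minimal sketch of that lifetime behavior, reusing the Foo/MultiFoo definitions from the comment above (the counts are only illustrative):

import ray

ray.init(num_cpus=6)

multifoos = MultiFoo(6)  # starts 6 Foo actor processes
multifoos = MultiFoo(6)  # the previous MultiFoo is garbage collected, its Foo
                         # handles go out of scope, and those actor processes
                         # are terminated, so the worker count stays bounded

keep = [MultiFoo(6) for _ in range(3)]  # all 18 actors must stay alive at
                                        # once, which is what triggers the
                                        # "18 workers have been started" warning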

virtualluke commented 5 years ago

Is the real downside to launching too many workers the ulimit -n limit? In the example at the beginning of this issue, what is the impact of those 500 calls that aren't doing anything except blocking?

And maybe I have been using terminology incorrectly: when you start Ray on a node with some number of CPUs (specified in ray start for that node or in init()), you get that many workers on that node, and the remote calls to f and then g would be tasks assigned to workers across the Ray cluster (i.e., amongst (number of Ray nodes) * (number of CPUs per node) workers)? Each worker accepts tasks until the local scheduler throttles assignments on the node (is that right?). Even with a small number of workers, wouldn't 500 such blocking tasks have little impact on the load that the local scheduler uses to potentially throttle scheduling? Please correct me here - I want to really get this straight.

I am making use of custom resources in some similar cases, but it gets somewhat cumbersome to manage the custom resource constraints if I want different constraints for 3-4 different remote function calls, for example (all of the constraints are somewhat artificial throttles I am applying based on some non-CPU-bound problem, to hard-code some sparse scheduling across the cluster). It is similar to the above in that, instead of blocking explicitly as in the example above, I may be effectively blocking because of some large asynchronous or IO-bound call rather than a CPU-bound task.

minimaxir commented 5 years ago

I am hitting this error just by overloading a single CPU thread with asyncio (while hacking on the example), without nesting remote calls.

import asyncio
import time
import ray
from ray.experimental import async_api

@ray.remote(num_cpus=0.25)
def f():
    print('async')
    time.sleep(3)
    print('async done')
    return 1

ray.init(object_store_memory=100 * 1000000,
         redis_max_memory=100 * 1000000, num_cpus=1)
loop = asyncio.get_event_loop()
tasks = [async_api.as_future(f.remote())
         for i in range(4)]
results = loop.run_until_complete(
    asyncio.gather(*tasks))
print(results)

pengzhenghao commented 5 years ago

To the best of my knowledge, there is no argument that can be passed to ray.remote() to control the total number of workers, like what we do with tune.run(..., config={'num_worker': ...}).

Am I right?

PovelikinRostislav commented 4 years ago

Hi all!

I've updated Ray to the latest version (0.7.7), which includes MR #5851 that was meant to fix this issue, but the problem is still there:

2019-12-17 00:04:00,398 WARNING worker.py:1054 -- WARNING: 28 PYTHON workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

My case is a chain of calls, not a nested task submission.

Think of constructing a repeating Map-Reduce pattern and calling get() only on the last reduce. Is that legitimate usage? :) Or should I prefer calling get() on each reduce, or do something else?
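
For context, a minimal sketch of the kind of repeated map-reduce chain being described (the task bodies and the number of rounds are made up for illustration):

import ray

ray.init()

@ray.remote
def map_task(x):
    return x * 2

@ray.remote
def reduce_task(*xs):
    return sum(xs)

data = list(range(8))
for _ in range(3):  # several chained map-reduce rounds
    mapped = [map_task.remote(x) for x in data]
    # the reduce receives the map ObjectRefs directly; nothing is fetched
    # with ray.get until the very end of the chain
    reduced = reduce_task.remote(*mapped)
    data = [reduced]

print(ray.get(data[0]))  # get() only on the last reduce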

robertnishihara commented 4 years ago

@PovelikinRostislav, can you share a minimal example that reproduces the issue?

Also, does the application succeed despite printing the warning? Or does it crash?

PovelikinRostislav commented 4 years ago

@robertnishihara, I'm in the process of producing a minimal reproducer. And I'm sorry for the confusion about nested tasks vs. a chain of calls: as I understood from your documentation, a call chain literally is a sequence of nested tasks.

There are only warnings, but the number of created workers is significant (5-6 workers instead of 1), which matters because I intentionally create exactly one worker per physical node to avoid over-subscription when using threaded C++ libraries.

PovelikinRostislav commented 4 years ago

By the way, can I set some hard limit to prohibit creation of such workers?

PovelikinRostislav commented 4 years ago

I'm still experiencing this issue with workers, even with new versions of Ray, but now without warning messages.

But I have worked around this problem by changing the semantics of my functions: instead of calling ray.get(objs) inside a remote function to pull in its dependencies, I pass all of the required results in as *args, and that works for now.
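
A minimal sketch of that restructuring, with made-up task names; the point is that the dependencies arrive as already-resolved arguments instead of being fetched with ray.get inside the task:

import ray

ray.init()

@ray.remote
def produce(i):
    return i * i

# Before: blocking in ray.get inside the task ties up a worker while it waits.
@ray.remote
def consume_with_get(refs):
    return sum(ray.get(refs))

# After: pass the ObjectRefs as *args; Ray resolves them to values before the
# task starts, so the task body never blocks in ray.get.
@ray.remote
def consume(*values):
    return sum(values)

refs = [produce.remote(i) for i in range(10)]
print(ray.get(consume.remote(*refs)))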

nmayhall-vt commented 4 years ago

I seem to be hitting this bug, but I don't quite understand the workarounds. My case seems like a simple use case for Ray: I need to do many distinct, CPU-heavy computations that read from the same data objects. So I put the large objects in the object store via ray.put(). Then my remote function reads this data and does one of the computations. The actual code is quite lengthy, but the main flow looks like:

data1 = DataObject()
data2 = DataObject()
data1_id = ray.put(data1)
data2_id = ray.put(data2)

@ray.remote
def process_data(inp):
    data1_id = inp[0]
    data2_id = inp[1]
    subset_info = inp[2]

    data1 = ray.get(data1_id)
    data2 = ray.get(data2_id)

    #compute on data1 and data2 to get result number
    return result

# fill list of work to do in list called jobs
result_ids = [process_data.remote(i) for i in jobs]
out = ray.get(result_ids)
for result in out:
    print(result)

When I do this I get the error WARNING worker.py:1072 -- WARNING: 36 PYTHON workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

However, if I don't pass data1_id and data2_id in as arguments to the function process_data, and instead just define them above the function so they are in scope, then the execution works as expected, I don't get the warning message, and the number of workers never exceeds the number of cores on my computer. Is this the expected behavior?

robertnishihara commented 4 years ago

@nmayhall-vt there is something a little subtle happening here.

If you are doing something like

x_id = ray.put(x)
y_id = ray.put(y)

@ray.remote
def process(data):
    x = ray.get(data[0])
    y = ray.get(data[1])

    # Use x and y

process.remote([x_id, y_id])

Then you should be able to avoid the issue by doing

x_id = ray.put(x)
y_id = ray.put(y)

@ray.remote
def process(x, y):
    # Use x and y

process.remote(x_id, y_id)

The reason is that when you call ray.get inside of a remote function, Ray will treat the task as "not using any resources" until ray.get returns, and so will potentially schedule additional tasks, which may require additional workers to be created.

Does that make sense?

Also, is it giving the warning and then crashing? Or is it giving the warning and then succeeding?

nmayhall-vt commented 4 years ago

@robertnishihara Thanks so much! I think that after giving the warning it would have succeeded, but I can't say for sure because the issue only showed up on a problem large enough to be too big for my computer to handle.

alsuhr-c commented 4 years ago

Is this warning actually a problem if I intend to create a bunch of workers? I'm using ray to parallelize writing something to disk frequently, and don't care about the return value. The calls seem to be succeeding, but the warning message keeps appearing so I'm worried that there is something I'm missing (e.g., maybe some of the processes aren't actually succeeding and I'm just missing them).

Thanks!

robertnishihara commented 4 years ago

@alsuhr-c if it's running successfully, then there's probably no issue, but if you are creating many more worker processes than the number of CPU cores you have, it's possible you'll run into issues like running out of memory, file descriptors, or some other resource.
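
One way to keep the number of in-flight fire-and-forget tasks (and therefore worker processes) bounded is to throttle submission with ray.wait; a rough sketch, with an arbitrary in-flight limit of 8 and a made-up write_to_disk task:

import ray

ray.init()

@ray.remote
def write_to_disk(i):
    # stand-in for the real disk write
    with open(f"/tmp/out_{i}.txt", "w") as f:
        f.write(str(i))

in_flight = []
for i in range(1000):
    if len(in_flight) >= 8:
        # block until at least one task finishes before submitting more,
        # so only a handful of workers are ever needed at once
        _, in_flight = ray.wait(in_flight, num_returns=1)
    in_flight.append(write_to_disk.remote(i))

ray.get(in_flight)  # drain the remaining tasks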

RafalSkolasinski commented 4 years ago

It is still unclear to me how to control the number of nested tasks. If I have simple code like

import time
import ray

@ray.remote
def a(x):
    return x + ["a"]

@ray.remote
def b(x):
    time.sleep(.1)
    return x + ["b"]

@ray.remote
def c(x):
    return x + ["c"]

and would like to compute a -> b -> c for some list of inputs, I'd assume that this is the right approach:

@ray.remote
def compute(x):
    x = a.remote(x)
    x = b.remote(x)
    x = c.remote(x)
    return ray.get(x)

ray.init()

futures = []
for n in range(500):
    future = compute.remote([n])
    futures.append(future)

results = ray.get(futures)  
ray.shutdown()

but this leads to

2020-08-25 11:21:17,005 WARNING worker.py:1134 -- WARNING: 29 PYTHON workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

that brought me here.

A straightforward solution seems to be removing the call to ray.get from the remote function:

def compute(x):
    x = a.remote(x)
    x = b.remote(x)
    return c.remote(x)

ray.init()

futures = []
for n in range(500):
    future = compute([n])
    futures.append(future)

results = ray.get(futures)    
ray.shutdown()

but that may not be ideal for every use case - sometimes you need the output of remote tasks inside a remote function.

If using custom resources is the solution, as discussed above (it seems to impact the code less than changing the data flow), maybe it would be good to add a short snippet here showing how that could solve the issue?
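
A hedged sketch of the custom-resources workaround applied to the example above; the resource name "compute_slot" and the pool size of 4 are made up, and the node has to be started with that resource (here via ray.init):

import time
import ray

# Reserve a small pool of an artificial "compute_slot" resource on this node.
ray.init(resources={"compute_slot": 4})

@ray.remote
def a(x):
    return x + ["a"]

@ray.remote
def b(x):
    time.sleep(.1)
    return x + ["b"]

@ray.remote
def c(x):
    return x + ["c"]

# Each outer task occupies one compute_slot, so at most 4 of them are
# scheduled at a time; the inner a/b/c tasks only need the usual CPU resources.
@ray.remote(resources={"compute_slot": 1})
def compute(x):
    x = a.remote(x)
    x = b.remote(x)
    x = c.remote(x)
    return ray.get(x)

futures = [compute.remote([n]) for n in range(500)]
results = ray.get(futures)
ray.shutdown()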

Lewisracing commented 3 years ago

Hi, has this issue been improved in the latest Ray versions (e.g. 1.1.0)?

Jhonathan-Pedroso commented 3 years ago

It may be naive, but it can help other users: I was able to get rid of the warning in my case with the following logic (Trainer is my actor), which I think could easily be reduced to a decorator:

[attachment: ray_job_handle]

nirn1973 commented 2 years ago

I am getting the same error with Modin[ray]. I do not call a remote function using Ray explicitly, but I do initialize and configure Ray inside the main/top module, because the program needs extra out-of-core disk space. Any advice on how to solve this?

ray.init(
    _plasma_directory=plasma_directory,
    include_dashboard=False,
    object_store_memory=20e9,
    _system_config={
        "automatic_object_spilling_enabled": True,
        "max_io_workers": 4,  # More IO workers for remote storage.
        "min_spilling_size": 50 * 1024 * 1024,  # Spill at least 100MB at a time.
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": object_spilling_path}},
        ),
        "kill_idle_workers_interval_ms": 5000,
        "enable_worker_prestart": False,
    },
)  # object_store_memory=(2*(2**30))