ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

How to throttle process to avoid "UnreconstructableError" #6892

Open monney opened 4 years ago

monney commented 4 years ago

Python: 3.7.4, Ray: 0.8.0, OS: Ubuntu

import ray
import numpy as np
import time

ray.init()

@ray.remote
class Model:
    def classify(self, x):
        time.sleep(0.005)  # simulate inference latency

@ray.remote
class Pool:
    def __init__(self, k=8):
        self.workers = [Model.remote() for _ in range(k)]
        self.i = 0

    def classify(self, x):
        # Round-robin dispatch to the model workers.
        self.workers[self.i].classify.remote(x)
        self.i = (self.i + 1) % len(self.workers)

pool = Pool.remote()

x = np.random.randn(20000)
for i in range(200000):
    pool.classify.remote(x)

I'm building a Ray inference pool similar to the mock code above. Basically, tasks are scheduled faster than they can be processed, the tensors build up in the object store, and we get an UnreconstructableError when LRU garbage collection evicts them.

Is there a way I can detect that this error is about to happen and stop scheduling jobs, so that the code does not error out?

When it errors out, it recovers very slowly and can't process new work quickly, so I'd like to stop scheduling new jobs once a threshold is reached, based either on free memory or on the number of jobs left to process.

Neither ray.available_resources() nor ray.tasks() works for this. I can throttle manually by putting a time.sleep() in the for loop, but I'd like to throttle adaptively based on how resources look.

Thank you

stephanie-wang commented 4 years ago

You can try using ray.wait (docs) to see how many tasks have completed so far. Something like this might work:

x = ray.put(np.random.randn(20000))  # Use ray.put to avoid repeating serialization.
queue = []
for i in range(200000):
    if len(queue) == CAPACITY:
        _, queue = ray.wait(queue, num_returns=1)  # Wait for at least 1 object to finish.
    queue.append(pool.classify.remote(x))
# Wait for all remaining tasks to finish.
ray.wait(queue, num_returns=len(queue))

You can also try using a ParallelIterator (docs), which will automatically submit tasks for you.