ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.86k stars, 5.76k forks

Reducing unnecessary process overhead in practice #8522

Open beasteers opened 4 years ago

beasteers commented 4 years ago

Describe your feature request

Part of the issue with overusing ray.remote is that it adds the overhead of serialization and inter-process communication. But one of the really nice things about Ray is that it uses futures that wait for and automatically resolve object IDs passed as arguments.

I don't know exactly how it would work, but is there a way to run a lightweight function that doesn't require a separate process but still returns a future? Maybe it runs in a separate thread, or it gets attached to the previous/next task so that serialization is skipped? Not really sure.

What I'm trying to do is build an execution graph without waiting for intermediate results to return every time I use a function that doesn't need to be executed remotely.

Some random example:

import time

import ray

@ray.remote
def func(a):
    time.sleep(2) # some long computation
    return a * 2

# a function that doesn't need parallel processes
# local=True is the proposed option (not an existing ray.remote argument):
# minimal overhead, marginally slower than a normal python call
@ray.remote(local=True)
def func2(a, b):
    time.sleep(0.001) # some quick computation
    return a + b

# build the graph without waiting for intermediate results
id1 = func2.remote(func.remote(5), func.remote(6))
id2 = func2.remote(id1, func.remote(7))
id3 = func.remote(id2)
x = ray.get(id3)

Note: I realize this could be refactored to use one remote function, but I need to maintain the composability of functions.

rkooo567 commented 4 years ago

In that case, wouldn't asyncio satisfy your requirements?

beasteers commented 4 years ago

That's not a bad thought (I'm familiar with the concept of asyncio, but less so with the implementation). Would asyncio work seamlessly in a Ray pipeline, though? The goal is to have the heavier parts of the program use Ray for multiprocessing and fast serialization (Arrow), and to have lighter-weight functions mixed in that don't force the preceding chain to execute when they're called.

I'm not sure if that's possible, but if it is straightforward, would you mind showing how that might work on the toy example above?

rkooo567 commented 4 years ago

I see. I understand your feature request now. I believe it is possible to achieve this using asyncio, but it wouldn't be too straightforward. (The idea is to make func2 an async function and run the whole code block you wrote as a future. It'll be easier if you use something like an async web server such as aiohttp.)
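A minimal sketch of the asyncio idea described above, applied to the toy example. It uses only the standard library: `heavy`, run in a thread via `run_in_executor`, is a stand-in for the Ray task (an assumption made so the sketch runs without a cluster), and the `resolve` helper is hypothetical. In a real Ray program the heavy step would presumably await the object returned by `func.remote(...)` instead.

```python
import asyncio
import time


def heavy(a):
    # Stand-in for the body of the Ray task `func` (assumption: a thread, not a Ray worker).
    time.sleep(0.05)
    return a * 2


async def resolve(x):
    # Hypothetical helper: wait for x if it is a coroutine or future, else pass it through.
    if asyncio.isfuture(x) or asyncio.iscoroutine(x):
        return await x
    return x


async def func(a):
    # Heavy step: resolve the input, then run the blocking work off the event loop.
    a = await resolve(a)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, heavy, a)


async def func2(a, b):
    # Lightweight step: runs on the event loop, with no extra process or serialization.
    a, b = await asyncio.gather(resolve(a), resolve(b))
    return a + b


async def main():
    # These calls only wire up the graph; nothing waits until the final await.
    id1 = asyncio.ensure_future(func2(func(5), func(6)))
    id2 = asyncio.ensure_future(func2(id1, func(7)))
    id3 = func(id2)
    return await id3


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off in this sketch is that every function in the chain has to be a coroutine, which is part of why the approach is not a drop-in change to the original example.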

I understand that you want to build a whole dependency graph and have Ray execute it, while some lightweight functions run without much overhead. I see the value, but I believe this is not easy to implement in our current architecture (it might take some time, and we are not sure we have enough bandwidth to work on it).

@edoakes Have you seen this sort of feature request before?

beasteers commented 4 years ago

As an update: I managed to put together this bare-bones but functional drop-in replacement for ray.remote (for actors). Obviously there are a few things to be improved on.

But I wonder if something like this could be useful in the Ray core, for simple tasks that would only add bloat if they ran as their own processes?

import asyncio
import concurrent.futures
import inspect

class LocalExecutor:
    '''Mimic ray's interface, but with threads.'''
    n_workers = 1
    def __init__(self, *a, **kw):
        self.__pool = concurrent.futures.ThreadPoolExecutor(self.n_workers)
        super().__init__(*a, **kw)

    def __str__(self):
        return '<LocalActor({})>'.format(super().__str__())

    def __getattribute__(self, name):
        value = super().__getattribute__(name)
        if not name.startswith('_LocalExecutor') and inspect.ismethod(value):
            # XXX: how to cache this or do this at the beginning ?
            return _PoolMethod(value, self.__pool)
        return value

    @classmethod
    def remote(cls, *a, **kw):
        return cls(*a, **kw)

class _PoolMethod:
    '''Wraps a method with a `.remote()` method.'''
    n_workers = 1
    _global_pool = concurrent.futures.ThreadPoolExecutor(n_workers)

    def __init__(self, func, pool=None, use_global=True):
        self._func = func
        self._pool = pool if pool is not None else (
            self._global_pool if use_global else
            concurrent.futures.ThreadPoolExecutor(self.n_workers)
        )

    def __call__(self, *a, **kw):
        return self._func(*a, **kw)

    def remote(self, *a, **kw):
        return asyncio.wrap_future(self._pool.submit(self._func, *a, **kw))

    def __getattr__(self, name):
        return getattr(self._func, name)

def ray_local(cls, **kw):
    return (
        type(cls.__name__, (LocalExecutor, cls), kw)
        if inspect.isclass(cls) else _PoolMethod(cls))

richardliaw commented 4 years ago

Hmm, yeah, this is cool! It works with actors? Would it be hard to get it working with tasks?

beasteers commented 4 years ago

Yes! It works with actors like this:

import asyncio

import ray

# ray_local is the helper defined in my previous comment
class SomeObject:
    def __init__(self, asdf=5):
        self.asdf = asdf

    def get_asdf(self):
        return self.asdf

def some_task():
    return 5

# define a ray remote worker and a local thread worker class
RemoteObject = ray.remote(SomeObject)
LocalObject = ray_local(SomeObject)
local_task = ray_local(some_task)

async def main():
    # instantiate
    remote_actor = RemoteObject.remote()
    local_actor = LocalObject.remote()

    assert await remote_actor.get_asdf.remote() == 5
    assert await local_actor.get_asdf.remote() == 5
    assert await local_task.remote() == 5

asyncio.run(main())

The only thing with tasks is that I don't know how to manage the thread pool instance.

How does it work with process pools currently? The thread pool should probably just mirror that. The Ray source code is a bit hard to navigate, so I'm not sure how it's implemented.

Is there an existing global pool instance to access? Or does each task have its own pool? Or do I create a new global thread pool instance for tasks? From my understanding, only actors have a dedicated process and remote functions can get passed around, which is why I'm thinking of a global pool, but I may be wrong.
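One possible arrangement for the pool question, sketched with the standard library only: a single lazily created pool shared by all local tasks, plus one single-thread pool per local actor so that calls on an actor run in submission order. This is an assumption about how the thread version could be organized, not a description of how Ray manages its worker processes; `get_task_pool`, `LocalActorBase`, and `Counter` are hypothetical names.

```python
import concurrent.futures
import threading

_task_pool = None
_task_pool_lock = threading.Lock()


def get_task_pool(max_workers=1):
    """Return the shared pool for local tasks, creating it on first use.

    max_workers only takes effect on the call that creates the pool.
    """
    global _task_pool
    with _task_pool_lock:
        if _task_pool is None:
            _task_pool = concurrent.futures.ThreadPoolExecutor(max_workers)
        return _task_pool


class LocalActorBase:
    """Each instance owns a single-thread pool, so its method calls run in order."""

    def __init__(self):
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def submit(self, method_name, *args, **kwargs):
        # Look up the bound method by name and schedule it on this actor's thread.
        return self._pool.submit(getattr(self, method_name), *args, **kwargs)


class Counter(LocalActorBase):
    """Usage example: stateful actor whose calls must not interleave."""

    def __init__(self):
        super().__init__()
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n
```

With this layout, stateless tasks go through `get_task_pool().submit(...)`, while something like `Counter().submit("incr")` stays on the actor's own thread.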

Edit: I just updated the code in the previous comment and this one to handle tasks, but we'll have to sort out how to handle the pool instance.

One thing that I just found out is that the result of a local .remote() call (e.g. local_actor.get_asdf.remote()) doesn't work with ray.get, because I'm using asyncio.wrap_future and not something from ray.*. I had been using async/await, so I hadn't noticed. That would need to be addressed, preferably without serializing the object and adding it to the object store, because that defeats the purpose of using threads.
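A rough sketch of one way around this, assuming a small wrapper is acceptable: keep the underlying `concurrent.futures.Future` and expose both a blocking accessor and `__await__`, so the same handle works with and without an event loop. `LocalRef`, `local_remote`, and `get` are hypothetical names; `get` here only stands in for `ray.get` and does not integrate with Ray or its object store.

```python
import asyncio
import concurrent.futures

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)


class LocalRef:
    """Hypothetical handle for a thread-pool result; not a Ray ObjectRef."""

    def __init__(self, future):
        self._future = future

    def result(self, timeout=None):
        # Blocking access from synchronous code; the value is never serialized.
        return self._future.result(timeout)

    def __await__(self):
        # Awaitable access; must be used from inside a running event loop.
        return asyncio.wrap_future(self._future).__await__()


def local_remote(func, *args, **kwargs):
    # Submit func to the thread pool and hand back a dual-purpose handle.
    return LocalRef(_pool.submit(func, *args, **kwargs))


def get(ref, timeout=None):
    """Hypothetical stand-in for ray.get that only understands LocalRef."""
    return ref.result(timeout)


async def main():
    # The same kind of handle can be awaited instead of passed to get().
    return await local_remote(lambda a, b: a + b, 2, 3)
```

Making the real `ray.get` accept such a handle would still need a change inside Ray; this sketch only shows that blocking and awaiting can share one object.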