PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Support Ray as an Executor Backend #3963

Closed: jcrist closed this issue 2 years ago

jcrist commented 3 years ago

Prefect currently relies on Dask for distributed execution. Another tool users may want to use in this space is Ray. Ray makes a different set of tradeoffs than dask, but could equally be used for running prefect. This would require the creation of a new RayExecutor executor class to mirror the existing DaskExecutor.

Note that a RayExecutor is unlikely to perform better than the existing DaskExecutor for Prefect workflows. Prefect tasks add a certain amount of execution overhead; in practice Prefect is the bottleneck in execution, not Dask. "Performance improvements" are not a motivating use case for adding a RayExecutor class, since any gains are likely to be negligible.

A motivating use case here is making Prefect easier to deploy for users already using Ray. In this case, deploying on an existing Ray cluster may be much simpler for them than spinning up a separate Dask cluster to run on.

A few (mild) drawbacks to adding Ray support here:

jcrist commented 3 years ago

Given the above, I'm unlikely to work on this without hearing from several Prefect users that having a Ray backend would be useful for them. It's something we're interested in, but unlikely to prioritize as a dev team in the near future without input from the Prefect community.

Hoeze commented 3 years ago

Coming from Dask-distributed, I just started using Ray, but I'm sure that Ray could have some very nice side effects for Prefect:

My vision would be having a single large Ray cluster running many Prefect flows at the same time. Like snakemake + slurm, but with a dynamic task graph that allows tasks to be added at any time.

iovsn commented 3 years ago

As naive users of both dask and ray, with no deep understanding of either, our experience is the following... We started with dask but moved to ray. With ray we could take an existing piece of code and scale its execution without much effort or modification, whereas with dask we always had some issues. One example was the default profiling settings, which caused workers to crash because memory usage kept growing over time (this stackoverflow issue saved us). It's hard for me to understand why a default setting would cause this. It took us quite some time to find the cause, as we thought our code was the problem. With ray, on the other hand, it just worked fine with minimal memory usage. We had a few similar experiences comparing ray and dask. Since we started using Prefect, we again often get stuck trying to understand dask. As inexperienced users, from this simple and naive perspective, we'd much rather just have a RayExecutor.

Hoeze commented 3 years ago

@iovsn you could try to use the Ray Dask-executor: https://docs.ray.io/en/master/dask-on-ray.html

For my normal Dask-array/Dask-dataframe/Dask-delayed projects, this magically reduced my memory usage. See also https://github.com/PrefectHQ/prefect/discussions/3967#discussioncomment-568078

cicdw commented 3 years ago

@Hoeze were you able to run on Ray with the current Prefect executors? If so, would you mind sharing some code for configuring this? I think it would benefit others quite a bit.

Hoeze commented 3 years ago

Ah, that's a misunderstanding @cicdw, I never tried it myself :sweat_smile:

As written in #3967, I'm directly using ray now as this gives me more freedom in resource management. I would also like to know how Prefect performs with dask-on-ray.

iovsn commented 3 years ago

@Hoeze, thanks for the suggestion.

I tried following the most basic example described in https://docs.ray.io/en/master/dask-on-ray.html, but I can't even do the necessary import:

import ray
from ray.util.dask import ray_dask_get

I get an exception:

<ipython-input-1-e0907ba5550b> in <module>
      1 import ray
----> 2 from ray.util.dask import ray_dask_get

~/env/main/lib/python3.7/site-packages/ray/util/dask/__init__.py in <module>
----> 1 from .scheduler import ray_dask_get, ray_dask_get_sync
      2 from .callbacks import (
      3     RayDaskCallback,
      4     local_ray_callbacks,
      5     unpack_ray_callbacks,

~/env/main/lib/python3.7/site-packages/ray/util/dask/scheduler.py in <module>
      7 
      8 from dask.core import istask, ishashable, _execute_task
----> 9 from dask.local import get_async, apply_sync
     10 from dask.system import CPU_COUNT
     11 from dask.threaded import pack_exception, _thread_get_id

ImportError: cannot import name 'apply_sync' from 'dask.local' (/home/iovsn/env/main/lib/python3.7/site-packages/dask/local.py)

Hoeze commented 3 years ago

@iovsn That seems like a simple version mismatch. How did you install dask and ray? I installed mine via:

conda install -c conda-forge dask
pip install ray

iovsn commented 3 years ago

@Hoeze, I installed both packages with pip. I tried the way you did and the import now works. Do you know why the packages need to be installed in this mixed conda+pip way?

I didn't find the time to proceed from here though, I'll post an update when I do.
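
A quick way to confirm a version mismatch like the one suspected above is to print what is actually installed in the active environment. The sketch below uses only the standard library (`importlib.metadata`, available from Python 3.8); the helper name `installed_versions` is made up for illustration and is not part of Ray, Dask, or Prefect.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The distribution is not installed in this environment.
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(["ray", "dask", "prefect"]).items():
        print(f"{name}: {ver or 'not installed'}")
```

Comparing the printed Ray and Dask versions against the combinations known to work is usually faster than reading the traceback.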

Hoeze commented 3 years ago

This is getting a bit off-topic now; nevertheless: @iovsn No, I don't know, but conda is much better at dependency resolution than pip. So, as a rule of thumb, always try to use conda where possible.

If you are concerned about the slow dependency resolution of conda, you can also try mamba. Mamba is like 10-100x faster than conda. It does not yet support all conda features but you can always first try mamba and fall back to conda if your command does not work.

clarkzinzow commented 3 years ago

@iovsn That failure was caused by Dask-on-Ray relying on some internal Dask APIs that broke in Dask 2021.04.0 (the most recent release); this has been fixed in Ray master. There are currently two options in which Ray + Dask works:

  1. Use the latest Ray release (1.2) and a Dask version before 2021.04.0, such as 2021.03.1: pip install -U ray dask==2021.03.1
  2. Use latest Ray master and any recent Dask version, including 2021.04.0: pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl dask

For installing the correct nightly wheel for your version of Python and your OS, see the nightly installation instructions.

Also, we should be releasing Ray 1.3 in the next week or so, at which point you won't have to install latest Ray master and can instead just pip install -U ray, which will work with any recent Dask version.
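
The two working combinations above can be written down as a small check. This is only a sketch of the rule as stated in this comment, not something Ray or Dask provides: the function names are hypothetical, it assumes the fix on Ray master ships in 1.3 and later as announced, and it treats Ray releases older than 1.2 the same as 1.2, which is untested.

```python
from typing import Tuple


def numeric_prefix(version: str) -> Tuple[int, ...]:
    """Parse the leading numeric components, e.g. '2.0.0.dev0' -> (2, 0, 0)."""
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break  # stop at suffixes such as 'dev0'
        parts.append(int(piece))
    return tuple(parts)


def dask_on_ray_expected_to_import(ray_version: str, dask_version: str) -> bool:
    """Encode the two working Ray + Dask combinations listed above."""
    # Assumption: Ray 1.3+ (and current master) contains the fix.
    if numeric_prefix(ray_version)[:2] >= (1, 3):
        return True
    # Ray 1.2 (older releases assumed alike): needs Dask before 2021.04.0.
    return numeric_prefix(dask_version) < (2021, 4, 0)


# The combinations discussed in this thread:
assert dask_on_ray_expected_to_import("1.2.0", "2021.03.1")
assert not dask_on_ray_expected_to_import("1.2.0", "2021.04.0")
assert dask_on_ray_expected_to_import("2.0.0.dev0", "2021.04.0")
```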

iovsn commented 3 years ago

Thanks for all the help, I got it working! (with a hack though)

I followed suggestion 1. from @clarkzinzow, so importing of the modules worked fine.

I then tried to pass a custom scheduler to prefect.LocalDaskExecutor like:

import ray
from prefect.executors import LocalDaskExecutor
from ray.util.dask import ray_dask_get

ray.init()
# Fails: the scheduler argument has to be a string
executor = LocalDaskExecutor(scheduler=ray_dask_get)

This didn't work because the scheduler argument has to be a string, so I tried to bypass the LocalDaskExecutor._normalize_scheduler method:

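import ray
from prefect.executors import LocalDaskExecutor
from ray.util.dask import ray_dask_get

ray.init()
executor = LocalDaskExecutor(ray_dask_get)
# Overwrite the scheduler attribute with the Ray callable
executor.scheduler = ray_dask_get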

This didn't get anywhere; execution just waited to begin and never started. So I changed the LocalDaskExecutor.__init__ method to:

def __init__(self, scheduler: str = "threads", ray_scheduler=None, **kwargs: Any):
    self.scheduler = self._normalize_scheduler(scheduler)
    self.dask_config = kwargs
    self._pool = None  # type: Optional[multiprocessing.pool.Pool]
    self.ray_scheduler = ray_scheduler
    super().__init__()

And also changed the last few lines of LocalDaskExecutor.wait to:


scheduler = self.scheduler if self.ray_scheduler is None else self.ray_scheduler

with patch(), dask.config.set(config):
    return dask.compute(
        futures, scheduler=scheduler, pool=self._pool, optimize_graph=False
    )[0]

With this hack, our workflows run on ray like a charm. With a recent dask update we started getting a bunch of garbage collection warnings (like the ones described here) and workflows needed more initial time to start actually running anything (I didn't investigate where they get stuck). This was the reason why I took the time to try dask-on-ray again and the same workflows that were problematic on dask start executing immediately on ray, with no memory issues or complaints whatsoever. The old "it just works" feeling of ray we were missing.

With some guidance I can do a proper PR, as what I did was just a quick modification to check whether it would run at all. I didn't go into the logic of how the initial scheduler argument is passed to LocalDaskExecutor.__init__ and what happens to it on the way to LocalDaskExecutor.wait. Probably LocalDaskExecutor is not even the right place for this, and a new RayExecutor class should be created that automatically sets ray.util.dask.ray_dask_get as the dask scheduler? I am also not familiar with the prefect codebase, so someone more familiar would find a better approach than I did, and quickly too, since it took me only a few lines of code changes.
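
The override in the patched wait method comes down to one choice: dask.compute accepts either a scheduler name or a scheduler callable, so a Ray-backed run only needs the callable passed through. Below is a minimal sketch of that idea outside Prefect. The names `pick_scheduler` and `compute_on_ray` are hypothetical, this is not how Prefect implements any executor, and the Ray and Dask imports are deferred so the snippet loads even where they are not installed.

```python
from typing import Any, Callable, Optional, Tuple, Union

Scheduler = Union[str, Callable[..., Any]]


def pick_scheduler(
    default: str = "threads",
    override: Optional[Callable[..., Any]] = None,
) -> Scheduler:
    """Return the override callable if one is given, else the named scheduler."""
    return default if override is None else override


def compute_on_ray(*collections: Any) -> Tuple[Any, ...]:
    """Compute Dask collections on Ray (requires ray and dask to be installed)."""
    import dask
    import ray
    from ray.util.dask import ray_dask_get

    if not ray.is_initialized():
        ray.init()
    # Hand Ray's Dask scheduler to dask.compute as a callable.
    return dask.compute(*collections, scheduler=pick_scheduler(override=ray_dask_get))
```

Keeping the selection in one small function mirrors the one-line change in the hack above and leaves the string-based schedulers untouched when no override is supplied.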

nikhil-sthalekar commented 2 years ago

Just wanted to add a bit to the conversation as a Ray user who is utilizing Prefect for setting up and scheduling workflows. I'm using ShellTasks for everything.

The Ray CLI is straightforward to use so this works well for me, though it does feel a bit clunky. Right now, if I wanted to run Dask on Ray, I would probably keep this format of a workflow of python scripts that can be executed on a cluster.

All that being said, having a Ray executor would open up Prefect to better integrate with other data processing libraries like Modin (Pandas on Ray), RayDP (Spark on Ray), and Mars, as well as ML workflows using Ray Tune and RLlib.

zanieb commented 2 years ago

👍 we're investigating a Ray executor for Orion. We will not be implementing one for Prefect 1.0.

nikhil-sthalekar commented 2 years ago

Awesome! I just started looking into Orion and I'm super excited about it!

nikhil-sthalekar commented 2 years ago

@madkinsz I have a question about the Ray executor that you are investigating for Orion. It seems like the Ray team is building a lower-level Workflow suite that integrates directly with Ray. Any chance Orion would have a way to integrate tightly with this? The Ray team does not seem to have a scheduler in the works, so it would be awesome to have some way to use Prefect's scheduling capabilities, UI, etc., either directly with the native workflows or by letting Prefect tasks and flows take direct advantage of these Ray workflow steps and workflow definitions. From a cursory look, it seems like Orion's Tasks, Flows, and States mirror the Ray Workflow definitions of Steps, Workflows, and Workflow State.

zanieb commented 2 years ago

@nikhil-sthalekar the short answer is: probably not. Integrating tightly with another workflow system is generally going to cause more pain than it's worth. Their workflow manager is also still quite basic and in an alpha stage. The first iteration of a Ray executor from us would likely work the same as all of our other executors: just managing execution of a single task.

spdcoding commented 2 years ago

👍 we're investigating a Ray executor for Orion. We will not be implementing one for Prefect 1.0.

@madkinsz Is there a rough schedule (timeline) for supporting Ray as an executor? Thx

kvnkho commented 2 years ago

The Ray TaskRunner was added in Orion (Prefect 2.0) already. You can find the docs here

dynamicwebpaige commented 2 years ago

@madkinsz It's very exciting to see Ray TaskRunner support in Orion! Thank you for doing the heavy lifting to add it, and please let us know if you run into any snags. 😄

Would you, or anyone from the PrefectHQ engineering team, have any interest in speaking about adding Ray as an executor backend at the upcoming Ray Summit, or co-authoring a blogpost about the migration process?