PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Support for user-defined Executors #4213

Closed jacksund closed 1 year ago

jacksund commented 3 years ago

Current behavior

Executors packaged with Prefect are currently limited to LocalExecutor (for testing) and DaskExecutor. Wrapping a new Executor is especially difficult and bug-prone because FlowRunner assumes some functionality provided by Dask (e.g., Dask resolves futures implicitly without requiring an explicit wait).
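To make the implicit-resolution point concrete, here is a minimal sketch using only the standard library. ThreadPoolExecutor stands in for a custom executor, and the add helper is made up for illustration; nothing here is Prefect or Dask code. A concurrent.futures pool passes a Future argument through unchanged, so the caller has to resolve it first:

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    first = pool.submit(add, 1, 2)

    # Passing the Future itself does not work: the stdlib pool hands the
    # Future object to add() unchanged, so the addition raises TypeError.
    broken = pool.submit(add, first, 10)
    implicit_error = broken.exception()

    # Resolving the upstream Future explicitly gives the expected value.
    explicit = pool.submit(add, first.result(), 10)
    explicit_value = explicit.result()
```

Code written against Dask can skip the explicit first.result() step, which is the kind of assumption that a plain concurrent.futures-style executor would not satisfy.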

Proposed behavior

Build FlowRunner such that no advanced functionality is assumed of the Executor. As long as the class and its methods follow the interface of concurrent.futures.Executor and concurrent.futures.Future, the user should be able to use it in production with flow.run(executor=CustomExecutor()).
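For reference, the interface in question is small. The sketch below is a hypothetical executor (the class name ThreadPerTaskExecutor is invented here) that implements only what concurrent.futures.Executor requires, namely submit and shutdown. It is an illustration of the standard-library contract, not something Prefect currently accepts:

```python
import threading
from concurrent.futures import Executor, Future


class ThreadPerTaskExecutor(Executor):
    """Hypothetical minimal executor: one thread per submitted call."""

    def __init__(self):
        self._threads = []

    def submit(self, fn, /, *args, **kwargs):
        future = Future()

        def run():
            # Honor cancellation, then record either the result or the error.
            if not future.set_running_or_notify_cancel():
                return
            try:
                future.set_result(fn(*args, **kwargs))
            except BaseException as exc:
                future.set_exception(exc)

        thread = threading.Thread(target=run)
        self._threads.append(thread)
        thread.start()
        return future

    def shutdown(self, wait=True, *, cancel_futures=False):
        # cancel_futures is accepted for signature compatibility but unused.
        if wait:
            for thread in self._threads:
                thread.join()


# map() and the context-manager protocol are inherited from Executor.
with ThreadPerTaskExecutor() as executor:
    squares = list(executor.map(pow, [1, 2, 3], [2, 2, 2]))
    failed = executor.submit(int, "not a number")
    error = failed.exception()
```

Anything that only calls submit, map, result, and exception would work with a class like this, which is the level of coupling the proposal asks FlowRunner to stay within.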

Example

note: this is my specific use-case and why I need a custom Executor in Prefect

First, some context...

In production, Dask workers have outgoing and incoming signals. This means that on any machine where a worker is started, signals must be allowed to go in and out on the assigned port. The same goes for the scheduler -- signals go in and out. For managed HPC clusters, this can be tricky because admins follow different rules on port-opening for security. Typically, computing clusters allow any outgoing signals but restrict all incoming ones. This is standard for academic clusters, and (for the most part) only high-security government clusters require admin permission for outgoing signals.

So let's wrap an Executor that follows part of Prefect's "hybrid" approach! Here, our Executor workers follow a key principle of Prefect Agents -- they must only send requests out to the scheduler/server, and never receive requests themselves.

An Executor that relies entirely on outgoing signals is ideal for users who work with a variety of shared resources and clusters. There's no need to use something like dask-scheduler --port 8786 and dask-worker --worker-port 3000:3026 while ensuring that every port's firewall is properly configured for incoming requests, which often requires an admin's permission. With only outgoing signals, setting up a new worker can be as easy as pinging a website.

I made a bare-bones Executor that does this and follows the concurrent.futures class structure. While the executor works as intended across different machines and clusters, it is not compatible with assumptions made by Prefect's FlowRunner:

# note, "flow" here is any Prefect workflow

# this is my custom "hybrid" executor with workers running elsewhere
custom_executor = CustomExecutor()

# this works as intended, and flows complete successfully
# task.run and generic functions/methods work as well
future = custom_executor.submit(flow.run, *my_args, **my_kwargs)
state = future.result()  

# this fails due to functionality assumptions made in FlowRunner
state = flow.run(executor=custom_executor)
# Unexpected error occured in FlowRunner: AttributeError("'CustomFuture' object has no attribute 'is_mapped'")

zanieb commented 3 years ago

Thanks for the well written issue! See also https://github.com/PrefectHQ/prefect/issues/3963

jcrist commented 3 years ago

Thanks for opening this. Simplifying the executor interface and supporting user-defined executors is on our roadmap, but it's not that straightforward. Prefect currently relies fairly heavily on dask for handling asynchronous execution cues. I want to bring these into prefect itself to both simplify the executor interface and also allow us to tailor execution more to our needs (right now we hack around some differences between us and dask). I don't think the interface we settle on will look like concurrent.futures (the concurrent.futures interface doesn't make it as easy to avoid moving data from remote processes or optimize execution pipelines), but we do want to ensure that the interface is well defined and simple to implement.

In short, this is something we want to do, it's on our roadmap, but it may not happen quickly.

jcrist commented 3 years ago

With regards to your specific issue, I'm surprised that dask doesn't work for you. Lots of dask users deploy dask on HPC systems - in my experience worker <-> worker communications haven't had issues (I recognize that security restrictions can differ across clusters).

> In production, Dask workers have outgoing and incoming signals. This means that any machine where a worker is started, signals must be allowed to go in and out on the assigned port. The same goes for the scheduler -- signals go in and out.

This isn't 100% accurate. For clarification, a dask cluster's network diagram looks like:

client ---> scheduler <--- worker1
                 ^            ^
                 |            |
                 |            v
                 +-------- worker2

The scheduler never initiates connections, it only receives them. Workers only ever connect to the scheduler and other workers. Frequently users run both the client and scheduler together on an edge node, which means that network connections in the cluster are only ever job <-> job (which in my experience has been open within certain port ranges) or to the edge node (where the scheduler lives). No connections ever initiate from outside the cluster to a process inside the cluster. I recognize that this might not work with your setup though, just want to make sure you weren't doing extra work that wasn't needed.

jacksund commented 3 years ago

Thanks for the comments! I believe I'm running into issues because of signals like worker1 <-> worker2. I have dask workers spread out across multiple clusters -- i.e. one Dask cluster with Dask workers scattered across separate HPC clusters. If there's a simple way to configure/disable connection types, then I may be able to save a bunch of time here.

I ended up finding it much easier to write an Executor that has a postgres database as the "scheduler" while the client and workers simply read and write tasks to the database -- and thus only have outgoing connections.
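The executor itself is not shown in this thread, so the following is only a rough sketch of the idea under stated assumptions: SQLite (standard library) stands in for the Postgres database, and every name here (open_queue, DatabaseExecutor, DatabaseFuture, run_next_task) is hypothetical. The client inserts pickled tasks, a worker polls for pending rows and writes results back, and the future polls for its row to finish, so both sides only ever open outgoing connections to the database:

```python
import pickle
import sqlite3
import time


def open_queue(path=":memory:"):
    """Open the task table; SQLite stands in for the Postgres database."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        "id INTEGER PRIMARY KEY, payload BLOB, status TEXT, result BLOB)"
    )
    conn.commit()
    return conn


class DatabaseFuture:
    """Hypothetical future that polls the database for its outcome."""

    def __init__(self, conn, task_id):
        self.conn = conn
        self.task_id = task_id

    def done(self):
        row = self.conn.execute(
            "SELECT status FROM tasks WHERE id = ?", (self.task_id,)
        ).fetchone()
        return row[0] in ("finished", "failed")

    def result(self, timeout=None, poll_interval=0.1):
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self.done():
            if deadline is not None and time.monotonic() > deadline:
                raise TimeoutError(f"task {self.task_id} is still pending")
            time.sleep(poll_interval)
        status, blob = self.conn.execute(
            "SELECT status, result FROM tasks WHERE id = ?", (self.task_id,)
        ).fetchone()
        outcome = pickle.loads(blob)
        if status == "failed":
            raise outcome
        return outcome


class DatabaseExecutor:
    """Client side: submitting a task is just an INSERT."""

    def __init__(self, conn):
        self.conn = conn

    def submit(self, fn, *args, **kwargs):
        # pickle is only safe if everything writing to the table is trusted.
        payload = pickle.dumps((fn, args, kwargs))
        cursor = self.conn.execute(
            "INSERT INTO tasks (payload, status) VALUES (?, 'pending')", (payload,)
        )
        self.conn.commit()
        return DatabaseFuture(self.conn, cursor.lastrowid)


def run_next_task(conn):
    """Worker side: take one pending task, run it, write the outcome back.

    The select-then-update claim below is not safe with several workers;
    a real version would need an atomic claim in the database.
    """
    row = conn.execute(
        "SELECT id, payload FROM tasks WHERE status = 'pending' ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None:
        return False
    task_id, payload = row
    conn.execute("UPDATE tasks SET status = 'running' WHERE id = ?", (task_id,))
    conn.commit()
    fn, args, kwargs = pickle.loads(payload)
    try:
        status, outcome = "finished", fn(*args, **kwargs)
    except Exception as exc:
        status, outcome = "failed", exc
    conn.execute(
        "UPDATE tasks SET status = ?, result = ? WHERE id = ?",
        (status, pickle.dumps(outcome), task_id),
    )
    conn.commit()
    return True


# Single-process demo; in practice the worker loop runs on another machine.
conn = open_queue()
executor = DatabaseExecutor(conn)
future = executor.submit(pow, 2, 10)
pending_before = future.done()
ran = run_next_task(conn)
value = future.result(timeout=1)
```

The trade-off is latency: results arrive only as fast as the polling interval allows, in exchange for workers never needing an open incoming port.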

EDIT: the closest I can get to turning off worker1 <-> worker2 connections is via the total_out_connections and total_in_connections attributes of a Dask Worker or by setting Worker Configurations. I'm messing with this now and will update.

EDIT2: After more testing, it looks like I misdiagnosed my issue... I think I was able to stop worker1 <-> worker2 signals with the config shown below, but this ended up not fixing my problem. The real issue was actually with shared drives on the clusters I built. Because my --local-directory had super slow writes, I misread the hanging Dask worker startups (>5 min) as a failure to establish connections. Sorry everyone. This was just me being a newbie with Dask clusters.

# example config. this is inside ~/.config/dask/distributed.yaml
distributed:
    worker:
        connections:
            outgoing: 1
            incoming: 0

jacksund commented 3 years ago

@jcrist I believe I resolved my specific use-case. It ended up just being a mistake on my end (see edited comments above)... Sorry about wrapping you up in it. :(

The main discussion on simplifying the executor interface can still be left open. Or you can close this issue and move discussion to an alternative issue. Up to you!

jcrist commented 3 years ago

Oh, glad you figured things out. We usually recommend running workers with --local-directory not on a networked file system for this reason. And with regards to your comments about preventing worker <-> worker communication, there's no way in dask to prevent this - dask expects a fully connectable cluster. Glad that turned out not to be your problem.

I'm going to leave this open, as we still want to simplify this interface in the future.

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.