PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Round robin scheduling of work queues within the same work pool #9431

Open usefulalgorithm opened 1 year ago

usefulalgorithm commented 1 year ago

First check

Prefect Version

2.x

Describe the current behavior

If an agent is connected to multiple work queues, get_and_submit_flow_runs builds the list of flow runs to submit, submittable_runs, by collecting the flow runs from all of the work queues and sorting them by their next_scheduled_start_time.
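A minimal sketch of that ordering, assuming each flow run is represented only by its scheduled start time (an int here rather than a datetime) and each work queue by a list keyed by queue name. collect_sorted is a hypothetical helper for illustration, not Prefect's actual get_and_submit_flow_runs code.

```python
from itertools import chain
from typing import Dict, List


def collect_sorted(queues: Dict[str, List[int]]) -> List[int]:
    """Merge the runs of every work queue and order them by start time.

    Hypothetical helper: each run is just its scheduled start time, so the
    queue a run came from plays no part in the final order.
    """
    return sorted(chain.from_iterable(queues.values()))


queues = {
    "work queue 1": [0, 1, 2, 5],
    "work queue 2": [3, 7],
    "work queue 3": [4, 6],
}
print(collect_sorted(queues))  # → [0, 1, 2, 3, 4, 5, 6, 7]
```

Because only start times matter, a queue holding many early runs is drained before any other queue gets a turn.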

Describe the proposed behavior

If a round_robin flag is switched on when the agent is created, the agent would take one flow run from each work queue in turn when collecting them, so that the flow runs of different work queues are submitted in an interleaved, collated order.

For example, suppose the following work queues are all connected to one agent:

work queue 1: [(scheduled=0), (scheduled=1), (scheduled=2), (scheduled=5)]
work queue 2: [(scheduled=3), (scheduled=7)]
work queue 3: [(scheduled=4), (scheduled=6)]

scheduled is the order (or time) at which each flow run is scheduled.

With the current behavior, the agent would collect the flow runs into

submittable_runs = [0,1,2,3,4,5,6,7]

With the proposed round-robin mechanism, it would be

submittable_runs = [0,3,4,1,7,6,2,5]
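A minimal sketch of the proposed interleaving, under the same simplifying assumption that each flow run is just its scheduled value. round_robin is a hypothetical helper, and the round_robin agent flag it models is the proposal in this issue, not an existing Prefect option. Queues are visited in dict insertion order.

```python
from itertools import chain, zip_longest
from typing import Dict, List

_MISSING = object()  # sentinel marking "this queue has no more runs"


def round_robin(queues: Dict[str, List[int]]) -> List[int]:
    """Interleave runs across work queues, one run per queue per round.

    Hypothetical helper: each queue is first ordered by its own scheduled
    start times, and queues are visited in dict insertion order.
    """
    per_queue = [sorted(runs) for runs in queues.values()]
    rounds = zip_longest(*per_queue, fillvalue=_MISSING)
    return [run for run in chain.from_iterable(rounds) if run is not _MISSING]


queues = {
    "work queue 1": [0, 1, 2, 5],
    "work queue 2": [3, 7],
    "work queue 3": [4, 6],
}
print(round_robin(queues))  # → [0, 3, 4, 1, 7, 6, 2, 5]
```

zip_longest keeps producing rounds until the longest queue is exhausted, so the leftover runs of work queue 1 (2 and 5) land at the end, matching the example. Visiting queues in a fixed order like this ignores any per-queue priority, which is the trade-off raised later in the thread.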

Example Use

This mechanism would ensure that every user gets to run their flows, even if another user has already submitted a large batch of flow runs. It is especially useful when we want to separate the workloads of different users, when those workloads are computationally intensive (e.g. too many flows running at the same time could crash the server), and when each user should get their flow's results without waiting for all previously submitted flows to finish.

Additional context

No response

zanieb commented 1 year ago

Are you using concurrency limits on the agent? Otherwise I'd expect all of the runs to be submitted very quickly regardless of ordering.

billpalombi commented 1 year ago

Thanks for submitting this @usefulalgorithm!

Thank you for laying out your use case. It makes sense. We introduced work pools in part to facilitate these types of arrangements. We'll take up the use case, but we'll likely pursue a server-side implementation via a setting on the work pool, especially since this would involve ignoring the priority of each queue.

usefulalgorithm commented 1 year ago

Hi, thanks a bunch for the swift responses!

@madkinsz Yes, in our design we limit the concurrency of each agent. For a bit more context, our configuration runs tasks as k8s jobs, and those jobs can easily take up all of the resources on the server, rendering other pods in the k8s cluster unresponsive.

One idea is that instead of setting limits on the agents, we could set the request quota in our job template and not limit the agents at all. I'm guessing that when Prefect tries to start a pod for a job, the pod will stay in Pending (or some other state that's not Running) until enough resources are available, so we would still get our jobs queued without agent concurrency limits. Not sure if Prefect has some timeout limit for pods having to be started after the flow is scheduled.
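To make the request-quota idea concrete: the Kubernetes scheduler only places a pod on a node whose allocatable capacity can cover the pod's resource requests, and otherwise leaves the pod Pending. Below is a sketch of a batch/v1 Job manifest with requests set, written as a Python dict. The name prefix, image, and quantities are placeholders, and how such a manifest is attached to Prefect's Kubernetes infrastructure depends on the Prefect version in use.

```python
# Sketch of a Kubernetes batch/v1 Job manifest as a Python dict.
# Field names follow the Kubernetes Job schema; the name prefix, image,
# and CPU/memory quantities are placeholders, not recommended values.
job_manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"generateName": "flow-run-"},  # placeholder name prefix
    "spec": {
        "template": {
            "spec": {
                "restartPolicy": "Never",  # Jobs require Never or OnFailure
                "containers": [
                    {
                        "name": "flow",
                        "image": "example/flow-image:latest",  # placeholder image
                        "resources": {
                            # The scheduler reserves this much per pod; pods that
                            # cannot be placed stay Pending instead of overloading a node.
                            "requests": {"cpu": "2", "memory": "4Gi"},
                            # Limits cap what the container may actually use.
                            "limits": {"cpu": "2", "memory": "4Gi"},
                        },
                    }
                ],
            }
        }
    },
}
```

With requests sized to the real footprint of a flow run, the cluster itself queues excess jobs, which is why no agent-side concurrency limit would be needed for protection.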

@billpalombi Thanks! Is it possible to know / estimate when this feature would be in Prefect?

zanieb commented 1 year ago

@usefulalgorithm

Not sure if Prefect has some timeout limit for pods having to be started after the flow is scheduled.

We do but it's configurable :) Setting request quotas seems like a more correct solution.