PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Deployment Concurrency Limits #14934

Closed jeanluciano closed 1 month ago

jeanluciano commented 3 months ago

Describe the current behavior

Today, Prefect users can set concurrency limits on work pools and/or tags. Work pool concurrency is sufficient for throttling deployment execution only when the infrastructure runs homogeneous work. As a result, users who want per-deployment concurrency often create one work pool per deployment, which is cumbersome, unergonomic, and a resiliency burden.

Describe the proposed behavior

Key components:

  1. Concurrency Limit: Allow users to set the maximum number of concurrent executions for a workflow

  2. Collision Handling: Provide options for managing executions when the concurrency limit is reached:

    • Enqueue: Add new executions to a queue and run them when a slot becomes available

    • Cancel: Cancel new execution attempts that exceed the concurrency limit

    • Cancel Running: Cancel the currently running execution(s) and start the new one
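
To make the three options concrete, here is a minimal in-memory sketch of what each strategy could do when a new execution arrives at a full limit. It is not Prefect's implementation: `CollisionStrategy` and `resolve_collision` are hypothetical names, and which running execution gets cancelled under Cancel Running (here, the oldest) is an assumption the proposal does not specify.

```python
from enum import Enum


class CollisionStrategy(Enum):
    """Hypothetical enum mirroring the three options listed above."""
    ENQUEUE = "enqueue"
    CANCEL = "cancel"
    CANCEL_RUNNING = "cancel-running"


def resolve_collision(running, queued, new_run, limit, strategy):
    """Return (running, queued, cancelled) after new_run arrives."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    running, queued, cancelled = list(running), list(queued), []
    if len(running) < limit:
        # A slot is free, so no collision handling is needed.
        running.append(new_run)
    elif strategy is CollisionStrategy.ENQUEUE:
        queued.append(new_run)
    elif strategy is CollisionStrategy.CANCEL:
        cancelled.append(new_run)
    elif strategy is CollisionStrategy.CANCEL_RUNNING:
        # Assumption: cancel the oldest running execution to free one slot.
        cancelled.append(running.pop(0))
        running.append(new_run)
    return running, queued, cancelled
```

With `limit=1` and one execution already running, Enqueue leaves the running execution alone and queues the new one, Cancel drops the new one, and Cancel Running swaps them.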

When a worker sees a scheduled run, it should check the deployment's concurrency limit and use the specified behavior:

If using concurrency_limit:

  1. Use a global rate limit inside of workers' run method, such that workers only spin up infrastructure and run a flow if they succeed in acquiring a slot for the limit

  2. Attempt to acquire a global concurrency limit, passing concurrency(…, timeout=0) to time out the concurrency check immediately if a slot isn't available

  3. If the slot is acquired, run the flow. Otherwise, propose a new state, Scheduled("AwaitingConcurrencySlot"), handle the run with the configured collision strategy (e.g. re-queue the flow run), and continue to the next flow run.

Otherwise, run the flow.
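
The worker-side check above can be sketched with the standard library alone. This is only an illustration of the non-blocking acquire pattern, not the worker code: `DeploymentSlots` and `submit_scheduled_runs` are hypothetical names, a `threading.BoundedSemaphore` stands in for the server-side global concurrency limit, and `blocking=False` plays the role of `timeout=0`.

```python
import threading


class DeploymentSlots:
    """In-memory stand-in for a per-deployment concurrency limit (hypothetical)."""

    def __init__(self, limit):
        self._semaphore = threading.BoundedSemaphore(limit)

    def try_acquire(self):
        # blocking=False mirrors timeout=0: fail immediately if no slot is free.
        return self._semaphore.acquire(blocking=False)

    def release(self):
        # Called when a flow run finishes and frees its slot.
        self._semaphore.release()


def submit_scheduled_runs(run_ids, slots):
    """Walk the scheduled runs once and return {run_id: resulting state name}."""
    states = {}
    for run_id in run_ids:
        if slots is None or slots.try_acquire():
            # No limit configured, or a slot was acquired: run the flow.
            states[run_id] = "Running"
        else:
            # No slot available: park the run and move on to the next one.
            states[run_id] = "AwaitingConcurrencySlot"
    return states
```

For example, with `DeploymentSlots(2)` and three scheduled runs, the first two are started and the third is left in `AwaitingConcurrencySlot` until `release()` is called.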

Additional configuration options:

  • Queue Priority: Set priority levels for queued executions when using the Enqueue option

  • Timeout: Set a maximum wait time for queued executions

  • Dynamic Concurrency: Allow concurrency limits to be adjusted based on system load or time of day
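
As a rough illustration of how Queue Priority and Timeout could interact under the Enqueue option, the sketch below keeps waiting executions in a heap and discards any that have waited too long by the time they reach the front. `WaitingRuns` is a hypothetical name, and the conventions that a lower number means higher priority and that time is passed in explicitly as `now` are assumptions made for the example.

```python
import heapq
import itertools


class WaitingRuns:
    """Priority queue of enqueued executions with a maximum wait time (hypothetical)."""

    def __init__(self, max_wait_seconds):
        self._heap = []
        # Monotonic counter keeps FIFO order among runs with equal priority.
        self._counter = itertools.count()
        self._max_wait = max_wait_seconds

    def enqueue(self, run_id, priority, now):
        # Assumption: a lower priority number is served first.
        heapq.heappush(self._heap, (priority, next(self._counter), now, run_id))

    def next_run(self, now):
        """Pop the next run that has not timed out.

        Returns (run_id, expired_ids). Only timed-out runs encountered before
        a valid run is found are removed and reported.
        """
        expired = []
        while self._heap:
            _, _, enqueued_at, run_id = heapq.heappop(self._heap)
            if now - enqueued_at > self._max_wait:
                expired.append(run_id)
                continue
            return run_id, expired
        return None, expired
```

A worker would call `next_run` whenever a slot frees up, start the returned run, and apply whatever timeout handling is configured to the expired ones.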

Example Use

Allows users to have fine-grained control over flow concurrency limits:

from prefect import deploy, flow


@flow()
def expensive_compute():
    ...


@flow()
def cheap_compute():
    ...


if __name__ == "__main__":
    deploy(
        # At most three concurrent runs of this deployment
        expensive_compute.to_deployment(name="buy-deploy", concurrency_limit=3),
        # No deployment-level limit
        cheap_compute.to_deployment(name="sell-deploy"),
        work_pool_name="my-dev-work-pool",
        image="my-registry/my-image:dev",
        push=False,
    )

Additionally, users would be able to choose one of ["enqueue", "cancel", "cancel-running"] as the collision strategy alongside the limit:

# hello_world is assumed to be a @flow-decorated function defined elsewhere
if __name__ == "__main__":
    hello_world.deploy(
        name="LatestFlow",
        work_pool_name="my_pool",
        parameters=dict(name="Prefect"),
        image="my_registry/my_image:my_image_tag",
        concurrency=dict(limit=1, collision_strategy="cancel-running"),
    )

Ishankoradia commented 2 months ago

Hey @cicdw, any timeline on this? Is this something being developed internally?

cicdw commented 2 months ago

This is being developed in the open - you can see the first PR linked above; this should be complete by the end of the month at the latest 👍

frantakalab commented 1 month ago

@jeanluciano any ETA on this? I see there's a lot of work done <3, but I don't want to guess when it could make it to a release.

cicdw commented 1 month ago

Hi @frantakalab - this was just released in 3.0.4! Looks like we need to add some more documentation to make this easily discoverable; I'll open a separate issue for that.