PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Add more ways of handling deployment concurrency #15340

Closed: jeanluciano closed this issue 1 month ago

jeanluciano commented 1 month ago

Describe the current behavior

We can currently pass an int to concurrency_limit. Once the limit is reached, additional runs are enqueued and will run when a slot becomes available.
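For reference, a minimal sketch of the current behavior, assuming a simple hello_world flow (the deploy arguments mirror the example further down):

from prefect import flow


@flow
def hello_world(name: str = "world"):
    print(f"Hello, {name}!")


if __name__ == "__main__":
    # Today, concurrency_limit only accepts an int; runs beyond the
    # limit are enqueued until a slot frees up.
    hello_world.deploy(
        name="LatestFlow",
        work_pool_name="my_pool",
        image="my_registry/my_image:my_image_tag",
        concurrency_limit=2,
    )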

Describe the proposed behavior

When creating a deployment, we should be able to pass either an int or an object for the concurrency_limit. The object allows an extra configuration option for controlling what happens to new runs once the limit has been reached. This object will have a collision_strategy field that holds the different strategy types. It should support common strategies such as enqueue, which is what is currently implemented, and cancel-new, which would cancel the new flow run asking for a concurrency slot. To preserve today's behavior, if a bare int is passed in, the default collision strategy will be enqueue.

In order to support this extra collision strategy, we need to persist this configuration somewhere. We therefore propose a new column on the deployment table (deployment.concurrency_options) that will store JSONB data which, for now, will have a schema like:

from typing import Literal


class ConcurrencyOptions:
    collision_strategy: Literal['enqueue', 'cancel-new'] = 'enqueue'

We are choosing JSONB despite having only one field in order to support potential future enhancements such as timeouts, queue priority, and others.
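As a rough sketch only (the actual Prefect ORM helpers and column types may differ), the new column could look something like this in a plain SQLAlchemy model:

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Deployment(Base):
    __tablename__ = "deployment"

    id = sa.Column(sa.Integer, primary_key=True)  # illustrative key only
    # Existing integer limit stays where it is.
    concurrency_limit = sa.Column(sa.Integer, nullable=True)
    # New JSONB column holding ConcurrencyOptions, e.g.
    # {"collision_strategy": "enqueue"}; extra keys (timeout, queue
    # priority, ...) can be added later without another schema change.
    concurrency_options = sa.Column(JSONB, nullable=True)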

In this way, the existing concurrency_limit int field can stay where it is. This means that callsites will support a Union[int, ConcurrencyLimitConfig] type where

class ConcurrencyLimitConfig:
    concurrency: int  # equivalent to passing a bare int in
    collision_strategy: Literal['enqueue', 'cancel-new'] = 'enqueue'

but on the data model, we'll persist the int limit separately from the collision strategy, which will live in a ConcurrencyOptions object.
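To make that mapping concrete, here is a hypothetical helper that splits a call-site value into the two persisted pieces; the classes are restated as pydantic models purely so the sketch is self-contained:

from typing import Literal, Optional, Tuple, Union

from pydantic import BaseModel


class ConcurrencyOptions(BaseModel):
    collision_strategy: Literal["enqueue", "cancel-new"] = "enqueue"


class ConcurrencyLimitConfig(BaseModel):
    concurrency: int
    collision_strategy: Literal["enqueue", "cancel-new"] = "enqueue"


def split_concurrency_limit(
    value: Union[int, ConcurrencyLimitConfig, None],
) -> Tuple[Optional[int], Optional[ConcurrencyOptions]]:
    # Hypothetical: map the user-facing value onto the two data model fields.
    if value is None:
        return None, None
    if isinstance(value, int):
        # A bare int preserves today's behavior: enqueue by default.
        return value, ConcurrencyOptions()
    return value.concurrency, ConcurrencyOptions(
        collision_strategy=value.collision_strategy
    )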

Other alternatives that were considered:

Example Use

if __name__ == "__main__":
    hello_world.deploy(
        name="LatestFlow",
        work_pool_name="my_pool",
        parameters=dict(name="Prefect"),
        image="my_registry/my_image:my_image_tag",
        # Proposed: pass the config object instead of a bare int
        concurrency_limit=ConcurrencyLimitConfig(
            concurrency=1, collision_strategy="cancel-new"
        ),
    )

Additional context

Related: #14934

abrookins commented 1 month ago

Adding some context: I prefer a JSON field because collision strategies may have parameters, such as:

- Queue Priority: Set priority levels for queued executions when using the Enqueue option
- Timeout: Set a maximum wait time for queued executions
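For illustration only, a hypothetical extension of the options object showing how such parameters could ride along in the same JSON field without another schema change (none of the extra fields exist today):

from typing import Literal, Optional

from pydantic import BaseModel


class ConcurrencyOptions(BaseModel):
    collision_strategy: Literal["enqueue", "cancel-new"] = "enqueue"
    # Hypothetical future fields:
    queue_priority: Optional[int] = None      # priority for enqueued runs
    timeout_seconds: Optional[float] = None   # max wait time in the queue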

@cicdw Separately, we've been running with "collision strategy" as the name of this field. Collision, in my mind, recalls hash collisions, while conflict is more closely related to concurrency. Or another way to put it: conflicts arise naturally and we manage them with a strategy, but we should prevent collisions. So if we buy any of that, conflict_strategy might be a better name for the field.

cicdw commented 1 month ago

+1 to conflict over collision, but even conflict sounds more antagonistic than I think is needed; this is basically a strategy for what happens when something is actually "limited", so what about limiting_strategy / limit_strategy as the name?

Also +1 to making the concurrency configuration an object like this; we really want to get away from having such a flat configuration for everything.

jeanluciano commented 1 month ago

I really like the idea of only having one schema instead of having one for the ORM that's identical to the schema minus concurrency: int. That would require us to do a data migration of concurrency_limit, but I believe it would be best long term. Any thoughts?
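If we went that route, the data migration might look roughly like this Alembic sketch (table and column names are assumptions based on the proposal above):

from alembic import op

# Placeholder revision identifiers for this sketch.
revision = "merge_concurrency_limit"
down_revision = None


def upgrade():
    # Fold the existing integer limit into the JSON options blob so a
    # single schema can describe the whole concurrency configuration.
    op.execute(
        """
        UPDATE deployment
        SET concurrency_options = jsonb_set(
            COALESCE(concurrency_options, '{}'::jsonb),
            '{concurrency}',
            to_jsonb(concurrency_limit)
        )
        WHERE concurrency_limit IS NOT NULL
        """
    )


def downgrade():
    # Restore the integer column from the JSON blob.
    op.execute(
        """
        UPDATE deployment
        SET concurrency_limit = (concurrency_options ->> 'concurrency')::int
        WHERE concurrency_options ? 'concurrency'
        """
    )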

collincchoy commented 1 month ago

Closed by #15291 #15425 #15468