PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Fix deployment concurrency limiting GCLs no longer created/updated #15338

Closed collincchoy closed 1 month ago

collincchoy commented 1 month ago

Describe the current behavior

To enforce concurrency limits for a deployment (deployment.concurrency_limit), a worker uses Prefect's concurrency helper to implicitly create and use a global concurrency limit (GCL) under the hood.

This has some issues namely:

Additionally, there's currently a bug where deployment.concurrency_limit is used as the # of slots to acquire rather than the # of slots to make available. This means that deployments with a concurrency_limit > 1 don't work correctly. This is a separate issue, but it is entangled with some of the logic proposed here, so my proposal includes fixing this bug.
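To make the slot-count bug concrete, here is a minimal in-memory sketch. It is not Prefect's implementation: `SlotLimit`, `try_acquire`, and `runs_admitted` are hypothetical names, and the sketch assumes for illustration that the backing limit already has `concurrency_limit` slots. It contrasts a run that occupies `concurrency_limit` slots with a run that occupies a single slot.

```python
from dataclasses import dataclass


@dataclass
class SlotLimit:
    """Hypothetical stand-in for a global concurrency limit (GCL)."""

    limit: int             # number of slots made available
    active_slots: int = 0  # number of slots currently held

    def try_acquire(self, occupy: int) -> bool:
        """Take `occupy` slots if they fit under the limit; report success."""
        if self.active_slots + occupy > self.limit:
            return False
        self.active_slots += occupy
        return True


def runs_admitted(concurrency_limit: int, occupy_per_run: int, attempts: int) -> int:
    """Count how many of `attempts` simultaneous runs get their slots."""
    gcl = SlotLimit(limit=concurrency_limit)
    return sum(gcl.try_acquire(occupy_per_run) for _ in range(attempts))


# Buggy shape: each run tries to occupy `concurrency_limit` slots.
buggy = runs_admitted(concurrency_limit=3, occupy_per_run=3, attempts=5)

# Intended shape: the limit has `concurrency_limit` slots, each run occupies one.
intended = runs_admitted(concurrency_limit=3, occupy_per_run=1, attempts=5)

print(buggy, intended)  # prints "1 3"
```

Under these assumptions the buggy shape admits only one run at a time no matter how high the limit is set, while the intended shape admits three.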

Describe the proposed behavior

Due to #15297, we need to explicitly manage the backing global concurrency limit objects for deployments that have a concurrency_limit defined. In particular, whenever a deployment is created with a concurrency_limit set, a concurrency_limit_v2 object should be created as well. Additionally, whenever a deployment's concurrency_limit is updated, the underlying GCL's # of slots should be updated as well.

Option A:

Deployments should manage their own related GCL. Deployment APIs/model-methods should implicitly handle creating/updating/deleting corresponding GCL's.

  1. On POST /deployments, if the request includes a concurrency_limit, then create a corresponding concurrency_limit_v2 object with a predictable, unique name that, pending #15316, is associated with the deployment
    • Because this route is functionally an upsert, it needs to handle updating the concurrency_limit appropriately as well
  2. On PATCH /deployments/{id}, if the request includes a concurrency_limit, then update the corresponding concurrency_limit_v2 object's # slots with the updated limit value
  3. On DELETE /deployments/{id}, if the deployment to delete includes a concurrency_limit, then delete any corresponding concurrency_limit_v2 objects.
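The three steps above can be sketched with in-memory dictionaries standing in for the deployment and concurrency_limit_v2 tables. Everything here is an assumption made for illustration: the function names, the `deployment:<id>` naming scheme, and the choice to leave an existing GCL untouched when a request carries no concurrency_limit.

```python
from typing import Dict, Optional

# In-memory stand-ins for the deployment and concurrency_limit_v2 tables.
deployments: Dict[str, Optional[int]] = {}  # deployment id -> concurrency_limit
gcls: Dict[str, int] = {}                   # GCL name -> number of slots


def gcl_name(deployment_id: str) -> str:
    # Hypothetical naming scheme: predictable and unique per deployment.
    return f"deployment:{deployment_id}"


def post_deployment(deployment_id: str, concurrency_limit: Optional[int] = None) -> None:
    """Step 1: the route is an upsert, so it both creates and updates the GCL."""
    deployments[deployment_id] = concurrency_limit
    if concurrency_limit is not None:
        gcls[gcl_name(deployment_id)] = concurrency_limit
    # What to do when an existing limit is cleared is left open in this sketch.


def patch_deployment(deployment_id: str, concurrency_limit: Optional[int] = None) -> None:
    """Step 2: resize the GCL only when the request includes a limit."""
    if concurrency_limit is not None:
        deployments[deployment_id] = concurrency_limit
        gcls[gcl_name(deployment_id)] = concurrency_limit


def delete_deployment(deployment_id: str) -> None:
    """Step 3: remove the deployment together with any backing GCL."""
    deployments.pop(deployment_id)
    gcls.pop(gcl_name(deployment_id), None)
```

A real implementation would make each pair of writes in one database transaction so the deployment and its GCL cannot drift apart.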

Option B:

Upsert the GCL at runtime to lazily provision and sync GCLs for deployments in workers/runners.

Add a new route to support upserting global concurrency limit objects (aka concurrency_limit_v2 objects).

@router.put("/v2/concurrency_limits")
async def upsert_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Create
):
    ...

Then for workers, when submitting runs, instead of relying on the prior create_if_missing behavior within the concurrency helper, the worker should first explicitly upsert the GCL to guarantee that it exists and also sync its # of slots with the deployment's current concurrency limit. Then, go ahead and acquire a slot as happens today.
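A rough sketch of that worker-side flow, using an in-memory dictionary in place of the server. `upsert_limit`, `acquire_slot`, `submit_run`, and the `deployment:<id>` name are hypothetical, not Prefect APIs; the point is only the ordering: upsert to create or resize the limit, then acquire a single slot.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Limit:
    limit: int             # slots made available
    active_slots: int = 0  # slots currently held


limits: Dict[str, Limit] = {}  # stand-in for server-side GCL storage


def upsert_limit(name: str, slots: int) -> Limit:
    """Create the limit if missing, otherwise resize it in place."""
    existing = limits.get(name)
    if existing is None:
        existing = limits[name] = Limit(limit=slots)
    else:
        existing.limit = slots  # held slots are kept; only the size changes
    return existing


def acquire_slot(name: str) -> bool:
    """Take one slot if any is free. Raises KeyError if the limit is missing."""
    lim = limits[name]
    if lim.active_slots >= lim.limit:
        return False
    lim.active_slots += 1
    return True


def submit_run(deployment_id: str, concurrency_limit: int) -> bool:
    """Worker-side flow: sync the GCL first, then acquire one slot."""
    name = f"deployment:{deployment_id}"  # hypothetical naming scheme
    upsert_limit(name, concurrency_limit)
    return acquire_slot(name)
```

Note that the upsert and the acquisition are two separate steps here, so nothing in this sketch prevents the limit from changing between them.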

Alternatively, the worker could use the new strict mode within the concurrency helper to attempt to acquire a slot and, on failure (b/c no limit exists yet), explicitly create the limit and then retry. This wouldn't require any new endpoints, but it would mean that keeping the GCL's # of slots in sync with the deployment's concurrency limit would have to happen elsewhere.

Option C:

Another alternative is to add a more use-case-specific route that would implicitly handle marking the created GCL as being used for deployment concurrency limiting so that it's treated differently from user-defined GCLs. The nuanced difference here is that the proposed upsert route would be a generalized GCL route, and the client would have to include metadata within the request to mark the GCL as being for internal usage so that it's not exposed to users in the same way as user-defined limits.

Option D:

Add deployment concurrency limit management to client-side deployment creation/updates. Similar to Option A but client-side instead of server-side with the benefit of not needing any server-side changes.

Example Use

No response

Additional context

No response

abrookins commented 1 month ago

Ideally, we want to be able to trust the operation that updates the concurrency limit on the deployment. If we can trust that, then this defensive posture around syncing is less necessary. And we can probably trust it, right? We won't expose the internal limit that manages the deployment's limit, and there is only one way to change it (when the user changes the deployment's concurrency limit), which should happen in a database transaction (the deployment limit and limit record change together). So we probably don't need the worker to try and ensure the limit number on the limit record matches the limit number on the deployment record.
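As a small illustration of the "change together in one transaction" idea, here is a sketch using Python's built-in sqlite3 module. The table layout, the `deployment:<id>` name, and the helper functions are assumptions for the example and do not reflect Prefect's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE deployment (id TEXT PRIMARY KEY, concurrency_limit INTEGER);
    CREATE TABLE concurrency_limit_v2 (
        name TEXT PRIMARY KEY,
        slots INTEGER NOT NULL CHECK (slots > 0)
    );
    INSERT INTO deployment VALUES ('d1', 2);
    INSERT INTO concurrency_limit_v2 VALUES ('deployment:d1', 2);
    """
)


def set_concurrency_limit(conn: sqlite3.Connection, deployment_id: str, new_limit: int) -> None:
    """Update the deployment and its backing limit atomically."""
    # `with conn` commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute(
            "UPDATE deployment SET concurrency_limit = ? WHERE id = ?",
            (new_limit, deployment_id),
        )
        conn.execute(
            "UPDATE concurrency_limit_v2 SET slots = ? WHERE name = ?",
            (new_limit, f"deployment:{deployment_id}"),
        )


def read_limits(conn: sqlite3.Connection, deployment_id: str) -> tuple:
    """Return (deployment limit, backing limit slots) for comparison."""
    dep = conn.execute(
        "SELECT concurrency_limit FROM deployment WHERE id = ?", (deployment_id,)
    ).fetchone()[0]
    gcl = conn.execute(
        "SELECT slots FROM concurrency_limit_v2 WHERE name = ?",
        (f"deployment:{deployment_id}",),
    ).fetchone()[0]
    return dep, gcl
```

If the second UPDATE fails (for example, the CHECK constraint rejects a value of 0), the first UPDATE is rolled back too, so the two records never disagree.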

If we buy the idea that a database transaction is reliable enough to avoid having to make the worker think about syncing anything, that leaves us with a proposal to add an upsert route to avoid having to catch exceptions. Let's explore that.

As you say, we can already use the new strict mode and catch the exception. I don't think an upsert route would actually get around managing the exception if concurrency(..., strict=True) fails. We would still want to use concurrency(..., strict=True) because we need to be sure the limit exists at the time we try to take it, not just immediately before. And if we're doing that, we need to catch the error. If we're catching the error, we should attempt to create the limit and try again anyway. So maybe we don't need to upsert first.

collincchoy commented 1 month ago

I think there's a design decision to be made here about when the global concurrency limit should be created/updated.

It could be managed as a property of the deployment itself so when a deployment is created with a defined concurrency limit, then create the backing GCL then and there and subsequently whenever the deployment's concurrency_limit is updated, update the backing GCL.

or

It could be lazily provisioned at runtime.

My proposal leaned toward the latter along with adding an upsert which would handle both creating and updating/keeping-in-sync the GCL without requiring additional changes to deployment web APIs to handle the underlying GCL in both create/update routes.

I like this proposal as a minimalistic approach, but I think I hear your concern that the upsert wouldn't really guarantee that the limit is available at acquisition time, since the upsert and the acquisition are no longer occurring within the same transaction. Thus, if we need to handle the case where concurrency(..., strict=True) fails because the limit doesn't exist anyway, then what's the point of the upsert?

I think that's a valid point; however, shifting to handle the exception by explicitly creating and then retrying doesn't solve that issue either, because there's still a gap between the creation and the acquisition.

Would we be comfortable just setting a max_retries on the attempt to acquire slots so that, on failure, we'd retry some limited number of times? This seems like a good idea in either case and would help with the concern, but I'm not sure what else we'd be able to do other than add another route to explicitly handle this use case, similar to the /increase route's behavior prior to #15297.
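For reference, a bounded retry along those lines might look roughly like this. It is an in-memory sketch: `LimitNotFound`, `acquire_strict`, `create_limit`, and `acquire_with_retries` are hypothetical stand-ins for a strict acquisition, the error it raises when the limit is missing, and the explicit create call.

```python
from dataclasses import dataclass
from typing import Dict


class LimitNotFound(Exception):
    """Hypothetical error for a strict acquire against a missing limit."""


@dataclass
class Limit:
    limit: int
    active_slots: int = 0


limits: Dict[str, Limit] = {}  # stand-in for server-side GCL storage


def create_limit(name: str, slots: int) -> None:
    """Create the limit unless something else already created it."""
    limits.setdefault(name, Limit(limit=slots))


def acquire_strict(name: str) -> bool:
    """Take one slot; raise instead of creating the limit when it is missing."""
    if name not in limits:
        raise LimitNotFound(name)
    lim = limits[name]
    if lim.active_slots >= lim.limit:
        return False
    lim.active_slots += 1
    return True


def acquire_with_retries(name: str, slots: int, max_retries: int = 3) -> bool:
    """Try to acquire; on a missing limit, create it and retry a bounded number of times."""
    for attempt in range(max_retries + 1):
        try:
            return acquire_strict(name)
        except LimitNotFound:
            if attempt == max_retries:
                raise  # give up once the retry budget is spent
            create_limit(name, slots)
    raise AssertionError("unreachable")  # the loop always returns or raises
```

The retry budget bounds the work done when the limit keeps disappearing between creation and acquisition, but it does not close that gap.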

abrookins commented 1 month ago

I'm in favor of provisioning when the user changes the deployment concurrency limit. This would significantly cut the required API traffic around managing limits. Lazy provisioning would entail an upsert every time a deployment with a concurrency limit ran a flow, right?

collincchoy commented 1 month ago

> Lazy provisioning would entail an upsert every time a deployment with a concurrency limit ran a flow, right?

Ah yeah, that seems not great 👍 Okay, that, along with being able to manage updates across tables (the deployment and the GCL record) within a single transaction, feels compelling enough to shift the proposal here over to creating the underlying GCL during deployment creation and updating it along with deployment updates, rather than this lazy provisioning strategy.

I will go edit the proposal outlined in this issue.