PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Prevent holding a task worker limit while waiting for task inputs to resolve #14092

Open: desertaxle opened this issue 3 months ago

We've seen scenarios where a task worker hangs because all of its capacity limiter slots are taken up by tasks that are waiting for upstream dependencies. The upstream dependencies can't run because no slots are left for them to execute in. To resolve these deadlocks, a task should not hold a capacity limiter slot while it waits for its upstream dependencies.
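A minimal, Prefect-independent sketch of the deadlock, assuming a single worker with a fixed number of slots. Here `asyncio.Semaphore` stands in for the worker's capacity limiter and a plain asyncio future stands in for the upstream result; `run_naive_worker` is a hypothetical helper for illustration, not Prefect code.

```python
import asyncio


async def run_naive_worker(limit: int, timeout: float = 0.5) -> bool:
    """Simulate a worker that holds a slot while waiting on upstream inputs.

    Returns True if both task runs finish, False if they hang (deadlock).
    """
    # Stand-in for the task worker's capacity limiter (assumption).
    slots = asyncio.Semaphore(limit)
    # Stand-in for the upstream task's future (assumption).
    upstream_result: asyncio.Future = asyncio.get_running_loop().create_future()
    downstream_started = asyncio.Event()

    async def downstream() -> int:
        async with slots:  # slot is taken before inputs are resolved
            downstream_started.set()
            value = await upstream_result  # waits while still holding the slot
            return value + 1

    async def upstream() -> None:
        await downstream_started.wait()  # arrives after the downstream run
        async with slots:  # needs a free slot to execute
            upstream_result.set_result(41)

    try:
        await asyncio.wait_for(asyncio.gather(downstream(), upstream()), timeout)
        return True
    except asyncio.TimeoutError:
        # Nothing progressed before the timeout: the runs are deadlocked.
        return False
```

With `limit=1`, the downstream run holds the only slot while awaiting its input, so the upstream run never starts and `asyncio.run(run_naive_worker(limit=1))` returns `False` after the timeout. With `limit=2`, the same two runs complete and it returns `True`.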

To accomplish this, we can move dependency resolution to the task worker. The flow would look something like this:

  1. A task run comes in, and the worker takes a capacity limiter slot.
  2. The task run's parameters and wait_for are read and inspected for PrefectDistributedFutures.
  3. If a PrefectDistributedFuture exists in the task run's parameters or wait_for, release the capacity limiter slot and wait for the futures to resolve. Once they resolve, replace the futures in the parameters with their resolved values, set wait_for to an empty list, and take a capacity limiter slot again.
  4. Submit the updated parameters and wait_for to the task engine for execution. The engine will not need to wait, because all inputs are already resolved.
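The four steps above can be sketched with plain asyncio primitives. This is a minimal illustration under stated assumptions, not Prefect's task worker: `asyncio.Semaphore` stands in for the capacity limiter, `asyncio.Future` stands in for PrefectDistributedFuture, and `resolve_then_run` and `demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def resolve_then_run(
    slots: asyncio.Semaphore,
    fn: Callable[..., Awaitable[Any]],
    parameters: Dict[str, Any],
    wait_for: List[asyncio.Future],
) -> Any:
    # 1. A task run comes in; take a slot.
    await slots.acquire()
    # 2. Inspect parameters for unresolved futures (stand-in for
    #    PrefectDistributedFuture; assumption).
    pending = {k: v for k, v in parameters.items() if isinstance(v, asyncio.Future)}
    if pending or wait_for:
        # 3. Release the slot so upstream runs can execute, wait for the
        #    futures, substitute the resolved values, then take a slot again.
        slots.release()
        values = await asyncio.gather(*pending.values())
        await asyncio.gather(*wait_for)  # wait_for is now satisfied
        parameters = {**parameters, **dict(zip(pending.keys(), values))}
        await slots.acquire()
    try:
        # 4. Run with fully resolved inputs; nothing is left to wait on.
        return await fn(**parameters)
    finally:
        slots.release()


async def demo(limit: int = 1) -> int:
    slots = asyncio.Semaphore(limit)
    upstream = asyncio.get_running_loop().create_future()

    async def produce(value: int) -> int:
        upstream.set_result(value)  # publish the upstream result
        return value

    async def add_one(x: int) -> int:
        return x + 1

    # Submit the downstream run first so it takes the only slot.
    downstream_task = asyncio.create_task(
        resolve_then_run(slots, add_one, {"x": upstream}, [])
    )
    await asyncio.sleep(0)  # let the downstream run start waiting
    await resolve_then_run(slots, produce, {"value": 41}, [])
    return await asyncio.wait_for(downstream_task, timeout=1)
```

With a single slot, `asyncio.run(demo(limit=1))` returns `42`: the downstream run gives up its slot while waiting, so the upstream run can take it. Releasing the slot before awaiting (rather than inside a `finally`) means a failed or cancelled upstream future never leaves a slot held, and the slot is reacquired only once the inputs are ready.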

Note: the current workaround for this issue is to increase the limit on the task worker.