apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

AIP 72: Handling "deferrable" tasks in execution_api and task SDK #44241

Open amoghrajesh opened 1 day ago

amoghrajesh commented 1 day ago

closes: #44137

This PR ports the "deferral" logic from Airflow 2 to Airflow 3 (execution API + task SDK).

Summary of changes:

Server side changes (execution api):

  1. Handling TIDeferredStatePayload in ti_update_state (covered by unit test test_ti_update_state_to_deferred).
     a. Didn't piggyback on ti.defer_task(), as it extracts the trigger out of the TaskDeferred exception. Sending multiple models such as TaskInstance, TaskDeferred, and Trigger across is much more expensive than sending just the minimal required properties.
     b. Returning early instead of proceeding with query execution, as that is already done above: https://github.com/apache/airflow/pull/44241/files#diff-d44a72566870079ee943e24bac2af74fb84c426c54d210561a251549a7078ed7L129
  2. Defining a datamodel for TIDeferredStatePayload and adding it to the discriminator: ti_state_discriminator
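
As a rough illustration of the server-side idea (not the PR's actual code), the sketch below routes an incoming payload by its `state` field and lets the deferred payload carry only the minimal fields needed to set up a trigger. It uses stdlib dataclasses to stay self-contained. The field names (`classpath`, `next_method`, `trigger_kwargs`), the `TITerminalStatePayload` class, and the `parse_payload` helper are assumptions made for illustration and may not match the real models.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TITerminalStatePayload:
    """Hypothetical payload for a TI that reached a terminal state."""

    state: str  # e.g. "success" or "failed"


@dataclass
class TIDeferredStatePayload:
    """Hypothetical minimal payload for deferring a TI (field names assumed)."""

    classpath: str
    next_method: str
    trigger_kwargs: dict[str, Any] = field(default_factory=dict)
    state: str = "deferred"


def ti_state_discriminator(raw: dict[str, Any]) -> str:
    """Return a tag naming which payload model should parse ``raw``."""
    state = raw.get("state")
    if state == "deferred":
        return "deferred"
    if state in {"success", "failed", "skipped"}:
        return "terminal"
    raise ValueError(f"unsupported state: {state!r}")


def parse_payload(raw: dict[str, Any]):
    """Build the payload object selected by the discriminator."""
    if ti_state_discriminator(raw) == "deferred":
        return TIDeferredStatePayload(
            classpath=raw["classpath"],
            next_method=raw["next_method"],
            trigger_kwargs=raw.get("trigger_kwargs", {}),
        )
    return TITerminalStatePayload(state=raw["state"])
```

Only plain values cross the wire here, which mirrors the reasoning in point 1a: the server never needs a full TaskInstance, TaskDeferred, or Trigger object to record the deferral.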

Client side changes (task sdk):

HTTP client:

Added a new function, defer, that sends a PATCH request to the task-instances/{id}/state execution API endpoint with payload PatchTIToDeferred.
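
A minimal sketch of what such a client method could look like, assuming an injectable transport so it runs without a network. The class name `TaskInstanceOperations`, the `Transport` callable, and the payload keys are hypothetical; only the PATCH verb and the `task-instances/{id}/state` path come from the description above.

```python
import json
from typing import Any, Callable

# Assumed shape: a transport takes (method, path, body) and returns a status code.
Transport = Callable[[str, str, bytes], int]


class TaskInstanceOperations:
    """Hypothetical stand-in for the task SDK's HTTP client wrapper."""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport

    def defer(self, ti_id: str, payload: dict[str, Any]) -> int:
        """PATCH the TI state endpoint, forcing state to "deferred"."""
        body = json.dumps({**payload, "state": "deferred"}).encode()
        return self._transport("PATCH", f"task-instances/{ti_id}/state", body)
```

Injecting the transport keeps the method easy to unit test with a recording fake, in the same spirit as the mocked test cases listed under "How was this tested?".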

Comms:

Defined a new data model, PatchTIToDeferred (added to ToSupervisor), which the task runner sends to the supervisor to request patching the TI as "deferred".

Supervisor:

  1. Adding a new class attribute, _final_state, to back the @property final_state, which is the final state of a TI.
     a. Added a setter for this final state for cases like deferred, so that finish is not called for tasks that aren't in a terminal state: https://github.com/apache/airflow/pull/44241/files#diff-c2651fdee1a25e091e2a9d4f937f8032ca3d289d0de76f38ed88aee5df0f880dL392-L394
  2. Deferred tasks first enter the "wait" in the supervisor and then mark themselves as deferred by setting up a trigger, so finish() is skipped when final_state is not a TerminalTIState.
  3. Extended handle_requests to receive requests from the task runner and forward the message to the HTTP client to call defer.
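
The supervisor points above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the PR's implementation: the `Supervisor` class, its `handle_request` and `wait` signatures, the message dict shape, and the `finish_called` flag are all hypothetical, and the forwarding to the HTTP client is reduced to a comment.

```python
from enum import Enum
from typing import Optional


class TerminalTIState(str, Enum):
    """Reduced stand-in for the terminal states of a TI."""

    SUCCESS = "success"
    FAILED = "failed"


class Supervisor:
    """Hypothetical, heavily simplified supervisor."""

    def __init__(self) -> None:
        self._exit_code: Optional[int] = None
        self._final_state: Optional[str] = None
        self.finish_called = False  # stands in for the real finish() call

    @property
    def final_state(self) -> str:
        # An explicitly set state (e.g. "deferred") wins over the exit code.
        if self._final_state is not None:
            return self._final_state
        if self._exit_code == 0:
            return TerminalTIState.SUCCESS.value
        return TerminalTIState.FAILED.value

    @final_state.setter
    def final_state(self, value: str) -> None:
        self._final_state = value

    def handle_request(self, msg: dict) -> None:
        if msg.get("type") == "PatchTIToDeferred":
            # The real code would forward msg to the HTTP client's defer here.
            self.final_state = "deferred"

    def wait(self, exit_code: int) -> str:
        self._exit_code = exit_code
        # Only terminal states trigger finish(); "deferred" is skipped.
        if self.final_state in {s.value for s in TerminalTIState}:
            self.finish_called = True
        return self.final_state
```

With this shape, a task process that exits with code 0 after requesting deferral ends up as "deferred" rather than "success", and finish is never invoked for it.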

TaskRunner:

The task runner executes ti.task.execute, which raises TaskDeferred for deferral. The runner then sends a request to the supervisor using SUPERVISOR_COMMS.
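
A hedged sketch of that runner-side flow: catch the deferral exception and turn it into a message for the supervisor. The `TaskDeferred` class here is a simplified stand-in (Airflow's real exception carries a trigger object), and the `run` function, the `send_request` callable standing in for SUPERVISOR_COMMS, and the message keys are assumptions for illustration.

```python
from typing import Any, Callable, Optional


class TaskDeferred(Exception):
    """Simplified stand-in for Airflow's TaskDeferred exception."""

    def __init__(
        self, classpath: str, method_name: str, kwargs: Optional[dict] = None
    ) -> None:
        super().__init__(f"deferred on {classpath}")
        self.classpath = classpath
        self.method_name = method_name
        self.kwargs = kwargs or {}


def run(execute: Callable[[], Any], send_request: Callable[[dict], None]) -> str:
    """Run the task; on deferral, notify the supervisor instead of failing."""
    try:
        execute()
    except TaskDeferred as defer:
        # Send only plain, minimal values rather than the exception itself.
        send_request(
            {
                "type": "PatchTIToDeferred",
                "classpath": defer.classpath,
                "next_method": defer.method_name,
                "trigger_kwargs": defer.kwargs,
            }
        )
        return "deferred"
    return "success"
```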

How was this tested?

  1. The end-to-end flow isn't available yet, but I have tried to cover the ground with unit tests.
  2. A new case under test_handle_requests covers the supervisor + client side, along with a mock of the message from the task runner.
  3. test_ti_update_state_to_deferred covers the scenario for execution API
