apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow API for Updating TaskInstance State to skipped is broken #40575

Closed abhishekbhakat closed 3 days ago

abhishekbhakat commented 5 days ago

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We have two API endpoints for updating TaskInstance state:

  1. "Set a state of task instances", listed under the DAG section of the API reference.
  2. "Updates the state of a task instance", listed under the TaskInstance section.

Both endpoints call the set_state() function in: https://github.com/apache/airflow/blob/a8c4830d9728e9e25d1164e124004eb42cda6e58/airflow/api/common/mark_tasks.py#L85

That function still contains older code that calls _iter_subdag_run_ids: https://github.com/apache/airflow/blob/a8c4830d9728e9e25d1164e124004eb42cda6e58/airflow/api/common/mark_tasks.py#L148-L150

This code wraps the requested state in DagRunState(state), but skipped is not a valid state for a DagRun, so the conversion raises a ValueError.
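The failure can be reproduced outside Airflow with plain Python enums. The sketch below uses trimmed stand-in enums (they are assumptions for illustration, not Airflow's actual definitions) and a hypothetical helper, to_dag_run_state, that mirrors the DagRunState(state) conversion:

```python
from enum import Enum


# Stand-in enums for illustration only; trimmed, not Airflow's real classes.
class DagRunState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class TaskInstanceState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


def to_dag_run_state(state: str) -> DagRunState:
    """Hypothetical helper mirroring the DagRunState(state) conversion."""
    return DagRunState(state)


# "success" exists in both enums, so the conversion works.
converted = to_dag_run_state(TaskInstanceState.SUCCESS.value)

# "skipped" exists only for task instances, so the lookup raises ValueError.
try:
    to_dag_run_state(TaskInstanceState.SKIPPED.value)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)

print(converted, "|", error_message)
```

States shared by both enums convert cleanly, which is presumably why the problem only shows up for task-only states such as skipped.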

Error log for 1st endpoint:

[2024-07-03T07:44:56.205+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/parallel_tasks_dag/updateTaskInstancesState [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 196, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/response.py", line 112, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/parameter.py", line 120, in wrapper
    return function(**kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 171, in decorated
    return _requires_access(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 98, in _requires_access
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/www/decorators.py", line 159, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py", line 549, in post_set_task_instances_state
    tis = dag.set_task_instance_state(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 2087, in set_task_instance_state
    altered = set_state(
              ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api/common/mark_tasks.py", line 148, in set_state
    _iter_subdag_run_ids(dag, session, DagRunState(state), task_ids, commit, confirmed_infos),
                                       ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/enum.py", line 714, in __call__
    return cls.__new__(cls, value)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/enum.py", line 1137, in __new__
    raise ve_exc
ValueError: 'skipped' is not a valid DagRunState

Error log for 2nd endpoint:

2024-07-03 13:39:41 [2024-07-03T08:09:41.242+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/parallel_tasks_dag/dagRuns/manual__2024-07-03T07:54:52.290465+00:00/taskInstances/task_a [PATCH]
2024-07-03 13:39:41 Traceback (most recent call last):
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2529, in wsgi_app
2024-07-03 13:39:41     response = self.full_dispatch_request()
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1825, in full_dispatch_request
2024-07-03 13:39:41     rv = self.handle_user_exception(e)
2024-07-03 13:39:41          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1823, in full_dispatch_request
2024-07-03 13:39:41     rv = self.dispatch_request()
2024-07-03 13:39:41          ^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1799, in dispatch_request
2024-07-03 13:39:41     return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 196, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 399, in wrapper
2024-07-03 13:39:41     return function(request)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/response.py", line 112, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/parameter.py", line 120, in wrapper
2024-07-03 13:39:41     return function(**kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 171, in decorated
2024-07-03 13:39:41     return _requires_access(
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 98, in _requires_access
2024-07-03 13:39:41     return func(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/www/decorators.py", line 159, in wrapper
2024-07-03 13:39:41     return f(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
2024-07-03 13:39:41     return func(*args, session=session, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py", line 600, in patch_task_instance
2024-07-03 13:39:41     ti = dag.set_task_instance_state(
2024-07-03 13:39:41          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
2024-07-03 13:39:41     return func(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 2087, in set_task_instance_state
2024-07-03 13:39:41     altered = set_state(
2024-07-03 13:39:41               ^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
2024-07-03 13:39:41     return func(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api/common/mark_tasks.py", line 148, in set_state
2024-07-03 13:39:41     _iter_subdag_run_ids(dag, session, DagRunState(state), task_ids, commit, confirmed_infos),
2024-07-03 13:39:41                                        ^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/enum.py", line 714, in __call__
2024-07-03 13:39:41     return cls.__new__(cls, value)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/enum.py", line 1137, in __new__
2024-07-03 13:39:41     raise ve_exc
2024-07-03 13:39:41 ValueError: 'skipped' is not a valid DagRunState

What you think should happen instead?

The call should not fail. It should update the task state to skipped, because the API reference lists skipped as a valid input for these endpoints.

How to reproduce

Call either endpoint with the state set to skipped for a historical DagRun.
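A minimal sketch of the two calls, built with the standard library and not sent anywhere. The paths, DAG ID, run ID and task ID come from the error logs above; the base URL, the JSON field names (dry_run, task_id, dag_run_id, new_state) and the absence of authentication are assumptions that should be checked against the API reference for your deployment:

```python
import json
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8080/api/v1"  # assumption: local webserver
DAG_ID = "parallel_tasks_dag"
DAG_RUN_ID = "manual__2024-07-03T07:54:52.290465+00:00"
TASK_ID = "task_a"


def build_requests(base_url: str = BASE_URL):
    """Build (but do not send) the POST and PATCH requests from the logs."""
    headers = {"Content-Type": "application/json"}

    # Endpoint 1: set the state of task instances for a DAG.
    # Field names are assumed from the API reference, not from this issue.
    post_body = {
        "dry_run": False,
        "task_id": TASK_ID,
        "dag_run_id": DAG_RUN_ID,
        "new_state": "skipped",
    }
    post_req = Request(
        f"{base_url}/dags/{DAG_ID}/updateTaskInstancesState",
        data=json.dumps(post_body).encode("utf-8"),
        headers=headers,
        method="POST",
    )

    # Endpoint 2: update the state of a single task instance.
    patch_body = {"dry_run": False, "new_state": "skipped"}
    run_id = quote(DAG_RUN_ID, safe="")  # escape ':' and '+' in the run ID
    patch_req = Request(
        f"{base_url}/dags/{DAG_ID}/dagRuns/{run_id}/taskInstances/{TASK_ID}",
        data=json.dumps(patch_body).encode("utf-8"),
        headers=headers,
        method="PATCH",
    )
    return post_req, patch_req


post_req, patch_req = build_requests()
print(post_req.get_method(), post_req.full_url)
print(patch_req.get_method(), patch_req.full_url)
```

Sending either request with urllib.request.urlopen, after adding whatever credentials the deployment requires, should trigger the traceback shown above on an affected build.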

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

NA

Deployment

Official Apache Airflow Helm Chart

Deployment details

NA

Anything else?

NA

potiuk commented 5 days ago

Thanks. Indeed, it seems that we are creating a DagRunState enum there rather than a TaskInstanceState, and DagRunState has no skipped option. I have marked it as a good first issue; feel free to take a stab at it if you want.

abhishekbhakat commented 5 days ago

For now, I have raised a PR that returns an empty list for subdags when the state is TaskInstanceState.SKIPPED, and I have verified that the APIs now work for the usual scenarios. I am not sure about subdags.
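The idea behind that workaround can be sketched as a guard in front of the enum conversion. This is not the PR's actual diff: the enums are trimmed stand-ins, and subdag_run_ids_guarded and its find_subdag_run_ids callback are hypothetical names used only for illustration.

```python
from enum import Enum
from typing import Callable, Iterable, List, Union


# Stand-in enums for illustration only; trimmed, not Airflow's real classes.
class DagRunState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


class TaskInstanceState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


def subdag_run_ids_guarded(
    state: Union[str, TaskInstanceState],
    find_subdag_run_ids: Callable[[DagRunState], Iterable[str]],
) -> List[str]:
    """Return subdag run IDs, or an empty list when the state is skipped."""
    ti_state = TaskInstanceState(state)  # accepts a member or its string value
    if ti_state is TaskInstanceState.SKIPPED:
        # skipped has no DagRunState counterpart, so nothing is propagated.
        return []
    return list(find_subdag_run_ids(DagRunState(ti_state.value)))


# Hypothetical lookup that just echoes the DagRun state it was given.
def fake_lookup(dag_run_state: DagRunState) -> List[str]:
    return [f"subdag-run-{dag_run_state.value}"]


skipped_ids = subdag_run_ids_guarded("skipped", fake_lookup)
success_ids = subdag_run_ids_guarded(TaskInstanceState.SUCCESS, fake_lookup)
print(skipped_ids, success_ids)
```

The trade-off is the one noted in the comment: with this guard, task instances inside subdags are left untouched when the requested state is skipped.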