apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Mismatch of taskinstances executed for dagrun result in API vs web UI #29304

Closed (Bowrna closed this issue 1 year ago)

Bowrna commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

airflow version: 2.4.2

[Screenshot 2023-02-02 at 1 24 09 PM]

In the screenshot above, I have highlighted a specific DAG run. At the time of that run I had not yet added the task with task_id snowflake_query, so it has no status in the web UI.

For that same DAG run, I retrieved the list of task instances via the API. The API response is below:

{
  "task_instances": [
    {
      "dag_id": "example_great_expectations_dag",
      "dag_run_id": "manual__2023-01-25T09:14:40.128465+00:00",
      "duration": 3.187616,
      "end_date": "2023-01-25T09:14:50.761011+00:00",
      "execution_date": "2023-01-25T09:14:40.128465+00:00",
      "executor_config": "{}",
      "hostname": "test.com",
      "map_index": -1,
      "max_tries": 0,
      "operator": "SnowflakeOperator",
      "pid": 16793,
      "pool": "default_pool",
      "pool_slots": 1,
      "priority_weight": 2,
      "queue": "default",
      "queued_when": "2023-01-25T09:14:41.722900+00:00",
      "rendered_fields": {
        "sql": "\n            select * from CUSTOMER limit 3;\n        "
      },
      "sla_miss": null,
      "start_date": "2023-01-25T09:14:47.573395+00:00",
      "state": "success",
      "task_id": "query_snowflake",
      "try_number": 1,
      "unixname": "root"
    },
    {
      "dag_id": "example_great_expectations_dag",
      "dag_run_id": "manual__2023-01-25T09:14:40.128465+00:00",
      "duration": 5.457847,
      "end_date": "2023-01-25T09:15:37.789632+00:00",
      "execution_date": "2023-01-25T09:14:40.128465+00:00",
      "executor_config": "{}",
      "hostname": "test.com",
      "map_index": -1,
      "max_tries": 0,
      "operator": "_PythonDecoratedOperator",
      "pid": 17444,
      "pool": "default_pool",
      "pool_slots": 1,
      "priority_weight": 1,
      "queue": "default",
      "queued_when": "2023-01-25T09:15:26.686085+00:00",
      "rendered_fields": {
        "op_args": "([{'C_CUSTKEY': 60001, 'C_NAME': 'Customer#000060001', 'C_ADDRESS': '9Ii4zQn9cX', 'C_NATIONKEY': 14, 'C_PHONE': '24-678-784-9652', 'C_ACCTBAL': Decimal('9957.56'), 'C_MKTSEGMENT': 'HOUSEHOLD', 'C_COMMENT': 'l theodolites boost slyly at the platelets: permanently ironic packages wake slyly pend'}, {'C_CUSTKEY': 60002, 'C_NAME': 'Customer#000060002', 'C_ADDRESS': 'ThGBMjDwKzkoOxhz', 'C_NATIONKEY': 15, 'C_PHONE': '25-782-500-8435', 'C_ACCTBAL': Decimal('742.46'), 'C_MKTSEGMENT': 'BUILDING', 'C_COMMENT': ' beans. fluffily regular packages'}, {'C_CUSTKEY': 60003, 'C_NAME': 'Customer#000060003', 'C_ADDRESS': 'Ed hbPtTXMTAsgGhCr4HuTzK,Md2', 'C_NATIONKEY': 16, 'C_PHONE': '26-859-847-7640', 'C_ACCTBAL': Decimal('2526.92'), 'C_MKTSEGMENT': 'BUILDING', 'C_COMMENT': 'fully pending deposits sleep quickly. blithely unusual accounts across the blithely bold requests are quickly'}],)",
        "op_kwargs": {},
        "templates_dict": null
      },
      "sla_miss": null,
      "start_date": "2023-01-25T09:15:32.331785+00:00",
      "state": "failed",
      "task_id": "convert_to_df",
      "try_number": 1,
      "unixname": "root"
    },
    {
      "dag_id": "example_great_expectations_dag",
      "dag_run_id": "manual__2023-01-25T09:14:40.128465+00:00",
      "duration": 26.942548,
      "end_date": "2023-01-25T09:15:25.242690+00:00",
      "execution_date": "2023-01-25T09:14:40.128465+00:00",
      "executor_config": "{}",
      "hostname": "test.com",
      "map_index": -1,
      "max_tries": 0,
      "operator": "GreatExpectationsOperator",
      "pid": 16885,
      "pool": "default_pool",
      "pool_slots": 1,
      "priority_weight": 1,
      "queue": "default",
      "queued_when": "2023-01-25T09:14:41.722900+00:00",
      "rendered_fields": {},
      "sla_miss": null,
      "start_date": "2023-01-25T09:14:58.300142+00:00",
      "state": "failed",
      "task_id": "snowflake_validate",
      "try_number": 1,
      "unixname": "root"
    }
  ],
  "total_entries": 3
}

The response contains 3 entries, while the UI shows only the 2 task instances that were executed. The task_id query_snowflake is not part of the DAG at this point in time.

What you think should happen instead

It should return only the 2 task instances that were executed in the DAG run.

How to reproduce

By invoking the Airflow API "List task instances"

curl -X 'GET' \
  'http://localhost:8080/api/v1/dags/example_great_expectations_dag/dagRuns/manual__2023-01-25T09%3A14%3A40.128465%2B00%3A00/taskInstances?limit=100' \
  -H 'accept: application/json'
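
For anyone who prefers to script the check, here is a rough Python equivalent of the curl call above. It is only a sketch: the helper names `task_instances_url` and `list_task_ids` are made up for illustration, it uses only the standard library, and it assumes the webserver accepts unauthenticated requests as in the curl example (most deployments will need an auth header added).

```python
import json
from urllib.parse import quote
from urllib.request import Request, urlopen


def task_instances_url(base_url, dag_id, dag_run_id, limit=100):
    """Build the "List task instances" URL for one DAG run (hypothetical helper)."""
    # The run id contains ':' and '+', which must be percent-encoded in a URL path.
    return (
        f"{base_url}/api/v1/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(dag_run_id, safe='')}/taskInstances?limit={limit}"
    )


def list_task_ids(url):
    """Fetch the endpoint and return the task_id of every task instance."""
    request = Request(url, headers={"accept": "application/json"})
    with urlopen(request) as response:  # needs a reachable Airflow webserver
        payload = json.load(response)
    return [ti["task_id"] for ti in payload["task_instances"]]


url = task_instances_url(
    "http://localhost:8080",
    "example_great_expectations_dag",
    "manual__2023-01-25T09:14:40.128465+00:00",
)
print(url)
```

Calling `list_task_ids(url)` against the deployment described in this issue would be expected to return the three task_ids shown in the response above.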

Operating System

Linux

Versions of Apache Airflow Providers

apache-airflow==2.4.2 apache-airflow-providers-celery==3.1.0 apache-airflow-providers-common-sql==1.3.3 apache-airflow-providers-ftp==3.2.0 apache-airflow-providers-http==4.1.0 apache-airflow-providers-imap==3.1.0 apache-airflow-providers-snowflake==4.0.2 apache-airflow-providers-sqlite==3.3.0

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

potiuk commented 1 year ago

I believe this is because you previously had a "query_snowflake" task in your DAG that has since been renamed to "snowflake_query" (notice that the UI shows "snowflake_query" while the API response has "query_snowflake").

Airflow does not currently support renaming tasks or, more generally, DAG versioning (this might get implemented at some point with https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning ), and the effect of renaming a task is exactly what you see.

You can verify that by adding a new task "query_snowflake" to your DAG or by renaming the task back - then the task instances executed in the past for "query_snowflake" will start showing up.

The current recommendation when you remove tasks from a DAG or change its structure significantly is to also change the DAG_ID (effectively creating a new DAG), because we currently do not even have a good way of showing history for such a DAG (until AIP-36 gets implemented).
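
Until versioning exists, a client that only wants the task instances matching the DAG's current structure can filter the API response itself. The sketch below is illustrative, not an Airflow feature: `split_by_current_tasks` is a hypothetical helper, and the list of current task_ids is an assumption inferred from this thread (it could instead be fetched from the REST API's "Get tasks for DAG" endpoint).

```python
def split_by_current_tasks(task_instances, current_task_ids):
    """Partition task instances by whether their task_id is still in the DAG."""
    current = set(current_task_ids)
    kept = [ti for ti in task_instances if ti["task_id"] in current]
    orphaned = [ti for ti in task_instances if ti["task_id"] not in current]
    return kept, orphaned


# Trimmed-down version of the API response from this issue.
api_task_instances = [
    {"task_id": "query_snowflake", "state": "success"},
    {"task_id": "convert_to_df", "state": "failed"},
    {"task_id": "snowflake_validate", "state": "failed"},
]

# Assumed current DAG structure after the rename described above.
current_task_ids = ["snowflake_query", "convert_to_df", "snowflake_validate"]

kept, orphaned = split_by_current_tasks(api_task_instances, current_task_ids)
print([ti["task_id"] for ti in kept])      # instances the UI can still place
print([ti["task_id"] for ti in orphaned])  # instances left over from the old task name
```

With the data above, the orphaned list contains only the "query_snowflake" instance, which is the entry the API returns but the UI has no row for.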

Bowrna commented 1 year ago

@potiuk thanks for clarifying this for me. Yes, it makes sense to me now. I had wrongly assumed that the API response was incorrect.