apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Metrics - `airflow_ti.start.*` and `airflow_ti.finish.*` #32159

Open ferruzzi opened 1 year ago

ferruzzi commented 1 year ago

Apache Airflow version

2.6.2

What happened

There is an issue with the airflow_ti.start.<dag_id>.<task_id>.<state> and airflow_ti.finish.<dag_id>.<task_id>.<state> metrics when running Airflow with OpenTelemetry (OTel). Both metrics are emitted when running with StatsD, but under OTel they are emitted inconsistently.

I am submitting this as an issue now because I will be somewhat distracted for a while, and someone may be able to look at it in the meantime. Please do not assign it to me; I'll get to it when I can if nobody else does.

What you think should happen instead

Behavior should be consistent: the metrics should be emitted the same way under OTel as under StatsD.

How to reproduce

To reproduce, start Breeze with the statsd or the otel integration (for example, breeze start-airflow --integration otel), run one or more of the following DAGs, then open the OTel or StatsD raw data view to verify.

Under OTel, these two DAGs don't generate any airflow_ti_finish.* metrics:

from airflow import DAG
from airflow.decorators import task
from airflow.utils.timezone import datetime

@task
def task1():
    return 'Hello'

@task
def task2():
    return 'World!'

@task
def task3(in1, in2):
    print(f'{in1} {in2}')

with DAG(
    dag_id='taskflow_demo',
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    task3(task1(), task2())

# Second DAG: sleep_10
import time

from airflow import DAG
from airflow.decorators import task
from airflow.utils.timezone import datetime

@task
def task1():
    time.sleep(10)

with DAG(
    dag_id='sleep_10',
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    task1()

By contrast, this DAG:

import time
from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.utils.timezone import datetime

def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(
        "The callback arguments are: ",
        {
            "dag": dag,
            "task_list": task_list,
            "blocking_task_list": blocking_task_list,
            "slas": slas,
            "blocking_tis": blocking_tis,
        },
    )

@task(sla=timedelta(seconds=10))
def sleep_20():
    """Sleep for 20 seconds"""
    time.sleep(20)

@task
def sleep_30():
    """Sleep for 30 seconds"""
    time.sleep(30)

with DAG(
    dag_id='fail_S_L_A',
    start_date=datetime(2021, 1, 1),
    schedule="*/2 * * * *",
    catchup=False,
    sla_miss_callback=sla_callback,
) as dag:

    sleep_20() >> sleep_30()

triggers all of the following:

# HELP airflow_ti_finish_fail_s_l_a_sleep_30_deferred 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_deferred counter
airflow_ti_finish_fail_s_l_a_sleep_30_deferred{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_failed 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_failed counter
airflow_ti_finish_fail_s_l_a_sleep_30_failed{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_none 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_none counter
airflow_ti_finish_fail_s_l_a_sleep_30_none{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_queued 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_queued counter
airflow_ti_finish_fail_s_l_a_sleep_30_queued{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_removed 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_removed counter
airflow_ti_finish_fail_s_l_a_sleep_30_removed{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_restarting 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_restarting counter
airflow_ti_finish_fail_s_l_a_sleep_30_restarting{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_running 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_running counter
airflow_ti_finish_fail_s_l_a_sleep_30_running{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_scheduled 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_scheduled counter
airflow_ti_finish_fail_s_l_a_sleep_30_scheduled{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_shutdown 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_shutdown counter
airflow_ti_finish_fail_s_l_a_sleep_30_shutdown{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_skipped 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_skipped counter
airflow_ti_finish_fail_s_l_a_sleep_30_skipped{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_success 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_success counter
airflow_ti_finish_fail_s_l_a_sleep_30_success{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_up_for_reschedule 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_up_for_reschedule counter
airflow_ti_finish_fail_s_l_a_sleep_30_up_for_reschedule{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_up_for_retry 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_up_for_retry counter
airflow_ti_finish_fail_s_l_a_sleep_30_up_for_retry{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
# HELP airflow_ti_finish_fail_s_l_a_sleep_30_upstream_failed 
# TYPE airflow_ti_finish_fail_s_l_a_sleep_30_upstream_failed counter
airflow_ti_finish_fail_s_l_a_sleep_30_upstream_failed{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0

Of note: it reported a counter for every task state (all with a value of 0), and it only reports them for one task (sleep_30), not for sleep_20. Perhaps I misunderstand when and why this metric gets emitted.
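To see which finish counters are missing for a given DAG, one option is to build the expected names from the DAG ID, the task IDs, and the states listed above, then diff them against what the exporter shows. This is a sketch under an assumption: it uses the Prometheus-style naming observed in the output above (dots replaced by underscores and the name lowercased, e.g. `fail_S_L_A` appearing as `fail_s_l_a`). `to_prometheus_name`, `expected_finish_names`, and `missing_finish_names` are hypothetical helpers, not Airflow APIs.

```python
# Task states that appear as name suffixes in the OTel/Prometheus output in this issue.
TI_STATES = [
    "deferred", "failed", "none", "queued", "removed", "restarting", "running",
    "scheduled", "shutdown", "skipped", "success", "up_for_reschedule",
    "up_for_retry", "upstream_failed",
]


def to_prometheus_name(stat_name):
    """Assumed mapping from a dotted StatsD-style name to the observed Prometheus name."""
    return stat_name.replace(".", "_").lower()


def expected_finish_names(dag_id, task_ids, states=TI_STATES):
    """Names one would expect for airflow_ti.finish.<dag_id>.<task_id>.<state>."""
    return {
        to_prometheus_name(f"airflow_ti.finish.{dag_id}.{task_id}.{state}")
        for task_id in task_ids
        for state in states
    }


def missing_finish_names(dag_id, task_ids, observed_names):
    """Expected finish counters that do not appear among the observed names."""
    return sorted(expected_finish_names(dag_id, task_ids) - set(observed_names))


if __name__ == "__main__":
    observed = {"airflow_ti_finish_fail_s_l_a_sleep_30_success"}
    missing = missing_finish_names("fail_S_L_A", ["sleep_20", "sleep_30"], observed)
    print(len(missing))  # 2 tasks x 14 states - 1 observed = 27
```

The observed names could come from any scrape of the raw data view; the point is only to make "which task/state combinations never showed up" explicit rather than eyeballing the list.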

Things I have tried and (possibly?) ruled out

Operating System

ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

github-actions[bot] commented 4 months ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly asking you to recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

potiuk commented 4 months ago

not stale