apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Otel timing observer overrides its value #35036

Closed YonatanKiron closed 8 months ago

YonatanKiron commented 1 year ago

Apache Airflow version

2.7.2

What happened

I have two DAGs that consistently succeed; one is reported to the otel agent while the other is not.

After looking into the logs (in debug mode) it seems like the metric airflow.dagrun.duration.success contains only one value.

            {
              "name": "airflow.dagrun.duration.success.airflow.fail_hanging_dagruns",
              "description": "",
              "unit": "",
              "data": {
                "data_points": [
                  {
                    "attributes": {
                      "dag_id": "airflow.fail_hanging_dagruns",
                      "run_type": "scheduled"
                    },
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 19.202272
                  }
                ]
              }
            },
            {
              "name": "airflow.dagrun.duration.success",
              "description": "",
              "unit": "",
              "data": {
                "data_points": [
                  {
                    "attributes": {
                      "dag_id": "airflow.fail_hanging_dagruns",
                      "run_type": "scheduled"
                    },
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 19.202272
                  }
                ]
              }
            },
            {
              "name": "airflow.dagrun.schedule_delay.system.random.dag.status",
              "description": "",
              "unit": "",
              "data": {
                "data_points": [
                  {
                    "attributes": {},
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 0.647526
                  }
                ]
              }
            },

What you think should happen instead

The airflow.dagrun.duration.success metric should contain a list of values (one data point per DAG) instead of a single value:

            {
              "name": "airflow.dagrun.duration.success.airflow.fail_hanging_dagruns",
              "description": "",
              "unit": "",
              "data": {
                "data_points": [
                  {
                    "attributes": {
                      "dag_id": "airflow.fail_hanging_dagruns",
                      "run_type": "scheduled"
                    },
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 19.202272
                  }
                ]
              }
            },
            {
              "name": "airflow.dagrun.duration.success",
              "description": "",
              "unit": "",
              "data": {
                "data_points": [
                  {
                    "attributes": {
                      "dag_id": "airflow.fail_hanging_dagruns",
                      "run_type": "scheduled"
                    },
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 19.202272
                  },
                  {
                    "attributes": {
                      "dag_id": "system.random.dag.status",
                      "run_type": "scheduled"
                    },
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 0.647526
                  }
                ]
              }
            },
            {
              "name": "airflow.dagrun.schedule_delay.system.random.dag.status",
              "description": "",
              "unit": "",
              "data": {
                "data_points": [
                  {
                    "attributes": {},
                    "start_time_unix_nano": 0,
                    "time_unix_nano": 1697655156630862800,
                    "value": 0.647526
                  }
                ]
              }
            },
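The expected shape above amounts to one metric name carrying one data point per DAG. A minimal sketch of that idea, assuming a single callback that walks every recorded DAG; `Observation`, `latest`, and `observe` are hypothetical names for illustration and are not Airflow or OpenTelemetry code:

```python
from collections import namedtuple

# Hypothetical stand-in for an observation: a value plus its attributes.
Observation = namedtuple("Observation", ["value", "attributes"])

# Latest value per (metric name, dag_id); numbers copied from the expected output above.
latest = {
    ("airflow.dagrun.duration.success", "airflow.fail_hanging_dagruns"): 19.202272,
    ("airflow.dagrun.duration.success", "system.random.dag.status"): 0.647526,
}


def observe(metric_name):
    """Yield one observation for every DAG recorded under metric_name."""
    for (name, dag_id), value in latest.items():
        if name == metric_name:
            yield Observation(value, {"dag_id": dag_id, "run_type": "scheduled"})


data_points = list(observe("airflow.dagrun.duration.success"))
for point in data_points:
    print(point.attributes["dag_id"], point.value)
```

With this shape, both DAGs appear as separate entries under the same metric name, distinguished only by their attributes.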

How to reproduce

Create two dummy DAGs that succeed and watch the emitted metrics.

Operating System

Docker bitnami/airflow:2.7.2

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.4.1
apache-airflow-providers-apache-cassandra==3.2.1
apache-airflow-providers-apache-drill==2.4.4
apache-airflow-providers-apache-druid==3.5.0
apache-airflow-providers-apache-hdfs==4.1.1
apache-airflow-providers-apache-hive==6.1.6
apache-airflow-providers-apache-impala==1.1.3
apache-airflow-providers-apache-pinot==4.1.4
apache-airflow-providers-arangodb==2.2.2
apache-airflow-providers-celery==3.3.4
apache-airflow-providers-cloudant==3.2.1
apache-airflow-providers-cncf-kubernetes==7.6.0
apache-airflow-providers-common-sql==1.7.2
apache-airflow-providers-daskexecutor==1.0.1
apache-airflow-providers-databricks==4.5.0
apache-airflow-providers-docker==3.7.5
apache-airflow-providers-elasticsearch==5.0.2
apache-airflow-providers-exasol==4.2.5
apache-airflow-providers-ftp==3.5.2
apache-airflow-providers-google==10.9.0
apache-airflow-providers-grpc==3.2.2
apache-airflow-providers-hashicorp==3.4.3
apache-airflow-providers-http==4.5.2
apache-airflow-providers-imap==3.3.2
apache-airflow-providers-influxdb==2.2.3
apache-airflow-providers-microsoft-azure==7.0.0
apache-airflow-providers-microsoft-mssql==3.4.2
apache-airflow-providers-mongo==3.2.2
apache-airflow-providers-mysql==5.3.1
apache-airflow-providers-neo4j==3.3.3
apache-airflow-providers-postgres==5.6.1
apache-airflow-providers-presto==5.1.4
apache-airflow-providers-redis==3.3.2
apache-airflow-providers-sendgrid==3.2.2
apache-airflow-providers-sftp==4.6.1
apache-airflow-providers-slack==8.1.0
apache-airflow-providers-sqlite==3.4.3
apache-airflow-providers-ssh==3.7.3
apache-airflow-providers-trino==5.3.1
apache-airflow-providers-vertica==3.5.2

Deployment

Other 3rd-party Helm chart

Deployment details

bitnami/airflow

Anything else

I suspect that this code block overrides the metric's key every time instead of appending its values when the key already exists:

https://github.com/apache/airflow/blob/2.7.2/airflow/metrics/otel_logger.py#L327-L343
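To make the suspected failure mode concrete, here is a small illustration using plain dictionaries. It is not taken from otel_logger.py; the two helper functions and the DAG names are hypothetical and only contrast keying a store by metric name alone with keying it by name plus tags:

```python
# Illustration only: plain dicts standing in for a metrics map.
# Nothing here is taken from airflow/metrics/otel_logger.py.

def record_by_name(store, name, value, tags):
    """Key on the metric name alone: a later tag set replaces the earlier one."""
    store[name] = (value, tags)


def record_by_name_and_tags(store, name, value, tags):
    """Key on name plus sorted tags: each tag set keeps its own entry."""
    key = (name, tuple(sorted(tags.items())))
    store[key] = (value, tags)


overridden = {}
kept = {}
for dag_id, duration in [("dag_a", 19.2), ("dag_b", 0.6)]:
    tags = {"dag_id": dag_id}
    record_by_name(overridden, "dagrun.duration.success", duration, tags)
    record_by_name_and_tags(kept, "dagrun.duration.success", duration, tags)

print(len(overridden))  # 1
print(len(kept))  # 2
```

If the map were keyed by name only, the second DAG would silently replace the first, which would match the symptom in the log.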


boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

YonatanKiron commented 1 year ago

Another way to reproduce:

from airflow.stats import Stats

# Record the same timer twice, once per dag_id tag.
Stats.timing("dagrun.duration.success", dt=1000, tags={"dag_id": "bla"})
Stats.timing("dagrun.duration.success", dt=1000, tags={"dag_id": "bla2"})

# Inspect the internal map: each tag set is stored under its own key.
Stats.instance.metrics_map.map
# {'airflow.dagrun.duration.success_dag_id_bla': Observation(value=1000.0, attributes={'dag_id': 'bla'}),
#  'airflow.dagrun.duration.success_dag_id_bla2': Observation(value=1000.0, attributes={'dag_id': 'bla2'})}

The debug log will report:

{
    "resource_metrics": [
        {
            "resource": {
                "attributes": {
                    "service.name": "Airflow"
                },
                "schema_url": ""
            },
            "scope_metrics": [
                {
                    "scope": {
                        "name": "airflow.metrics.otel_logger",
                        "version": null,
                        "schema_url": ""
                    },
                    "metrics": [
                        {
                            "name": "airflow.dagrun.duration.success",
                            "description": "",
                            "unit": "",
                            "data": {
                                "data_points": [
                                    {
                                        "attributes": {
                                            "dag_id": "bla"
                                        },
                                        "start_time_unix_nano": 0,
                                        "time_unix_nano": 1697698348843414114,
                                        "value": 1000.0
                                    }
                                ]
                            }
                        }
                    ],
                    "schema_url": ""
                }
            ],
            "schema_url": ""
        }
    ]
}

So my assumption that the value is being overridden was wrong; the issue is somewhere in the reporting part.

YonatanKiron commented 1 year ago

This seems to be an issue in OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/issues/2558

https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py#L75-L95

:(
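One way the symptom could arise, sketched as a toy model. This assumes (not verified here) that requesting an instrument whose name is already registered hands back the existing instrument, so a second callback under the same name is never attached. `ToyMeter` and `register_gauge` are hypothetical and are not the OpenTelemetry SDK:

```python
# Toy model only: ToyMeter is hypothetical and is not the OpenTelemetry SDK.
# Assumption: a second registration under an existing name is ignored,
# so only the first callback is ever asked for data points.

class ToyMeter:
    def __init__(self):
        self._callbacks_by_name = {}

    def register_gauge(self, name, callback):
        # Keep the first registration for a name and ignore later ones.
        self._callbacks_by_name.setdefault(name, callback)

    def collect(self):
        # Call every registered callback once and gather its data points.
        return {name: cb() for name, cb in self._callbacks_by_name.items()}


meter = ToyMeter()
meter.register_gauge(
    "airflow.dagrun.duration.success",
    lambda: [(1000.0, {"dag_id": "bla"})],
)
meter.register_gauge(
    "airflow.dagrun.duration.success",
    lambda: [(1000.0, {"dag_id": "bla2"})],
)

report = meter.collect()
print(report)
```

Under that assumption the report holds only the "bla" data point, matching the debug log from the reproduction earlier in this thread, even though both observations were recorded.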

ferruzzi commented 8 months ago

Hey, since this is an Otel issue and not an Airflow issue I want to close the ticket. Feel free to reopen if you disagree.