apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Invalid stat name using opentelemetry #34845

Closed alvaroserper closed 11 months ago

alvaroserper commented 11 months ago

Apache Airflow version

2.7.1

What happened

When running a DAG, an error occurred. The error says that there is a metric with an invalid name. This causes the DAG's task to be set up for retry. The task then executes again and is marked as success.

2023-10-10 13:05:21 [2023-10-10T11:05:21.738+0000] {local_executor.py:135} ERROR - Failed to execute task Invalid stat name: ***.dag.cwf_path_inspector_generator.delete-xcom-task.queued_duration.  Please see https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax.
2023-10-10 13:05:21 Traceback (most recent call last):
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/executors/local_executor.py", line 131, in _execute_work_in_fork
2023-10-10 13:05:21     args.func(args)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/cli/cli_config.py", line 49, in command
2023-10-10 13:05:21     return func(*args, **kwargs)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/utils/cli.py", line 113, in wrapper
2023-10-10 13:05:21     return f(*args, **kwargs)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/cli/commands/task_command.py", line 430, in task_run
2023-10-10 13:05:21     task_return_code = _run_task_by_selected_method(args, _dag, ti)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/cli/commands/task_command.py", line 208, in _run_task_by_selected_method
2023-10-10 13:05:21     return _run_task_by_local_task_job(args, ti)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/cli/commands/task_command.py", line 270, in _run_task_by_local_task_job
2023-10-10 13:05:21     ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/utils/session.py", line 77, in wrapper
2023-10-10 13:05:21     return func(*args, session=session, **kwargs)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/jobs/job.py", line 289, in run_job
2023-10-10 13:05:21     return execute_job(job, execute_callable=execute_callable)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/jobs/job.py", line 318, in execute_job
2023-10-10 13:05:21     ret = execute_callable()
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/jobs/local_task_job_runner.py", line 143, in _execute
2023-10-10 13:05:21     if not self.task_instance.check_and_change_state_before_execution(
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/utils/session.py", line 77, in wrapper
2023-10-10 13:05:21     return func(*args, session=session, **kwargs)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/models/taskinstance.py", line 1366, in check_and_change_state_before_execution
2023-10-10 13:05:21     self.emit_state_change_metric(TaskInstanceState.RUNNING)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/models/taskinstance.py", line 1450, in emit_state_change_metric
2023-10-10 13:05:21     Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing)
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/metrics/otel_logger.py", line 266, in timing
2023-10-10 13:05:21     if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/metrics/otel_logger.py", line 95, in name_is_otel_safe
2023-10-10 13:05:21     return bool(stat_name_otel_handler(prefix, name, max_length=OTEL_NAME_MAX_LENGTH))
2023-10-10 13:05:21   File "/usr/local/lib/python3.10/dist-packages/airflow/metrics/validators.py", line 142, in stat_name_otel_handler
2023-10-10 13:05:21     raise InvalidStatsNameException(
2023-10-10 13:05:21 airflow.exceptions.InvalidStatsNameException: Invalid stat name: ***.dag.cwf_path_inspector_generator.delete-xcom-task.queued_duration.  Please see https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax

What you think should happen instead

There should not be an error with the name of a default metric causing a task to retry.

How to reproduce

Enable OpenTelemetry in airflow.cfg:

otel_on = True
otel_host = breeze-otel-collector
otel_port = 4318
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_ssl_active = False

Run the OpenTelemetry collector in Docker (Docker Compose service definition):

otel-collector:
    image: otel/opentelemetry-collector-contrib:0.70.0
    container_name: "breeze-otel-collector"
    command: [--config=/etc/otel-collector-config.yml]
    volumes:
      - ./otel-collector/otel-collector-config.yml:/etc/otel-collector-config.yml
      # - ./otel-collector/keys:/etc/keys
    ports:
      - "24318:4318"    # OTLP http receiver
      - "28889:8889"    # Prometheus exporter metrics

Operating System

Ubuntu 22.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

boring-cyborg[bot] commented 11 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Bisk1 commented 11 months ago

Looks like some legacy metric names weren't added to the exemptions list: https://github.com/apache/airflow/pull/30873/files#diff-1cca954ec0be1aaf2c212e718c004cb0902a96ac60043bf0c97a782dee52cc32R55

@ferruzzi It looks like those metrics were added to the codebase shortly before the exemptions list was created: https://github.com/apache/airflow/blob/8fdf3582c2967161dd794f7efb53691d092f0ce6/airflow/models/taskinstance.py#L2184

should we add them to the exemption list?

    r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.queued_duration$",
    r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.scheduled_duration$",

or remove their invocations?
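
To illustrate what the two proposed patterns would cover, here is a minimal sketch using Python's standard `re` module. It assumes the patterns are matched against the stat name without the configured prefix; `PROPOSED_EXEMPTIONS` and `match_exemption` are hypothetical names used only for this example, not Airflow's actual API.

```python
import re

# Patterns copied from the comment above.
PROPOSED_EXEMPTIONS = [
    r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.queued_duration$",
    r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.scheduled_duration$",
]


def match_exemption(stat_name):
    """Return the named groups of the first matching pattern, or None.

    Hypothetical helper for illustration only.
    """
    for pattern in PROPOSED_EXEMPTIONS:
        match = re.match(pattern, stat_name)
        if match:
            return match.groupdict()
    return None


# Stat name taken from the traceback, assumed here to be passed without the prefix.
stat = "dag.cwf_path_inspector_generator.delete-xcom-task.queued_duration"
result = match_exemption(stat)
print(result)
```

Because both groups use a greedy `.*`, a dag_id containing dots would be split at the last dot before the suffix. That does not matter for a yes/no exemption check, but the captured group values should not be relied on in that case.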

alvaroserper commented 11 months ago

Okay, thank you. Will this be solved in v3.0?

ferruzzi commented 11 months ago

should we add them to the exemption list?

Must have been some awkward timing; adding new non-compliant names should have been prevented by the unit tests... Yeah, I guess in this case, let's add it to the exemption list. Can you cut the PR?

It has already been fixed, and the fix will ship in the next release: https://github.com/apache/airflow/pull/34531

ferruzzi commented 11 months ago

I know this isn't really an answer, but the root cause is that when you combine a long dag_id and a long task_id, the total length of the metric name exceeds OTel's max name length. A temporary workaround would be to use shorter names. Again: I know that's not a satisfactory solution, and the fix has already been applied to the next release, but if you want a temporary band-aid, that's one option.
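
As a rough way to check whether a given dag_id/task_id pair would hit this limit, the sketch below rebuilds the metric name the way the traceback shows it (`dag.{dag_id}.{task_id}.{metric_name}` plus the prefix) and compares its length to a limit. The limit of 63 characters is an assumption about the value of `OTEL_NAME_MAX_LENGTH` in this Airflow version, and the prefix `airflow` comes from the `otel_prefix` setting in the report (it is masked as `***` in the log). The helper names are hypothetical.

```python
# Assumed value of Airflow's OTEL_NAME_MAX_LENGTH for this release; verify
# against your installed version before relying on it.
ASSUMED_OTEL_NAME_MAX_LENGTH = 63


def full_metric_name(prefix, dag_id, task_id, metric_name):
    """Build the full name as prefix + the f-string seen in the traceback."""
    return f"{prefix}.dag.{dag_id}.{task_id}.{metric_name}"


def exceeds_limit(name, max_length=ASSUMED_OTEL_NAME_MAX_LENGTH):
    """Return True if the name is longer than the assumed limit."""
    return len(name) > max_length


name = full_metric_name(
    prefix="airflow",  # otel_prefix from the reporter's airflow.cfg
    dag_id="cwf_path_inspector_generator",
    task_id="delete-xcom-task",
    metric_name="queued_duration",
)
print(name, len(name), exceeds_limit(name))
```

Under these assumptions the reported name is longer than the limit, which is consistent with the explanation above; shortening either the dag_id or the task_id enough to bring the total under the limit is the temporary workaround.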

ferruzzi commented 11 months ago

I just double-checked, and this fix should be in Airflow 2.7.2, which is currently being voted on and should be out Very Soon™ (next week I think, but don't hold me to that).

ferruzzi commented 11 months ago

2.7.2 was released this morning!