apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow scheduler with statsd enabled crashes when dag_id contains unexpected characters #18010

Open mhaalme opened 3 years ago

mhaalme commented 3 years ago

Apache Airflow version

2.1.1

Operating System

Oracle Linux 7.9

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

The problem initially happened in a virtualenv installation of airflow 2.1.1 running as a systemd service.

I tried to reproduce it using docker-compose and airflow version 2.1.0; the problem occurs there as well.

What happened

A new DAG was added with the symbol "ö" in its dag_id. As soon as the DAG was triggered, the airflow scheduler died unexpectedly and could not be started again until the DAG was deleted from the UI and its dag_id changed in the DAG file.

What you expected to happen

The scheduler should continue running even if an exception occurred when emitting metrics for a DAG run. An exception related to a single DAG run should not be able to kill the entire scheduler process.

Ideally, if a dag_id contains characters which are not allowed, the error should appear as soon as Airflow tries to parse it, and it should not be possible to schedule it at all.

It would also be nice if the error message showing all allowed characters displayed them in alphabetic order.

How to reproduce

  1. Start an instance of airflow with statsd metrics enabled
  2. Create a DAG with a dag_id containing symbols which are not allowed
  3. Trigger the DAG. The scheduler will shut down as soon as the DAG is triggered.
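For step 1, the statsd options live in the [metrics] section of airflow.cfg in Airflow 2.x and can be set via environment variables. A sketch (host, port, and prefix values are placeholders for your statsd setup):

```shell
export AIRFLOW__METRICS__STATSD_ON=True
export AIRFLOW__METRICS__STATSD_HOST=localhost
export AIRFLOW__METRICS__STATSD_PORT=8125
export AIRFLOW__METRICS__STATSD_PREFIX=airflow
```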

Anything else

The problem occurs only when AIRFLOW__METRICS__STATSD_ON=true. When statsd metrics are disabled, the DAG runs without problems.
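For context, the errors below come from Airflow's default stat-name validator, which only accepts ASCII letters, digits, and "_", ".", "-". A rough stdlib-only sketch of that check, with names approximated from the traceback (this is not the actual Airflow implementation):

```python
import string

# The allowed set matches the one printed in the exception below:
# ASCII letters, digits, underscore, dot, and hyphen.
ALLOWED_CHARACTERS = frozenset(string.ascii_letters + string.digits + "_.-")


class InvalidStatsNameException(Exception):
    pass


def stat_name_default_handler(stat_name, max_length=250):
    # Reject names that are not strings, are too long,
    # or contain characters outside the allowed ASCII set.
    if not isinstance(stat_name, str):
        raise InvalidStatsNameException("The stat name must be a string")
    if len(stat_name) > max_length:
        raise InvalidStatsNameException(
            f"The stat name ({stat_name}) has to be less than {max_length} characters"
        )
    if not all(c in ALLOWED_CHARACTERS for c in stat_name):
        raise InvalidStatsNameException(
            f"The stat name ({stat_name}) has to be composed of allowed characters"
        )
    return stat_name
```

A name like dagrun.schedule_delay.Dag_With_ö fails the character check, which is what kicks off the crash below.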

Scheduler output with some values replaced with equivalent placeholders:

Sep 03 14:25:37 my.airflow.domain python[110341]: [2021-09-03 14:25:37,134] {{dagrun.py:444}} INFO - Marking run <DagRun other_dag @ 2021-09-03 11:24:34.7
60370+00:00: scheduled__2021-09-03T11:24:34.760370+00:00, externally triggered: False> successful
Sep 03 14:25:37 my.airflow.domain python[110341]: [2021-09-03 14:25:37,147] {{scheduler_job.py:1229}} INFO - Executor reports execution of other_dag.sched
uler_delay_to_zabbix execution_date=2021-09-03 11:24:34.760370+00:00 exited with status success for try_number 1
Sep 03 14:26:21 my.airflow.domain python[110341]: [2021-09-03 14:26:21,324] {{stats.py:235}} ERROR - Invalid stat name: dagrun.schedule_delay.Dag_With_ö.
Sep 03 14:26:21 my.airflow.domain python[110341]: Traceback (most recent call last):
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/stats.py", line 232, in wrapper
Sep 03 14:26:21 my.airflow.domain python[110341]: stat = handler_stat_name_func(stat)
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/stats.py", line 207, in stat_name_default_handler
Sep 03 14:26:21 my.airflow.domain python[110341]: stat_name=stat_name, allowed_characters=ALLOWED_CHARACTERS
Sep 03 14:26:21 my.airflow.domain python[110341]: airflow.exceptions.InvalidStatsNameException: The stat name (dagrun.schedule_delay.Dag_With_ö) has to be composed with characters in
Sep 03 14:26:21 my.airflow.domain python[110341]: {'0', 'f', 'k', 'p', 'J', 'v', '7', 'T', 'V', 'n', 'r', 'W', 'X', 'L', 'Q', 't', '_', 'u', '-', 'c', 'h', '3', 'm', '6', 'l', 'S', 'C', 'a', 'F', 'G', 'x', 'b', 'K', '8', 'j', 'w', 'D', 'g', '1', '4', 'q', '5', 'e', 'i', 'M', 'P', 'R', 'U', 'I', 'Z', '.', 'A', 'O', '9', 'y', '2', 's', 'E', 'Y', 'z', 'B', 'H', 'd', 'N', 'o'}.
Sep 03 14:26:21 my.airflow.domain python[110341]: [2021-09-03 14:26:21,408] {{stats.py:235}} ERROR - Invalid stat name: dagrun.dependency-check.Dag_With_ö.
Sep 03 14:26:21 my.airflow.domain python[110341]: Traceback (most recent call last):
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/stats.py", line 232, in wrapper
Sep 03 14:26:21 my.airflow.domain python[110341]: stat = handler_stat_name_func(stat)
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/stats.py", line 207, in stat_name_default_handler
Sep 03 14:26:21 my.airflow.domain python[110341]: stat_name=stat_name, allowed_characters=ALLOWED_CHARACTERS
Sep 03 14:26:21 my.airflow.domain python[110341]: airflow.exceptions.InvalidStatsNameException: The stat name (dagrun.dependency-check.Dag_With_ö) has to be composed with characters in
Sep 03 14:26:21 my.airflow.domain python[110341]: {'0', 'f', 'k', 'p', 'J', 'v', '7', 'T', 'V', 'n', 'r', 'W', 'X', 'L', 'Q', 't', '_', 'u', '-', 'c', 'h', '3', 'm', '6', 'l', 'S', 'C', 'a', 'F', 'G', 'x', 'b', 'K', '8', 'j', 'w', 'D', 'g', '1', '4', 'q', '5', 'e', 'i', 'M', 'P', 'R', 'U', 'I', 'Z', '.', 'A', 'O', '9', 'y', '2', 's', 'E', 'Y', 'z', 'B', 'H', 'd', 'N', 'o'}.
Sep 03 14:26:21 my.airflow.domain python[110341]: [2021-09-03 14:26:21,409] {{scheduler_job.py:1319}} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Sep 03 14:26:21 my.airflow.domain python[110341]: Traceback (most recent call last):
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1303, in _execute
Sep 03 14:26:21 my.airflow.domain python[110341]: self._run_scheduler_loop()
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1396, in _run_scheduler_loop
Sep 03 14:26:21 my.airflow.domain python[110341]: num_queued_tis = self._do_scheduling(session)
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1535, in _do_scheduling
Sep 03 14:26:21 my.airflow.domain python[110341]: self._schedule_dag_run(dag_run, active_runs_by_dag_id.get(dag_run.dag_id, set()), session)
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1765, in _schedule_dag_run
Sep 03 14:26:21 my.airflow.domain python[110341]: schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
Sep 03 14:26:21 my.airflow.domain python[110341]: return func(*args, **kwargs)
Sep 03 14:26:21 my.airflow.domain python[110341]: File "/app01/venv/airflowvenv/lib64/python3.6/site-packages/airflow/models/dagrun.py", line 403, in update_state
Sep 03 14:26:21 my.airflow.domain python[110341]: with Stats.timer(f"dagrun.dependency-check.{self.dag_id}"):
Sep 03 14:26:21 my.airflow.domain python[110341]: AttributeError: __enter__
Sep 03 14:26:21 my.airflow.domain python[110341]: [2021-09-03 14:26:21,457] {{local_executor.py:387}} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
Sep 03 14:26:22 my.airflow.domain python[110341]: [2021-09-03 14:26:22,561] {{process_utils.py:100}} INFO - Sending Signals.SIGTERM to GPID 8988
Sep 03 14:26:23 my.airflow.domain python[110341]: [2021-09-03 14:26:23,178] {{process_utils.py:66}} INFO - Process psutil.Process(pid=8988, status='terminated', exitcode=0, started='2021-08-17 15:46:22') (8988) terminated with exit code 0
Sep 03 14:26:23 my.airflow.domain python[110341]: [2021-09-03 14:26:23,179] {{scheduler_job.py:1330}} INFO - Exited execute loop


eladkal commented 3 years ago

Duplicate of https://github.com/apache/airflow/issues/8634

mhaalme commented 3 years ago

Duplicate of #8634

I don't think this counts as a duplicate as we're talking about different components, different versions and different errors. In my case (version 2.1.1) the webserver does not seem to be affected by the non-ascii characters at all and runs just fine. It is the scheduler that crashes.

potiuk commented 3 years ago

Agree it's likely something we should address. I am just wondering why it did not fail at trying to insert DAG to the DB - the DAG_ID there should only allow ASCII.

eladkal commented 3 years ago

Agree it's likely something we should address. I am just wondering why it did not fail at trying to insert DAG to the DB - the DAG_ID there should only allow ASCII.

I was able to create both a dag_id and a task_id with non-ASCII characters:

from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 3, 11),
}

with DAG(
    dag_id='simple_non_ascii_ñ_ö',
    default_args=default_args,
    schedule_interval='8/15 * * * *',
) as dag:
    BashOperator(
        task_id='sleep_ñ_ö',
        bash_command='sleep 5',
    )

(screenshot: the DAG loads without errors)

However, I wasn't able to create one with ♥O◘♦♥O◘♦: (screenshot: the DAG fails to import)

We use validate_key:

https://github.com/apache/airflow/blob/867e9305f08bf9580f25430d8b6e84071c59f9e6/airflow/utils/helpers.py#L45-L55

The regex is: https://github.com/apache/airflow/blob/867e9305f08bf9580f25430d8b6e84071c59f9e6/airflow/utils/helpers.py#L37

Validation is done on both the dag_id and the task_id: https://github.com/apache/airflow/blob/867e9305f08bf9580f25430d8b6e84071c59f9e6/airflow/models/dag.py#L328

https://github.com/apache/airflow/blob/867e9305f08bf9580f25430d8b6e84071c59f9e6/airflow/models/baseoperator.py#L538

A possible fix is: KEY_REGEX = re.compile(r'^[\w.-]+$', re.ASCII)

Some examples:

import re
KEY_REGEX = re.compile(r'^[\w.-]+$')
KEY_REGEX_NEW = re.compile(r'^[\w.-]+$', re.ASCII)

strings = ["ascii_a_b", "ascii_ñ_ö", "♥O◘♦♥O◘♦"]

for string in strings:
    print(f"\nresult for {string}:")
    print(KEY_REGEX.match(string))
    print(KEY_REGEX_NEW.match(string))

gives:

result for ascii_a_b:
<_sre.SRE_Match object; span=(0, 9), match='ascii_a_b'>
<_sre.SRE_Match object; span=(0, 9), match='ascii_a_b'>

result for ascii_ñ_ö:
<_sre.SRE_Match object; span=(0, 9), match='ascii_ñ_ö'>
None

result for ♥O◘♦♥O◘♦:
None
None

Adding re.ASCII makes the DAG show up as broken, as expected: (screenshot: the DAG fails to import)

@potiuk WDYT?

potiuk commented 3 years ago

For now - yeah. The problem is that DAG_ID and TASK_ID are also used as descriptions in the UI, and I can perfectly understand why someone would want to use national characters in them.

So maybe we should either a) allow non-ASCII and fix all the related problems, or b) add a separate non-ASCII description field. We had a chat about it with @ashb recently and I think we both lean towards allowing non-ASCII for IDs.

This likely means that in cases like statsd we will have to slugify the IDs to convert them to ASCII-only. My favourite library that does a great job at this is https://github.com/mozilla/unicode-slugify - which I think we should use for it.

What really convinced me is the:

# Allow some extra chars
slugify(u'北京 (capital of China)', ok=SLUG_OK+'()', only_ascii=True)
# u'bei-jing-(capital-of-china)'

I am just not sure if it will be too risky to try to do it in 2.2.

uranusjr commented 3 years ago

Regarding unicode-slugify, https://github.com/un33k/python-slugify (which is already a dependency) also supports unicode slugification, so we don’t need to add a new dependency. But yeah it’s probably a bit risky to include this in 2.2.
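As a rough illustration of what slugification buys here, a stdlib-only approximation (python-slugify and unicode-slugify do much more, e.g. transliterating CJK as in the 北京 example above; this sketch only strips combining marks and collapses disallowed characters, and the function name is made up for illustration):

```python
import re
import unicodedata


def ascii_slug(name: str) -> str:
    # Decompose accented characters (ö -> "o" + combining diaeresis),
    # drop everything that doesn't survive an ASCII encode, then
    # collapse any remaining disallowed characters into underscores.
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    return re.sub(r"[^A-Za-z0-9_.-]+", "_", ascii_only)


print(ascii_slug("dagrun.schedule_delay.Dag_With_ö"))
# dagrun.schedule_delay.Dag_With_o
```

Under this scheme the statsd metric name stays ASCII-only while the UI could keep showing the original dag_id.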

Melodie97 commented 3 years ago

Hi @potiuk, what have we resolved to do about this issue? I see you added the telemetry label to it.

potiuk commented 3 years ago

Just added it to label the issues related to telemetry :). I think nothing special changed - no-one was working on this issue yet. It's free to work on :)

Melodie97 commented 3 years ago

@potiuk, based on the discussions concerning this issue, should I go ahead with: a) allowing non-ASCII and fixing all the related problems, or b) adding a separate non-ASCII description?

And do you still think it would be too risky to try the former in 2.2?

potiuk commented 3 years ago

Good points @Melodie97 :)

I think a) and using python-slugify is the way to go.

Anything that we do in main is by default now 2.3 (we cut off the 2.2 branch when we released 2.2.0 a few weeks ago), so we need to specifically cherry-pick changes to the 2.2 branch if we want to release a change in the 2.2 line. So this problem is gone now.

Melodie97 commented 3 years ago

Hi @potiuk, I've created a pull request for this issue, please review.

deeproker commented 1 year ago

MWAA is stuck with the stat exception; after updating, it's not working anymore, since starting and stopping the scheduler is managed by AWS. Does anyone have a solution to get Airflow up again in an AWS-managed Airflow (MWAA) instance?

uranusjr commented 1 year ago

I’m pondering what the correct fix is here. KEY_REGEX (and GROUP_KEY_REGEX) allowing non-ASCII characters is arguably unintended, since those regex patterns are from way back in Python 2, when re.ASCII was the default. \w allowing non-ASCII (Unicode mode) only became the default in Python 3. So we could potentially declare the current behaviour a bug, and simply add re.ASCII to those patterns and break any DAGs using non-ASCII IDs. But on the other hand, I do sympathise with users already using non-ASCII characters, and it may also be viable to only disallow non-ASCII when statsd is enabled. Thoughts?

potiuk commented 1 year ago

Should be part of discussion here: https://lists.apache.org/thread/96tco8dfs4mh12kqm1pwjhm5mqr54qbm . I think we should limit ID to ASCII and add extra description.

cbuffett commented 8 months ago

I'm seeing a very similar error when the stat name is too long (more than 250 characters) in Airflow 2.7.2 (stat names masked due to sensitive data):

[2024-03-08, 13:49:12 UTC] {validators.py:101} ERROR - Invalid stat name: ***************************************************************************************************************************************************************************************************************************************************************.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1642, in _execute_task_with_callbacks
    with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration", tags=self.stats_tags):
AttributeError: __enter__
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/metrics/validators.py", line 98, in wrapper
    stat = handler_stat_name_func(stat)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/metrics/validators.py", line 181, in stat_name_default_handler
    raise InvalidStatsNameException(
airflow.exceptions.InvalidStatsNameException: The stat_name (***************************************************************************************************************************************************************************************************************************************************************) has to be less than 250 characters.

What's interesting is that I see the stat name too long exception multiple times in the logs, but only this instance results in the AttributeError.

Edit: I believe this is specifically due to the fact that when the InvalidStatsNameException is handled, None is returned to the with Stats.timer() call, which results in the AttributeError. This is easy enough to reproduce using the following:

with None:
    print("test")

Traceback (most recent call last):
  File "C:\Program Files\Python38\lib\site-packages\IPython\core\interactiveshell.py", line 3397, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-26e7b0306529>", line 1, in <cell line: 1>
    with None:
AttributeError: __enter__

Could wrap all instances of with Stats.timer() in a try/except/else block to specifically handle this error, though that would prevent the timed code from executing in the event of the exception, i.e.:

try:
    timer = Stats.timer()
except AttributeError:
    pass
else:
    with timer:
        ...  # timed code

Could just manually start/stop the timer if it's successfully created, but this would remove the convenience of the with statement.

uranusjr commented 8 months ago

With #38446 I think there’s even less incentive to allow non-ASCII. The ML thread never got resolved, but I think no-one was really against restricting the IDs to ASCII-only?

potiuk commented 8 months ago

With https://github.com/apache/airflow/pull/38446 I think there’s even less incentive to allow non-ASCII. The ML thread never got resolved, but I think no-one was really against restricting the IDs to ASCII-only?

Yes. We should make it ASCII only. I guess initially with warnings and later we should likely just forbid it.

Anriod commented 2 months ago

Subject: Request to add display_name for Task Groups

Hello, Airflow team!

I would like to propose adding the ability to use a display_name field for Task Groups, similar to what is already implemented for DAGs and tasks. This would allow users to set localized names for task groups in the UI without needing to use only ASCII for group_id.
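For DAGs and tasks this already exists via display-name parameters (added around Airflow 2.9; the parameter names below should be verified against your version). A sketch of a DAG file using them, with hypothetical IDs, that would need a running Airflow installation:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="pp890_full_new",              # ASCII-only, safe for statsd
    dag_display_name="ПП890 full load",   # localized label shown in the UI
    start_date=datetime(2024, 1, 1),
    schedule=None,
) as dag:
    BashOperator(
        task_id="fetch_reference_data",
        task_display_name="Получение НСИ",  # localized label for the UI
        bash_command="sleep 5",
    )
```

A group_display_name on TaskGroup would be the analogous extension.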

For example, using Cyrillic characters in task names leads to errors such as: ERROR - Invalid stat name: dag.PP890_FULL_NEW.Получение_НСИ.partner.create_partner.queued_duration.

Adding a display_name field for Task Groups would greatly improve usability with non-ASCII characters.

Thank you for your consideration!