apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.11k stars 14.3k forks source link

2.6.1 Queued DagRun for some DAGs, and for some not #31687

Closed ibardarov-fms closed 1 year ago

ibardarov-fms commented 1 year ago

Apache Airflow version

2.6.1

What happened

We are running 40 dags of the same kind. All are started at 05:00. After upgrade to 2.6.1 sometimes randomly dags are not scheduled and there are no created dag-runs. Selection_448

What you think should happen instead

I would like to see a green column for the next period. If there is something failing I would expect to see an error or at least warning message somewhere.

How to reproduce

It happens after the upgrade from 2.3.2.

Operating System

Ubuntu

Versions of Apache Airflow Providers

❯ ./airflow.sh info
[+] Running 2/0
 ⠿ Container airflow-redis-1         Running                                                                                                                      0.0s
 ⠿ Container airflow-airflow-init-1  Created                                                                                                                      0.0s
[+] Running 2/2
 ⠿ Container airflow-redis-1         Healthy                                                                                                                      0.5s
 ⠿ Container airflow-airflow-init-1  Started                                                                                                                      1.1s
Apache Airflow
version                | 2.6.1                                                                                                                                         
executor               | CeleryExecutor                                                                                                                                
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler                                                                                           
sql_alchemy_conn       | postgresql+
                       | ow_us_east_1                                                                                                                                  
dags_folder            | /opt/airflow/dags                                                                                                                             
plugins_folder         | /opt/airflow/plugins                                                                                                                          
base_log_folder        | /opt/airflow/logs                                                                                                                             
remote_base_log_folder |                                                                                                                                               

System info
OS              | Linux                                                                                                                                                
architecture    | x86_64                                                                                                                                               
uname           | uname_result(system='Linux', node='da226ebb4b23', release='4.14.313-235.533.amzn2.x86_64', version='#1 SMP Tue Apr 25 15:24:19 UTC 2023',            
                | machine='x86_64', processor='')                                                                                                                      
locale          | ('en_US', 'UTF-8')                                                                                                                                   
python_version  | 3.7.16 (default, May  3 2023, 10:31:59)  [GCC 10.2.1 20210110]                                                                                       
python_location | /usr/local/bin/python                                                                                                                                

Tools info
git             | NOT AVAILABLE                                                                              
ssh             | OpenSSH_8.4p1 Debian-5+deb11u1, OpenSSL 1.1.1n  15 Mar 2022                                
kubectl         | NOT AVAILABLE                                                                              
gcloud          | NOT AVAILABLE                                                                              
cloud_sql_proxy | NOT AVAILABLE                                                                              
mysql           | mysql  Ver 8.0.33 for Linux on x86_64 (MySQL Community Server - GPL)                       
sqlite3         | 3.34.1 2021-01-20 14:10:07 10e20c0b43500cfb9bbc0eaa061c57514f715d87238f4d835880cd846b9ealt1
psql            | psql (PostgreSQL) 15.3 (Debian 15.3-1.pgdg110+1)                                           

Paths info
airflow_home    | /opt/airflow                                                                                                                                         
system_path     | /root/bin:/home/airflow/.local/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin                                       
python_path     | /home/airflow/.local/bin:/usr/local/lib/python37.zip:/usr/local/lib/python3.7:/usr/local/lib/python3.7/lib-dynload:/home/airflow/.local/lib/python3.7
                | /site-packages:/usr/local/lib/python3.7/site-packages:/opt/airflow/dags:/opt/airflow/config:/opt/airflow/plugins                                     
airflow_on_path | True                                                                                                                                                 

Providers info
apache-airflow-providers-amazon          | 8.0.0 
apache-airflow-providers-celery          | 3.1.0 
apache-airflow-providers-cncf-kubernetes | 6.1.0 
apache-airflow-providers-common-sql      | 1.4.0 
apache-airflow-providers-docker          | 3.6.0 
apache-airflow-providers-elasticsearch   | 4.4.0 
apache-airflow-providers-ftp             | 3.3.1 
apache-airflow-providers-google          | 10.0.0
apache-airflow-providers-grpc            | 3.1.0 
apache-airflow-providers-hashicorp       | 3.3.1 
apache-airflow-providers-http            | 4.3.0 
apache-airflow-providers-imap            | 3.1.1 
apache-airflow-providers-microsoft-azure | 6.0.0 
apache-airflow-providers-mysql           | 5.0.0 
apache-airflow-providers-odbc            | 3.2.1 
apache-airflow-providers-postgres        | 5.4.0 
apache-airflow-providers-redis           | 3.1.0 
apache-airflow-providers-sendgrid        | 3.1.0 
apache-airflow-providers-sftp            | 4.2.4 
apache-airflow-providers-slack           | 7.2.0 
apache-airflow-providers-snowflake       | 4.0.5 
apache-airflow-providers-sqlite          | 3.3.2 
apache-airflow-providers-ssh             | 3.6.0 

Deployment

Docker-Compose

Deployment details

I ran airflow from docker-compose.

Anything else

When I manually pause and unpause the dag nothing happens. In the audit log there is no information of anyone trying to run the dag. In all the postgres tables there are no created entries/rows for the failing dag for the missing dates. There are no logs created for the missing days. There are no errors in the other log files.

I tried to allocate a lot of memory in a container and it works. I added swap file but it looks it has been never used. The tasks are running dbt

For dag processor I see from time to time some PID

File Path                                                                   PID  Runtime      # DAGs    # Errors  Last Runtime    Last Run
------------------------------------------------------------------------  -----  ---------  --------  ----------  --------------  -------------------
/opt/airflow/dags/insights/DAGNAME.py                       3710  0.53s             1           0  1.60s           2023-06-02T05:00:04
/opt/airflow/dags/insights/DAGNAME.py                       7023  0.00s             1           0  0.62s           2023-06-02T05:29:44
/opt/airflow/dags/insights/DAGNAME.py                       7095  0.01s             1           0  0.30s           2023-06-02T05:30:14
/opt/airflow/dags/insights/DAGNAME.py                      24199  0.02s             1           0  0.43s           2023-06-02T07:43:08
/opt/airflow/dags/insights/DAGNAME.py                      30931  0.00s             1           0  0.35s           2023-06-02T08:35:18
/opt/airflow/dags/insights/DAGNAME.py                       4938  0.01s             1           0  0.29s           2023-06-02T09:25:29
/opt/airflow/dags/insights/DAGNAME.py                      11316  0.46s             1           0  0.94s           2023-06-02T10:15:12
/opt/airflow/dags/insights/DAGNAME.py                      11377  0.01s             1           0  0.48s           2023-06-02T10:15:44
/opt/airflow/dags/insights/DAGNAME.py                      18417  0.01s             1           0  0.40s           2023-06-02T11:10:28
/opt/airflow/dags/insights/DAGNAME.py                      23364  0.01s             1           0  0.48s    

in the scheduler log i see nothing is schedulled at the expected time

[2023-06-02T04:59:32.427+0000] {logging_mixin.py:149} INFO - [2023-06-02T04:59:32.427+0000] {dag.py:3490} INFO - Setting next_dagrun for DAGNAME to 2023-06-02T05:00:00+00:00, run_after=2023-06-02T05:00:00+00:00
[2023-06-02T04:59:32.457+0000] {processor.py:179} INFO - Processing /opt/airflow/dags/insights/DAGNAME.py took 0.380 seconds
[2023-06-02T05:00:03.365+0000] {processor.py:157} INFO - Started process (PID=3583) to work on /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:00:03.372+0000] {processor.py:826} INFO - Processing file /opt/airflow/dags/insights/DAGNAME.py for tasks to queue
[2023-06-02T05:00:03.372+0000] {logging_mixin.py:149} INFO - [2023-06-02T05:00:03.372+0000] {dagbag.py:541} INFO - Filling up the DagBag from /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:00:04.315+0000] {processor.py:836} INFO - DAG(s) dict_keys(['DAGNAME']) retrieved from /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:00:04.430+0000] {logging_mixin.py:149} INFO - [2023-06-02T05:00:04.430+0000] {dag.py:2726} INFO - Sync 1 DAGs
[2023-06-02T05:00:04.611+0000] {logging_mixin.py:149} INFO - [2023-06-02T05:00:04.611+0000] {dag.py:3490} INFO - Setting next_dagrun for DAGNAME to 2023-06-03T05:00:00+00:00, run_after=2023-06-03T05:00:00+00:00
[2023-06-02T05:00:04.941+0000] {processor.py:179} INFO - Processing /opt/airflow/dags/insights/DAGNAME.py took 1.580 seconds
[2023-06-02T05:02:54.062+0000] {processor.py:157} INFO - Started process (PID=3710) to work on /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:02:54.094+0000] {processor.py:826} INFO - Processing file /opt/airflow/dags/insights/DAGNAME.py for tasks to queue
[2023-06-02T05:02:54.095+0000] {logging_mixin.py:149} INFO - [2023-06-02T05:02:54.094+0000] {dagbag.py:541} INFO - Filling up the DagBag from /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:02:57.813+0000] {processor.py:836} INFO - DAG(s) dict_keys(['DAGNAME']) retrieved from /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:02:59.602+0000] {logging_mixin.py:149} INFO - [2023-06-02T05:02:59.602+0000] {dag.py:2726} INFO - Sync 1 DAGs
[2023-06-02T05:03:00.611+0000] {logging_mixin.py:149} INFO - [2023-06-02T05:03:00.611+0000] {dag.py:3490} INFO - Setting next_dagrun for DAGNAME to 2023-06-03T05:00:00+00:00, run_after=2023-06-03T05:00:00+00:00
[2023-06-02T05:03:01.444+0000] {processor.py:179} INFO - Processing /opt/airflow/dags/insights/DAGNAME.py took 7.531 seconds
[2023-06-02T05:03:31.833+0000] {processor.py:157} INFO - Started process (PID=3777) to work on /opt/airflow/dags/insights/DAGNAME.py
[2023-06-02T05:03:31.835+0000] {processor.py:826} INFO - Processing file /opt/airflow/dags/insights/DAGNAME.py for tasks to queue

Are you willing to submit PR?

Code of Conduct

hussein-awala commented 1 year ago

Could you please provide an example of your DAG files? (contains the dag config, like the start date, schedule, catchup...)

ibardarov-fms commented 1 year ago

Here it is

from datetime import datetime, timedelta

from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils.trigger_rule import TriggerRule

from airflow import DAG

# Equals to the source/target schema
client_name = "apache_airflow"

default_args = {
    "owner": "insights",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

def notify_on_dag_failure(context):
    dag_run = context.get("dag_run")
    client_name = context["params"]["client_name"]
    dag_id = dag_run.dag_id
    reason = context["reason"]

    task_instances = dag_run.get_task_instances()
    failed_ti = [ti for ti in task_instances if ti.state == State.FAILED]
    failed_ti_ids = ", ".join([ti.task_id for ti in failed_ti])
    message = card_json(
        "fail",
        f"Failed {client_name}",
        f"{reason}: {failed_ti_ids}",
        client_name,
        dag_id,
    )
    print(message)

with DAG(
    f"test_{client_name}",
    schedule=CronTriggerTimetable("0 5 * * *", timezone="UTC"),  # https://crontab.guru/
    default_args=default_args,
    catchup=False,
    tags=["insights", "dbt", "snowflake", client_name],
    on_failure_callback=notify_on_dag_failure,
    max_active_runs=1,
    max_active_tasks=1,
    default_view="grid",
    start_date=datetime(2022, 2, 28),
    params={
        "client_name": client_name,
    },
) as dag:
    begin = DummyOperator(task_id="begin")
    end = DummyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED)

    standard_zone = DockerOperator(
        dag=dag,
        task_id="standard_zone",
        pool="snowflake",
        doc_md="documentation...",
        command=["/hello"],
        environment={},
        image="hello-world",
        api_version="auto",
        auto_remove=True,
        docker_conn_id="docker_default",
        docker_url="unix://var/run/docker.sock",
        network_mode="host",
        tty=True,
        mount_tmp_dir=False,
        do_xcom_push=False,
        force_pull=True,
        retries=3,
        max_retry_delay=timedelta(hours=4),
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
    )

    begin >> standard_zone >> end
hussein-awala commented 1 year ago

There might be a bug in the CronTriggerTimetable being used. Could you please try using schedule="0 5 * * *" instead?

Also you can try catchup=True with CronTriggerTimetable.

ibardarov-fms commented 1 year ago

I will enable today the catchup=True to see how it will go, and if it doesn't help, on tomorrow will run with catchup=True and the old syntax for cron.

ibardarov-fms commented 1 year ago

The catchup=True helped and all dags are running fine. 2 days in a row. I see that some of the dags are scheduled 14 seconds after the required time. The same is visible also with the old airflow versions without the catchup=True.

select dag_id, queued_at, start_date from dag_run 
WHERE 1=1
and execution_date::date = CURRENT_DATE - INTERVAL '0 day'
ORDER BY queued_at;
                   dag_id                   |           queued_at           |          start_date           
--------------------------------------------+-------------------------------+-------------------------------
 CLIENT_dag_name                            | 2023-06-21 05:00:00.429688+00 | 2023-06-21 05:00:00.600953+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.441562+00 | 2023-06-21 05:00:00.60247+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.457421+00 | 2023-06-21 05:00:00.604609+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.469522+00 | 2023-06-21 05:00:00.606059+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.48264+00  | 2023-06-21 05:00:00.612009+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.493514+00 | 2023-06-21 05:00:00.592331+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.522506+00 | 2023-06-21 05:00:00.599394+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.53555+00  | 2023-06-21 05:00:00.60757+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.550662+00 | 2023-06-21 05:00:00.609064+00
 CLIENT_dag_name                            | 2023-06-21 05:00:00.56266+00  | 2023-06-21 05:00:00.610524+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.051863+00 | 2023-06-21 05:00:02.228549+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.064021+00 | 2023-06-21 05:00:02.243757+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.079336+00 | 2023-06-21 05:00:02.230704+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.101663+00 | 2023-06-21 05:00:02.235086+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.115229+00 | 2023-06-21 05:00:02.233271+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.129137+00 | 2023-06-21 05:00:02.236793+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.14308+00  | 2023-06-21 05:00:02.238552+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.156731+00 | 2023-06-21 05:00:02.226757+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.175163+00 | 2023-06-21 05:00:02.240286+00
 CLIENT_dag_name                            | 2023-06-21 05:00:02.189773+00 | 2023-06-21 05:00:02.242026+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.341434+00 | 2023-06-21 05:00:04.539629+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.355116+00 | 2023-06-21 05:00:04.545596+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.369535+00 | 2023-06-21 05:00:04.547978+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.383337+00 | 2023-06-21 05:00:04.55093+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.39816+00  | 2023-06-21 05:00:04.553406+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.413037+00 | 2023-06-21 05:00:04.558538+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.437959+00 | 2023-06-21 05:00:04.561113+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.466875+00 | 2023-06-21 05:00:04.563797+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.482523+00 | 2023-06-21 05:00:04.566353+00
 CLIENT_dag_name                            | 2023-06-21 05:00:04.500369+00 | 2023-06-21 05:00:04.568963+00
 CLIENT_dag_name                            | 2023-06-21 05:00:14.449285+00 | 2023-06-21 05:00:14.836314+00
 CLIENT_dag_name                            | 2023-06-21 05:00:14.556828+00 | 2023-06-21 05:00:14.841634+00
 CLIENT_dag_name                            | 2023-06-21 05:00:14.633153+00 | 2023-06-21 05:00:14.845604+00
 CLIENT_dag_name                            | 2023-06-21 05:00:14.725238+00 | 2023-06-21 05:00:14.847512+00
(34 rows)
nathadfield commented 1 year ago

Possibly related to #27399?

ibardarov-fms commented 1 year ago

In this issue, airflow doesn't create runs for the whole days/runs - it acts like the dag is disabled and skips days/runs.

potiuk commented 1 year ago

In this issue, airflow doesn't create runs for the whole days/runs - it acts like the dag is disabled and skips days/runs.

I think that might still be related. Simply some subtle bug (like running the schedule precisely at the very moment it should be scheduled) might trigger it. For some reason you seem to have an installation where this behaviour seems to be easily reproducible, so maybe we can use it to narrow down the issue.

I think @hussein-awala was right it would be great if you could try to reproduce it with old expression and catchup = False. From what I understand above, catchup =True actully solves the problem. If we could know that also the old schedule does (independently from catchup = True), it could help to narrrow down the issue.

Also cc: @uranusjr -> It really looks like some edge-case i CronTriggerTimetable from the description and helpful experiments done by @ibardarov-fms . The 14 seconds delay in queue time shows that likely there might be a race condition that gets triggered somewhere by the timetable.

potiuk commented 1 year ago

cc: @uranusjr @hussein-awala .

I do not have yet the exact scenario in mind but looking at the "catchup" fixing the problem and the code of the triggerer, I have a possible candidate.

One of the best candidates I have this use of the _align_to_next method as it relies on exact equality AND the fact that it is using utcnow(). It is executed when catchup is False, and since it is using utcnow(), I believe it might be susceptible to behaving wrongly when you are "just before", or "just after" or even "just at" the interval edge.

if restriction.catchup:
      if last_automated_data_interval is not None:
          next_start_time = self._get_next(last_automated_data_interval.end)
      elif restriction.earliest is None:
          return None  # Don't know where to catch up from, give up.
      else:
          next_start_time = self._align_to_next(restriction.earliest)
  else:
      start_time_candidates = [self._align_to_next(DateTime.utcnow())]  # !!!!! <--- likely trigger of the problem
      if last_automated_data_interval is not None:
          start_time_candidates.append(self._get_next(last_automated_data_interval.end))
      if restriction.earliest is not None:
          start_time_candidates.append(self._align_to_next(restriction.earliest))
      next_start_time = max(start_time_candidates)
  if restriction.latest is not None and restriction.latest < next_start_time:
      return None
  return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
ibardarov-fms commented 1 year ago

Will run it with catchup=False and the old cron.

I am using those environment variables/settings:

AIRFLOW__SCHEDULER_IDLE_SLEEP_TIME=1
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30
ibardarov-fms commented 1 year ago

With catchup=False and the old cron way the problem is visible. From 30 dags, there were scheduled only 5. When I restored the new cron syntax with the catchup=True - airflow started and process the not scheduled dags.

uranusjr commented 1 year ago

I think the two are indeed related. The alignment implementation guess seems plausible, but I’m not able to come up with an example when it actually triggers a problem now (i.e. a failing test case).

potiuk commented 1 year ago

I believe #32921 might fix that one. @ibardarov-fms - is it possible that you apply the fix in your installation of Airflow and check it ?

ibardarov-fms commented 1 year ago

I have run it with the fix and it looks good. but I want to run it on more time without the fix to verify if the tasks will fail again.

ibardarov-fms commented 1 year ago

The fix is working. I run it two times and all dags were scheduled correctly and run.

I have to create new dags and need to make sure that with the new dags the problem will be visible. The 3rd run was without the fix Now I am sure that the fix is working because without it the new dags were not scheduled and run as described in this ticket.