apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow ignores schedule dates from cron #23318

Closed meteoDaniel closed 2 years ago

meteoDaniel commented 2 years ago

Apache Airflow version

2.2.5 (latest released)

What happened

I have defined a DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['info@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 10,
    'retry_delay': timedelta(minutes=10),
    'concurrency': 2
}
with DAG(
        'nwp_icon_eu_etl',
        default_args=default_args,
        description='Process NWP forecasts ICON-EU',
        schedule_interval='50 3,9,15,21 * * *',
        start_date=datetime.utcnow().replace(hour=0, minute=0),
        tags=['nwp_data'],
) as dag:
    download = DockerOperator(
        task_id='download',
        image='nwp_downloader_image:latest',
        auto_remove=True,
        api_version='1.41',
        mount_tmp_dir=False,
        pool_slots=4,
        container_name="task__download_icon_eu",
        mounts=[Mount(source=os.environ['NWP_ARCHIVE_MOUNT'],
                      target='/app/data', type='bind')],
        command="python3 /app/src/pipelines/run.py run_model_download 'icon_eu' "
                "'{{ (execution_date + macros.timedelta(hours=3)).strftime('%Y-%m-%dT%H:00:00+00:00') }}' ",
    )

It should run 4 times a day, according to the defined cron expression 50 3,9,15,21 * * *.

But Airflow skips every second run.

Screenshot from 2022-04-28 07-06-51

What you think should happen instead

I expected Airflow to trigger a run for every schedule interval, not only every second one.
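For context, a sketch of the run timing this cron would normally produce under Airflow's interval-based scheduling (times in UTC; the mapping below is illustrative, not output from the report):

# With schedule_interval='50 3,9,15,21 * * *' a run is created for each data
# interval and triggered when that interval ends, so per day one would expect:
#
#   execution_date (interval start)   triggered at (interval end)
#   21:50 previous day                03:50
#   03:50                             09:50
#   09:50                             15:50
#   15:50                             21:50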

How to reproduce

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['info@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 10,
    'retry_delay': timedelta(minutes=10),
    'concurrency': 2
}
with DAG(
        'bash_task',
        default_args=default_args,
        schedule_interval='50 3,9,15,21 * * *',
        start_date=datetime.utcnow().replace(hour=0, minute=0),
        tags=['bash'],
) as dag:
    bash_task = BashOperator(
        task_id="bash_task",
        bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
    )

Operating System

Ubuntu 21.04

Versions of Apache Airflow Providers

My Dockerfile:

FROM apache/airflow:latest

RUN pip install --no-cache-dir apache-airflow[docker,amazon] boto3==1.21.45

Deployment

Docker-Compose

Deployment details

docker-compose:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:master-python3.8
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_GID                  - Group ID in Airflow containers
#                                Default: 50000
#
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  image: airflow_image:latest
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CELERY__WORKER_AUTOSCALE: 24,20
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
    AIRFLOW__CORE__PARALLELISM: 1000
    AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 999
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__WEBSERVER__WORKERS: 4
    AIRFLOW__WEBSERVER__WEB_SERVER_WORKER_TIMEOUT: 300
    AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 60
    AIRFLOW__OPERATORS__DEFAULT_CPUS: 4
    AIRFLOW__OPERATORS__DEFAULT_DISK: 4096
    AIRFLOW__OPERATORS__DEFAULT_RAM: 4096
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    SATELLITE_ARCHIVE_MOUNT: ${SATELLITE_ARCHIVE_MOUNT}
    NWP_ARCHIVE_MOUNT: ${NWP_ARCHIVE_MOUNT}
    ECCODES_DEFINITION_PATH: ${ECCODES_DEFINITION_PATH}
    AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}

  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

volumes:
  postgres-db-volume:

Anything else

No


boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

tirkarthi commented 2 years ago

Do you have catchup_by_default set to False? Adding the DAG later in the day with catchup set to False can result in a missed run, as in the example in the docs linked below. Does this happen every day, with only 2 runs executed?

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#catchup
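For context, a minimal sketch of the two places catchup is usually controlled (the DAG id and date below are placeholders):

# Globally, via config / environment variable:
#   AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=False

# Or per DAG:
from datetime import datetime

from airflow import DAG

with DAG(
    'catchup_example',                       # placeholder DAG id
    schedule_interval='50 3,9,15,21 * * *',
    start_date=datetime(2022, 4, 27),        # placeholder static start date
    catchup=False,                           # do not backfill missed intervals
) as dag:
    ...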

meteoDaniel commented 2 years ago

@tirkarthi I have tested both and it was the same every day.

The runs for the 9:50-15:50 and 15:50-21:50 intervals were executed, but the ones for 21:50-3:50 and 3:50-9:50 were not.

I started Airflow yesterday, so I expected those two runs to be scheduled as well.

vanchaxy commented 2 years ago

Can you try making start_date static instead of dynamic? I think the dynamic value may be causing the trouble.

meteoDaniel commented 2 years ago

@themax087 I had the same idea 5 minutes ago on my way to bed 😅

I think the scheduler evaluates the argument multiple times, and each re-evaluation resets the start date to the current day, which cuts off the 21:50-3:50 and 3:50-9:50 intervals.
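A small illustration of that suspicion (dates are made up): every time the file is re-parsed, the expression is re-evaluated, so the start date keeps moving forward and intervals that begin before it are never scheduled.

from datetime import datetime

# Parsed on 2022-04-27 -> start_date is 2022-04-27 00:00 (seconds from utcnow() remain)
# Parsed on 2022-04-28 -> start_date is 2022-04-28 00:00
start_date = datetime.utcnow().replace(hour=0, minute=0)

# With cron '50 3,9,15,21 * * *', the 21:50->3:50 and 3:50->9:50 intervals now
# start before the newly computed start_date, so no runs are created for them.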

eladkal commented 2 years ago

Please add a reproducible example with a static start_date. A dynamic start_date is bad practice and a source of a lot of trouble. https://airflow.apache.org/docs/apache-airflow/stable/faq.html#dag-construction
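For completeness, a sketch of the DAG header from the report with a static start_date, as suggested (the concrete date is arbitrary):

from datetime import datetime

from airflow import DAG

with DAG(
    'nwp_icon_eu_etl',
    schedule_interval='50 3,9,15,21 * * *',
    start_date=datetime(2022, 4, 27),  # fixed date in the past, never re-evaluated
    catchup=False,                     # avoid backfilling every interval since that date
    tags=['nwp_data'],
) as dag:
    ...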

meteoDaniel commented 2 years ago

@eladkal A dynamic start_date was not my intention. In the past I had some trouble with catchup, so I thought it would be a good idea to have start_date always equal the day I update the project. That is how I ran into this issue.

My problem is solved now.

BilashAmantay commented 1 year ago

I have the same issue even though I'm using a static, hard-coded start_date. The cron-based schedule is not taking effect; instead, the job runs every day at midnight, starting from start_date.

vanchaxy commented 1 year ago

@BilashAmantay please open a separate issue with your code. As far as I remember, a daily run at midnight is the default schedule. Maybe you are passing a wrong kwarg or something similar.