apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

SLAs not working with ContinuousTimeTable #35481

Open hvignolo87 opened 10 months ago

hvignolo87 commented 10 months ago

Apache Airflow version

Airflow version: 2.6.3-python3.10

What happened

It seems that SLAs are not checked when setting schedule="@continuous" in Airflow 2.6.3.

What you think should happen instead

An SLA miss should appear in the Browse/SLA Misses section, and the sla_miss_callback function should be triggered.

How to reproduce

Run this DAG:

from datetime import datetime, timedelta
from time import sleep

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id="sla_test_dag",
    description="DAG to test SLAs",
    default_args={},
    max_active_runs=1,
    schedule="@continuous",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["test", "SLA"],
)
def sla_test_dag():
    """
    DAG to test SLA behavior with SLAs set at different task levels.
    """

    @task.branch
    def choose_branch() -> str:
        return "task_1"

    @task
    def task_1() -> None:
        sleep(30)

    @task
    def task_2() -> None:
        sleep(30)

    @task(sla=timedelta(seconds=5), trigger_rule=TriggerRule.ALL_DONE)
    def sla_check():
        return None

    choose_branch() >> [task_1(), task_2()] >> sla_check()

sla_test_dag()

After some DAG runs, go to Browse/SLA Misses and verify that no SLA miss is registered. You can also perform a similar check by creating a simple sla_miss_callback function and verifying that it is never triggered.
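For reference, a minimal callback of the kind mentioned above could look like the sketch below. The function name and log message are illustrative; the five-parameter signature is the one Airflow passes to sla_miss_callback.

```python
import logging

# Minimal SLA-miss callback sketch: it only logs, which is enough
# to observe whether Airflow ever invokes it.
def log_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    logging.warning(
        "SLA missed in DAG %s: slas=%s, blocking tasks=%s",
        getattr(dag, "dag_id", dag),
        slas,
        blocking_task_list,
    )
```

It would be wired in through the DAG decorator, e.g. `@dag(..., sla_miss_callback=log_sla_miss)`; with schedule="@continuous" it never fires, while with a cron schedule it does.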

Operating System

macOS Sonoma 14.0

Versions of Apache Airflow Providers

package_name description version
apache-airflow-providers-amazon Amazon integration (including Amazon Web Services (AWS) https://aws.amazon.com/) 8.2.0
apache-airflow-providers-celery Celery http://www.celeryproject.org/ 3.2.1
apache-airflow-providers-cncf-kubernetes Kubernetes https://kubernetes.io/ 7.1.0
apache-airflow-providers-common-sql Common SQL Provider https://en.wikipedia.org/wiki/SQL 1.5.2
apache-airflow-providers-docker Docker https://docs.docker.com/install/ 3.7.1
apache-airflow-providers-elasticsearch Elasticsearch https://www.elastic.co/elasticsearch 4.5.1
apache-airflow-providers-ftp File Transfer Protocol (FTP) https://tools.ietf.org/html/rfc114 3.4.2
apache-airflow-providers-google Google services including: 10.2.0
- Google Ads https://ads.google.com/
- Google Cloud (GCP) https://cloud.google.com/
- Google Firebase https://firebase.google.com/
- Google LevelDB https://github.com/google/leveldb/
- Google Marketing Platform https://marketingplatform.google.com/
- Google Workspace https://workspace.google.com/ (formerly Google Suite)
apache-airflow-providers-grpc gRPC https://grpc.io/ 3.2.1
apache-airflow-providers-hashicorp Hashicorp including Hashicorp Vault https://www.vaultproject.io/ 3.4.1
apache-airflow-providers-http Hypertext Transfer Protocol (HTTP) https://www.w3.org/Protocols/ 4.4.2
apache-airflow-providers-imap Internet Message Access Protocol (IMAP) https://tools.ietf.org/html/rfc3501 3.2.2
apache-airflow-providers-microsoft-azure Microsoft Azure https://azure.microsoft.com/ 6.1.2
apache-airflow-providers-mysql MySQL https://www.mysql.com/products/ 5.1.1
apache-airflow-providers-odbc ODBC https://github.com/mkleehammer/pyodbc/wiki 4.0.0
apache-airflow-providers-postgres PostgreSQL https://www.postgresql.org/ 5.5.1
apache-airflow-providers-redis Redis https://redis.io/ 3.2.1
apache-airflow-providers-sendgrid Sendgrid https://sendgrid.com/ 3.2.1
apache-airflow-providers-sftp SSH File Transfer Protocol (SFTP) https://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/ 4.3.1
apache-airflow-providers-slack Slack https://slack.com/ 7.3.1
apache-airflow-providers-snowflake Snowflake https://www.snowflake.com/ 4.2.0
apache-airflow-providers-sqlite SQLite https://www.sqlite.org/ 3.4.2
apache-airflow-providers-ssh Secure Shell (SSH) https://tools.ietf.org/html/rfc4251 3.7.

Deployment

Docker-Compose

Deployment details

docker-compose.yaml (SLA check is enabled)
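For completeness, SLA evaluation is controlled by the check_slas option in the [core] section of the Airflow configuration (default True). In this setup it is presumably enabled via the env file, along the lines of:

```ini
# .env.airflow.local (assumed; check_slas defaults to True anyway)
AIRFLOW__CORE__CHECK_SLAS=True
```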

version: "3.9"
name: airflow-local

x-airflow-common:
  &airflow-common
  image: airflow-custom-local:latest
  build:
    dockerfile: ./Dockerfile.local
    context: .
  env_file:
    - ./.env.airflow.local
  volumes:
    - ./dags:/opt/airflow/dags

x-airflow-depends-on:
  &airflow-depends-on
  depends_on:
    airflow-db-metadata:
      condition: service_healthy
    airflow-init:
      condition: service_completed_successfully

services:
  airflow-db-metadata:
    image: postgres:${POSTGRES_VERSION:-14.7}
    container_name: airflow-db-metadata
    hostname: airflow-db-metadata
    env_file:
      - ./.env.airflow.local
    volumes:
      - airflow-db-metadata-volume:/var/lib/postgresql/data
    ports:
      - ${POSTGRES_PORT_HOST:-5440}:${POSTGRES_PORT:-5432}
    restart: on-failure
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "${POSTGRES_USER:-airflow}" ]
      interval: 5s
      retries: 5

  airflow-scheduler:
    <<: [ *airflow-common, *airflow-depends-on ]
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
    ports:
      - 8793:8793

  airflow-webserver:
    <<: [ *airflow-common, *airflow-depends-on ]
    container_name: airflow-webserver
    restart: on-failure
    command: webserver
    ports:
      - ${AIRFLOW_WEBSERVER_PORT_HOST:-8080}:8080
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "--fail",
          "http://localhost:${AIRFLOW_WEBSERVER_PORT_HOST:-8080}/health"
        ]
      interval: 30s
      timeout: 30s
      retries: 5

  airflow-init:
    <<: *airflow-common
    container_name: airflow-init
    entrypoint: /bin/bash
    user: 0:0
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID:-0}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow db init

volumes:
  airflow-db-metadata-volume:

networks:
  default:
    name: airflow-local

Dockerfile.local 

FROM apache/airflow:2.6.3-python3.10

USER airflow

RUN --mount=type=cache,target=/home/airflow/.cache/pip \
    pip install --upgrade pip && \
    pip install --no-cache-dir \
        "apache-airflow[amazon,postgres,sentry,slack]==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt" \
        "apache-airflow==2.6.3" \
        "apache-airflow-providers-postgres" \
        "apache-airflow-providers-amazon" \
        "apache-airflow-providers-common-sql>=1.3.1" \
        "apache-airflow-providers-slack" \
        "cryptography" \
        "psycopg2-binary>=2.8.0" \
        "pandas" \
        "pymongo" \
        "pytz" \
        "blinker" \
        "smart_open" \
        "boto3"

Anything else

I've also tested the callback function, and it doesn't work either. If you switch to a cron schedule, both the SLA Misses listing and the callback work fine.
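To make the comparison concrete, the only change needed in the repro DAG above is the schedule argument; an every-minute cron expression is used here as an illustrative stand-in:

```python
# Illustrative schedule values for the repro DAG above:
CONTINUOUS = "@continuous"   # SLA misses are NOT recorded (this issue)
EVERY_MINUTE = "* * * * *"   # cron schedule: SLA misses ARE recorded

# e.g. @dag(..., schedule=EVERY_MINUTE, ...)
```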

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 10 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

avkirilishin commented 7 months ago

The issue lies here: we aren't applying the restrictions from the DAG when evaluating SLAs. This problem can occur not only with ContinuousTimetable but also with other timetables, such as OnceTimetable.

While it could be fixed, I believe it would be better to wait for the SLA refactoring in #32816.