apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

max_active_runs = 1 can still create multiple active execution runs #9975

Closed match-gabeflores closed 3 years ago

match-gabeflores commented 4 years ago

Edit: There is a separate issue affecting max_active_runs in 1.10.14. That regression is fixed in 1.10.15.

Edit2: Version v2.1.3 contains some fixes but also contains bad regressions involving max_active_runs. Use v2.1.4 for the complete fixes to this issue.

Edit3: Version 2.2.0 contains a fix for max_active_runs when using the `dags trigger` command or TriggerDagRunOperator. https://github.com/apache/airflow/issues/18583

--

Apache Airflow version: 1.10.11, LocalExecutor

What happened:

I have max_active_runs = 1 in my DAG file (the DAG consists of multiple tasks) and I manually triggered the DAG. While that first execution was still running, a second execution began at its scheduled time.

I should note that the second execution is initially queued. It's only when the DAG's first execution moves on to its next task that the second execution actually starts.

My DAG definition. The DAG just contains tasks using PythonOperator.

from datetime import timedelta

from airflow import DAG

# default_args is defined elsewhere in the DAG file
dag = DAG(
    'dag1',
    default_args=default_args,
    description='xyz',
    schedule_interval=timedelta(hours=1),
    catchup=False,
    max_active_runs=1
)

What you expected to happen:

Only one execution should run. A second execution should be queued but not begin executing.

How to reproduce it: In my scenario:

  1. Manually trigger a DAG with multiple tasks; have task1 take longer than the start of the next scheduled execution (Dag Execution1). As an example, if the schedule interval is 1 hour, have task1 take longer than 1 hour so as to queue up the second execution (Execution2). A minimal sketch of such a DAG follows these steps.
  2. When task1 of Execution1 finishes, and just before task2 starts, the second execution (Execution2, which is already queued) begins running.
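
A minimal sketch of a DAG matching these steps, assuming Airflow 2.x import paths (in 1.10.x the operator lives under airflow.operators.python_operator); the dag_id, task names, and sleep duration are illustrative, not from the original report:

# Hypothetical reproduction DAG: task1 deliberately outlasts the one-hour
# schedule_interval, so Execution2 gets queued while Execution1 is mid-flight
# and then starts as soon as Execution1 moves from task1 to task2.
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def outlast_schedule():
    time.sleep(70 * 60)  # longer than the 1-hour schedule_interval


def quick_step():
    print("task2 running")


with DAG(
    dag_id="repro_max_active_runs",
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
    max_active_runs=1,
) as dag:
    task1 = PythonOperator(task_id="task1", python_callable=outlast_schedule)
    task2 = PythonOperator(task_id="task2", python_callable=quick_step)
    task1 >> task2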

image

Anything else we need to know: I think the second execution begins in between task1 and task2 of Execution1. I think there's a few-second delay there, and maybe that's when Airflow thinks there's no active DAG execution? That's just a guess.

Btw, this can have potentially disastrous effects (errors, incomplete data without errors, etc)

boring-cyborg[bot] commented 4 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

mik-laj commented 4 years ago

The problem is, we don't have a state that describes DAG Runs that are saved but not yet running. All DAG Runs have the running state initially. If we want to fix this bug, we have to add a new DAG Run state.
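
For illustration only, a rough sketch of the gating such a new state would enable: runs are persisted as "queued" and only promoted to "running" while fewer than max_active_runs runs are active. This is not Airflow's actual scheduler code; all names here are made up.

# Hypothetical sketch of "queued" DagRun gating -- not Airflow internals.
from dataclasses import dataclass, field
from typing import List

QUEUED, RUNNING = "queued", "running"


@dataclass
class FakeDagRun:
    run_id: str
    state: str = QUEUED  # today, every new DagRun starts as RUNNING instead


@dataclass
class FakeDag:
    max_active_runs: int
    runs: List[FakeDagRun] = field(default_factory=list)

    def create_run(self, run_id: str) -> FakeDagRun:
        # Persist the run as QUEUED rather than RUNNING.
        run = FakeDagRun(run_id=run_id)
        self.runs.append(run)
        return run

    def promote_queued_runs(self) -> None:
        # Scheduler loop: only promote queued runs while the active count
        # stays below max_active_runs.
        active = sum(1 for r in self.runs if r.state == RUNNING)
        for run in self.runs:
            if active >= self.max_active_runs:
                break
            if run.state == QUEUED:
                run.state = RUNNING
                active += 1


dag = FakeDag(max_active_runs=1)
dag.create_run("manual__1")
dag.create_run("scheduled__1")
dag.promote_queued_runs()
assert [r.state for r in dag.runs] == [RUNNING, QUEUED]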

dinesh-procore commented 3 years ago

I am running into the exact same issue.

alechheb commented 3 years ago

The same issue here

ashb commented 3 years ago

Would someone be able to test whether this specific case still happens on Airflow 2.0.0alpha1? (A few things about how we create DagRuns changed, so this might have been fixed, but I didn't specifically set out to fix this.)

ashb commented 3 years ago

Read the reproduction steps, and this bit sounds bang on:

I think the second execution begins in between the task1 and task2 of execution1. I think there's a few second delay there and maybe that's when Airflow thinks there's no dag execution? That's just a guess.

Yes, looking at the code, that sounds right. This also hasn't changed in 2.0.0alpha1; the same logic is used.

natejenkins21 commented 3 years ago

Same issue here. Causing a lot of issues for my backfills...

kaxil commented 3 years ago

@natejenkins21 Can you provide reproduction steps please?

nathadfield commented 3 years ago

For what it's worth, this doesn't seem to be an issue on master but it certainly is on 2.0 installed from pip.

I created this DAG, switched it on, allowed it to catch up, and then cleared the status of all tasks.

from airflow import models
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 12, 10),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag_name = 'test_dag'

with models.DAG(dag_name,
                default_args=default_args,
                schedule_interval='0 0 * * *',
                catchup=True,
                max_active_runs=1
                ) as dag:

    start = BashOperator(
        task_id='start',
        bash_command='echo "Starting"; echo "Sleeping for 30 seconds"; sleep 30; echo "Finished"'
    )

As you can see, all the tasks are now running at the same time.

Screenshot 2020-12-22 at 11 16 44

nathadfield commented 3 years ago

Here's the same DAG running against master via breeze.

Screenshot 2020-12-22 at 11 35 21

kaxil commented 3 years ago

@nathadfield Does it occur without clearing tasks instances?

nathadfield commented 3 years ago

@nathadfield Does it occur without clearing tasks instances?

Yes. I removed catchup and triggered the DAG several times via the CLI and all of the tasks were running at the same time.

Screenshot 2020-12-23 at 09 03 39

kaxil commented 3 years ago

@nathadfield What metadata DB do you use?

Just wondering if it is related to https://github.com/apache/airflow/pull/13278

nathadfield commented 3 years ago


@kaxil We're using Postgres so it's probably not related.

kaxil commented 3 years ago

Cool, I am looking at this today and tomorrow

kaxil commented 3 years ago

This happens on master too, but only when you manually trigger DAGs multiple times.

image

image

kamac commented 3 years ago

This also happens when triggering DAGs manually with create_dagrun. If the DagRun is created with state RUNNING, max_active_runs is not respected (the run isn't counted if you browse the DAG's details page).

I've also tried creating the run with state SCHEDULED, but it's never run by the scheduler. (perhaps I'm doing it the wrong way - I'm still investigating that)

Here's the code for two DAGs to reproduce this: https://gist.github.com/kamac/7112af78f1a9004142903d4fe6e387d4
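
The gist above has the full reproduction; the pattern being described is roughly the following sketch, assuming the 1.10.x DAG.create_dagrun API (the dag_id is illustrative):

# Sketch of triggering a DAG by creating a DagRun directly with state RUNNING
# (1.10.x-style API). A run created like this starts out RUNNING, so it is
# never held back by max_active_runs. The dag_id below is illustrative.
from airflow.models import DagBag
from airflow.utils import timezone
from airflow.utils.state import State

target_dag = DagBag().get_dag("dag_b")

target_dag.create_dagrun(
    run_id="manual__" + timezone.utcnow().isoformat(),
    execution_date=timezone.utcnow(),
    state=State.RUNNING,  # starts running immediately, so it is never gated
    external_trigger=True,
)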

ashb commented 3 years ago

I've also tried creating the run with state SCHEDULED, but it's never run by the scheduler. (perhaps I'm doing it the wrong way - I'm still investigating that)

Right now the only states for Dag Runs are None, "running", or "failed" -- that's why the scheduler is never picking up that dag run.

kamac commented 3 years ago

@ashb I've tried creating DagRuns with state NONE. The tasks get run now (the last two instances), but max_active_runs is not respected.

Screenshot 2021-01-05 at 11 48 42, Screenshot 2021-01-05 at 11 48 50

I'm on airflow 1.10.13, SequentialExecutor

kaxil commented 3 years ago

@ashb I've tried creating DagRuns with state NONE. The tasks get run now (the last two instances), but max_active_runs is not respected.

Screenshot 2021-01-05 at 11 48 42, Screenshot 2021-01-05 at 11 48 50

I'm on airflow 1.10.13, SequentialExecutor

🤔 1.10.13?? Just noticed the original issue creator was on 1.10.11, so this is not a regression in 2.0 -- I thought I read somewhere that it worked correctly for you @nathadfield on 1.10.x -- can you confirm, please?

nathadfield commented 3 years ago

@kaxil It is definitely an issue on 1.10.14 as that is what we're running on our production system at the moment.

It does also seem to be an issue on 2.0 that I've got running on a locally running dev instance of our setup, but I could not replicate the problem on master via Breeze.

kaxil commented 3 years ago

@kaxil It is definitely an issue on 1.10.14 as that is what we're running on our production system at the moment.

It does also seem to be an issue on 2.0 that I've got running on a locally running dev instance of our setup, but I could not replicate the problem on master via Breeze.

I might ping you on Slack for a more ad-hoc discussion. I was able to replicate it on master too: https://github.com/apache/airflow/issues/9975#issuecomment-754171184

nathadfield commented 3 years ago

@kaxil Is this likely to only be fixed in 2.1 or might we also see it in a new 1.10 version for those people who are affected by this issue but cannot (or don't want to) move to 2.0 yet?

Adam-Leyshon commented 3 years ago

This issue is affecting 1.10.14.

Upgrading to 2.0 is not possible for us at this time since we have a number of internal blockers on plugins that we wrote that require refactoring.

We cleared a DAG run for 2019-01-01 and selected the "all future tasks" option. Now it wants to process over 200 runs at the same time.

We did not experience this issue in 1.10.9.

Instead, it is now trying to run a single task from each of the active runs in parallel, rather than waiting for the first run to complete.

What I expect to happen is for it to clear the tasks and then complete each run sequentially. Our system requires data to be loaded in that order, and having multiple parallel runs is causing huge issues for us.
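
Not a fix for the bug being discussed, but a commonly used way to keep overlapping runs from interleaving at the task level is combining depends_on_past with wait_for_downstream in default_args; a sketch, assuming every task should wait on the previous run:

# Partial mitigation sketch (not a fix for this issue): with these two flags a
# task instance will not start until the same task in the previous run -- and
# that task's direct downstream tasks -- have succeeded, which keeps
# overlapping runs from interleaving.
from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 1, 1),
    "depends_on_past": True,      # wait for this task's previous run instance
    "wait_for_downstream": True,  # ...and for its immediate downstream tasks
}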

image

image

kaxil commented 3 years ago

Looks like max_active_runs was broken further in 1.10.13 -- this will be fixed in 1.10.15 by https://github.com/apache/airflow/pull/13803. However, like I mentioned in https://github.com/apache/airflow/issues/9975#issuecomment-764779460, not all use cases will be fixed.

A complete fix will require adding a new state to DagRun -- 'queued' -- similar to TaskInstance.

match-gabeflores commented 3 years ago

Thanks @kaxil. Before we close this, can we make sure the fix referenced by https://github.com/apache/airflow/issues/13802 actually fixes this issue with max_active_runs completely? @jsmodic references this broken fix https://github.com/apache/airflow/commit/cb8d53fbc64af8d6c175d0dda6ae51db65ccc19b but that commit occurred in Nov 2020, long after the release of v1.10.11, where I experienced the issue.

The original issue seemed to be sporadic (maybe a timing issue?) - See mik-laj and ash's comments above: https://github.com/apache/airflow/issues/9975#issuecomment-710150612

jsmodic commented 3 years ago

I think several issues are being conflated in this comment chain. My fix only addressed the fact that max_active_runs doesn't work at all for manual dag runs in 1.10.14 the way it did in 1.10.12.

I believe the OP issue of mixing manual and scheduled runs together is unrelated and still valid (in 1.10.x, maybe it is fixed in master).

kaxil commented 3 years ago

Yup, the PR I referenced fixes it for most users -- the issue Ash and Kamil pointed out has existed for a long time, since 1.10.3 :)

kaxil commented 3 years ago

@gflores1023 Can you test the fix in your environment to see whether it addresses your specific case? Most likely not, but it's worth checking, since the commit you mentioned was only added in 1.10.13.

zachliu commented 3 years ago

Ha, bumped into this issue in 2.0.1. I cleared 4 dag runs (4 execution dates) in a DAG with max_active_runs = 1 and ended up with 4 concurrent dag runs :crying_cat_face:

FredericoCoelhoNunes commented 3 years ago

@zachliu same problem here, and I have a huge backlog of ~200 failed runs to retry. That means I have to go to the Airflow UI every couple of minutes and manually clear the failed runs one at a time, because if I clear more than one they execute simultaneously and both fail. (Note: this is with a global value of max_active_runs_per_dag = 1.)

zachliu commented 3 years ago

@FredericoCoelhoNunes this :bug: created a huge inconvenience in our workflow, but luckily we have the stable API now. I just used my own Python SDK to write backfill scripts that trigger only one dag run at a time.
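
Generated SDK aside, the general shape of such a script looks roughly like this sketch using plain requests against the stable REST API; the base URL, credentials, DAG id, and dates are placeholders, and it assumes the basic-auth API backend is enabled:

# Sketch of a backfill that triggers one DAG run at a time via the stable REST
# API and waits for each run to finish before starting the next.
import time
from datetime import datetime, timedelta

import requests

BASE = "http://localhost:8080/api/v1"   # placeholder webserver URL
AUTH = ("admin", "admin")                # placeholder credentials
DAG_ID = "dag1"                          # placeholder DAG id

start = datetime(2021, 1, 1)
for day in range(3):                     # three illustrative logical dates
    logical_date = (start + timedelta(days=day)).strftime("%Y-%m-%dT%H:%M:%SZ")
    resp = requests.post(
        f"{BASE}/dags/{DAG_ID}/dagRuns",
        auth=AUTH,
        json={"execution_date": logical_date},
    )
    resp.raise_for_status()
    run_id = resp.json()["dag_run_id"]

    # Poll until this run reaches a terminal state before triggering the next
    # one, so only one run is ever active.
    while True:
        state = requests.get(
            f"{BASE}/dags/{DAG_ID}/dagRuns/{run_id}", auth=AUTH
        ).json()["state"]
        if state in ("success", "failed"):
            break
        time.sleep(30)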

FredericoCoelhoNunes commented 3 years ago

@zachliu Happy to hear you were able to create a stable API! Unfortunately I am not able to: when I use Python to select a failed DAG run, and clear all its task instances, for some reason it triggers some process in the scheduler and it gradually sets all other DAG runs to the "Running" state as well, over the course of a few minutes. Curiously, this doesn't happen if I clear the state of a single run in the UI.

Here's the code I was trying to use:

from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import clear_task_instances
from airflow import settings

session = settings.Session()

# Around 30
failed_dag_runs = DagRun.find(
    dag_id=DAG_ID,
    state='failed',
    session=session
)

for i, dag_run in enumerate(failed_dag_runs):
    run_id = dag_run.run_id
    task_instances = dag_run.get_task_instances(session=session)
    clear_task_instances(
        task_instances,
        session
    )
    session.commit()

    state = ""
    while state != 'success':
        # Here I was checking the DAG run state, but even before it left this
        # loop, it somehow cleared the remaining DAG runs.
        ...

I think Airflow really wasn't developed with this sort of "low level" usage in mind, as I feel that its behavior is hard to predict. I guess I will stick with manually clearing the runs one by one until this problem is fixed.

zachliu commented 3 years ago

@FredericoCoelhoNunes I didn't create a stable API; I generated a Python client and use it to interact with the stable API:

from airflow_python_sdk.model.clear_task_instance import ClearTaskInstance
# Assuming the generated client's usual module layout for the DAG API:
from airflow_python_sdk.api import dag_api

api_instance = dag_api.DAGApi(api_client)  # api_client is configured elsewhere
clear_task_instance = ClearTaskInstance(
    dry_run=True,
    start_date=start_date,
    end_date=end_date,
    include_parentdag=False,
    include_subdags=False,
    only_failed=True,
    only_running=False,
    reset_dag_runs=True,
)
api_resp = api_instance.post_clear_task_instances(
    dag_id, clear_task_instance
)

I wouldn't recommend using code like you showed there; it defeats the purpose of the stable API. Also, in your code, task_instances = dag_run.get_task_instances(session=session) will probably get all task instances, including those from other dag runs.

mariana-s-fernandes commented 3 years ago

Same issue here (version 1.10.12)

jpmsilva commented 3 years ago

Hi there. We are facing a similar issue: we have a DAG that runs to make a system consistent. We want to trigger this DAG via the stable REST API when certain things happen on a separate system. Because the DAG makes the system consistent regardless of when it runs, we want to have at most one instance running and another one queued. If there is a queued DagRun, there is no point in queueing another, since the queued DagRun will make the system eventually consistent.

We were hoping to use max_active_runs to control how many DagRuns for a DAG exist in the running state, but it looks like it doesn't work this way. The DAG in question is not scheduled: it is always meant to be triggered manually via the stable REST API. It looks like a new DagRun state, "queued", would be needed to achieve this, as mentioned previously.
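
Until such a state exists, one workaround sketch is to gate on the client side before triggering: list the DAG's runs via the stable REST API and skip the trigger if one is already running. The URL, credentials, and dag_id below are placeholders, and the sketch assumes the basic-auth API backend:

# Client-side gating sketch: only create a new DagRun if none is currently
# running, approximating "at most one running, one pending" without a queued
# DagRun state.
import requests

BASE = "http://localhost:8080/api/v1"   # placeholder webserver URL
AUTH = ("admin", "admin")               # placeholder credentials
DAG_ID = "make_system_consistent"       # hypothetical DAG id


def trigger_if_idle() -> bool:
    runs = requests.get(
        f"{BASE}/dags/{DAG_ID}/dagRuns", auth=AUTH, params={"limit": 100}
    ).json()["dag_runs"]
    if any(r["state"] == "running" for r in runs):
        return False  # an active run will already make the system consistent
    # Some Airflow versions also expect an explicit execution_date in the body.
    resp = requests.post(f"{BASE}/dags/{DAG_ID}/dagRuns", auth=AUTH, json={"conf": {}})
    resp.raise_for_status()
    return True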

wolfier commented 3 years ago

I couldn't find any mention here of AIRFLOW-2535.

This is an issue for TriggerDagRunOperator as well in 1.10.10, where max_active_runs was not respected.

PR #13803 does fix the issue in 1.10.15!
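
For reference, the 1.10.x pattern being referred to looks roughly like this sketch (the import path moved to airflow.operators.trigger_dagrun in 2.x; the DAG ids and dates are illustrative):

# Sketch of the 1.10.x TriggerDagRunOperator pattern referenced above; the
# triggered "target_dag" could exceed its own max_active_runs when triggered
# repeatedly.
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

with DAG(
    dag_id="controller_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    trigger = TriggerDagRunOperator(
        task_id="trigger_target",
        trigger_dag_id="target_dag",  # target_dag sets max_active_runs=1
    )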

zachliu commented 3 years ago

@imsharadmishra are you sure? I did the same and I still have the issue :thinking:

imsharadmishra commented 3 years ago

@zachliu sorry, I face the same issue if I clear the state of finished dag runs: they execute concurrently as soon as the current dag run finishes. I got confused with a similar issue I was facing, where max_active_runs=1 was not working at all when passed through default_args.

iwanbolzern commented 3 years ago

Hi all, what's the current status of this issue? It seems we are facing exactly this behavior. I will try to briefly explain our use case: we have a daily job to import new data, and the last task of this job clears the DAG runs from the previous 30 days to check whether any data from previous days has changed. Instead of processing day by day (as I would expect because of max_active_runs=1), the tasks are scheduled randomly across all days. Unfortunately, I can't share the full DAG definition, but the image below shows the behavior pretty well:

image

Thank you for investigating this!

zachliu commented 3 years ago

@iwanbolzern wow, this is an interesting way of dealing with data restatement :joy: In our case, a daily data ingestion always ingests the past 30 days of data and overwrites whatever is already there, so that we are less dependent on a broken feature like this.

iwanbolzern commented 3 years ago

@zachliu You're right, it's an "interesting" but super handy way. This way, you can simply focus on writing the job for one day, and because we do not have any inter-day dependencies there is no need for any for-loops in your code 😉. You also see at first glance for which day the restatement was successful.

zachliu commented 3 years ago

@iwanbolzern indeed, very handy. I'll do it once this has been fixed :rocket:

ericbellet commented 3 years ago

I am facing the same issue when I clear specific steps. I found a kind of temporary solution for relaunching the previous steps and executing them without concurrency: you can delete the DAG in the UI and turn the DAG on again. In my case, I had to temporarily remove the steps that I didn't want to rerun and just keep the ones that I needed. Also, you can create another DAG for that. image

image

ldacey commented 3 years ago

This issue impacted me as well recently (I cleared 450 historical tasks to reprocess data)

Is there any way to improve how the scheduler handles clearing a lot of tasks so that it doesn't become deadlocked? In my case, on 2.0.2:

1) None of my DAGs would actually complete, even if all of their tasks were successful.
2) Many of my DAGs would not start at all.

The root cause of this was of course clearing 450 tasks all at once, but I had depends_on_past=True and concurrency=1 enabled, and each task took 20 minutes to complete, so I was stuck. This was exacerbated by all other DAGs failing to complete (the state would stay running even when the tasks were complete). I ended up having to mark all of those DAGs successful in the UI, and then I had to restart Airflow for DAG runs to be marked complete again.

I will refrain from clearing so many tasks at once next time, but perhaps Airflow could handle this situation better? Maybe a "queued" state at a DAG-level, and the DAG would only be considered running if one or more tasks were running? A deadlock is frustrating to deal with.

sergeple commented 3 years ago

Same on 2.0.1. I am using the REST API to trigger the DAG, but max_active_runs is not working.

dimon222 commented 3 years ago

Any solution for this? It's a significant problem that blocks me from upgrading from 1.10.15 to the 2.x tree. The old "unstable" API didn't suffer from this.

nathadfield commented 3 years ago

@dimon222 Looks like it is on the 2.1.1 release milestone. I was hoping it would be fixed as part of 2.1 but we'll have to wait a bit longer.

zachliu commented 3 years ago

@nathadfield well, it was on the 2.0.1 milestone :joy:

fj-sanchez commented 3 years ago

Yeah, this is actually important for us as well; it would be great to get this fixed ASAP. Also, it's currently the bug with the largest number of :+1: