apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

SlaMiss Records Never Created for Packaged DAGs #33410

Open tseruga opened 1 year ago

tseruga commented 1 year ago

Apache Airflow version

2.6.3

What happened

First and foremost, I understand that the currently implemented SLA mechanism is considered buggy and is being heavily refactored, but I still wanted to call out this regression in case it can be resolved by someone with a bit more knowledge of how this might have broken.

DAGs within Zip files (packaged DAGs, as defined here: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#packaging-dags) appear to never fire SLA Miss events in recent versions of Airflow (2.5.1+).

Our team recently upgraded from Airflow 2.5.1 to 2.6.3 and noticed that SLA misses were not being recorded.

What you think should happen instead

Airflow should treat packaged DAGs the same as non-zipped DAGs. SLA Miss records should be generated regardless of whether the DAG is in a zip file or not.

Given two DAGs with the exact same definition (aside from name) with one being zipped and the other not being zipped - we would expect both to fire off exactly the same number of SLA Miss events.

How to reproduce

  1. Create two DAGs with exactly the same contents but different names to differentiate them.

        from datetime import datetime, timedelta

        from airflow import DAG
        from airflow.operators.bash import BashOperator

        with DAG(
            'sla_test_zipped',  # or 'sla_test_unzipped'
            schedule_interval='*/2 * * * *',
            start_date=datetime(2021, 1, 1),
            catchup=False,
        ) as dag:
            successful_task = BashOperator(
                task_id='success_test',
                bash_command='sleep 30',
                sla=timedelta(seconds=10),
            )

2. Compress one of the DAGs into a zip file, so the `dags/` directory contains two files - the same DAG twice with different names, one inside a zip and the other as a `.py` file (see the sketch after this list for one way to build the zip).
![image](https://github.com/apache/airflow/assets/5778047/8402d412-3039-4119-b3ce-5b81b8f834b4)

3. Run Airflow via breeze and enable both DAGs

4. Wait for several schedule intervals and observe the SLA Miss records through the UI
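
For reference, a minimal sketch of one way to build the zip in step 2, using Python's standard `zipfile` module. This is not part of the original report; the file names are assumed to follow the example DAG above.

    import zipfile

    # Packaged DAGs expect the DAG .py files at the top level of the archive.
    with zipfile.ZipFile("dags/sla_test_zipped.zip", "w") as zf:
        zf.write("sla_test_zipped.py", arcname="sla_test_zipped.py")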

Results of testing this process:

For Airflow 2.5.1:
![image](https://github.com/apache/airflow/assets/5778047/bb480e71-aac4-4a07-b725-c94411679fac)

You can see that both DAGs fired off SLA Miss events

For Airflow 2.6.3:
![image](https://github.com/apache/airflow/assets/5778047/55fb7cdf-e0a6-4940-a134-6a559b2299fb)

Only the unzipped DAG fired off SLA Miss events

### Operating System

    PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
    NAME="Debian GNU/Linux"
    VERSION_ID="11"
    VERSION="11 (bullseye)"
    VERSION_CODENAME=bullseye
    ID=debian
    HOME_URL="https://www.debian.org/"
    SUPPORT_URL="https://www.debian.org/support"
    BUG_REPORT_URL="https://bugs.debian.org/"

### Versions of Apache Airflow Providers

    apache-airflow-providers-common-sql==1.3.4
    apache-airflow-providers-ftp==3.3.1
    apache-airflow-providers-http==4.2.0
    apache-airflow-providers-imap==3.1.1
    apache-airflow-providers-sqlite==3.3.1



None are likely relevant.

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

This was observed with both the official helm chart as well as locally using breeze for testing.

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
tseruga commented 1 year ago

After some more digging, I've narrowed down what's causing this. It appears as though only a partial key is being used to index into the `_callback_to_execute` dictionary maintained by the `DagFileProcessorManager`.

Instead of the full path, including both the zip file name as well as the python file within the zip, the dictionary is being indexed only by the path up to the zip file name.

E.g. indexing into the callback dictionary uses `/files/dags/example_sla_dag_zipped.zip` when the dictionary in reality has a key of `/files/dags/example_sla_dag_zipped.zip/example_sla_dag_zipped.py`.
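
To illustrate the mismatch, here is a minimal standalone sketch (not the actual manager code, just the dictionary behaviour); the paths are the ones from the example above:

    from collections import defaultdict

    # Callbacks are stored under the full path of the .py file inside the zip...
    callback_to_execute = defaultdict(list)
    callback_to_execute[
        "/files/dags/example_sla_dag_zipped.zip/example_sla_dag_zipped.py"
    ].append("SlaCallbackRequest(...)")

    # ...but the later lookup only uses the path up to the zip file,
    # so it finds an empty list and the SLA callback is never executed.
    print(callback_to_execute["/files/dags/example_sla_dag_zipped.zip"])  # -> []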

Will continue looking into what appears to be causing the mismatch

tseruga commented 1 year ago

A bit more digging, and I think this is the culprit (although my suggested fix may have unintended ramifications, so I'd like some input from a more knowledgeable contributor). Obviously the intention with the code below was to avoid adding the filepaths to the queue, but this unintentionally broke SLAs for packaged DAGs (#30076).

https://github.com/apache/airflow/blob/5b104a9020510c75f1d5a7a211fd0c7cb1b45070/airflow/dag_processing/manager.py#L722-L736

When this block is executed, the full filepath (including the zip filename) is not added to the file path queue, and is thus never used to index into the callback dictionary that the code above adds to.

I believe it should be something like this instead:

            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
            self._callback_to_execute[request.full_filepath].append(request)
            self._add_paths_to_queue([request.full_filepath], True) # This line is added
            Stats.incr("dag_processing.sla_callback_count")

FWIW, I did add this line and things worked locally - SLA Misses were firing for DAGs within zip files

pankajkoti commented 1 year ago

cc: @uranusjr @potiuk guessing you might have an opinion here ☝🏽

uranusjr commented 1 year ago

Looping in @argibbs (author of #30076 which added the change) for some potential thoughts.

tseruga commented 1 year ago

Any movement/triaging done on this? We effectively reworked the way that we handle SLAs to entirely sidestep the sla_miss table and manually calculate SLA miss events, but we'd prefer a more native solution (especially since this was an undocumented regression).
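
For anyone in a similar position, a rough, purely illustrative sketch of the kind of manual SLA check described above. This is not the reporter's actual workaround; the cutoff rule and names are assumptions.

    from datetime import datetime, timedelta
    from typing import Optional

    def missed_sla(interval_end: datetime, sla: timedelta,
                   task_end: Optional[datetime], now: datetime) -> bool:
        """A task has missed its SLA if it did not finish within `sla`
        of the end of its data interval."""
        deadline = interval_end + sla
        if task_end is None:
            return now > deadline   # still running (or never ran) past the deadline
        return task_end > deadline  # finished, but too late

    # Example: a 10-second SLA, task finished 30 seconds after the interval ended.
    print(missed_sla(datetime(2021, 1, 1, 0, 2), timedelta(seconds=10),
                     datetime(2021, 1, 1, 0, 2, 30), datetime(2021, 1, 1, 0, 5)))  # True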

potiuk commented 1 year ago

I think if @argibbs is not responding then it is up for grabs for anyone who would like to fix it. I will remove the needs-triage label because apparently it's a real error, and hopefully someone will take a look and fix it.

But the fastest way to fix it is for someone from your team to attempt it and provide a PR @tseruga - since you are interested and have a way to reproduce and test it. Airflow is developed by > 2600 contributors, so this is an easy way to become one.

argibbs commented 1 year ago

Hello, sorry, not deliberately ignoring people, just not checking my mail as often as I should.

(Insert real-life-getting-in-the-way comment here).

Haven't read the backlog yet so not sure what the problem is, but am very happy for someone else to fix the problem if they already have a good handle on it.

Otherwise will have a look when I get a chance. I will be on a brief break between jobs next month, so might have time then

Thanks, Andrew


argibbs commented 1 year ago

Ok,

Have managed to get to a real computer with a keyboard and everything and have a proper look.

First up, my apologies for breaking something, that was obviously not the intent.

So, comments:

  1. The proposed change `self._add_paths_to_queue([request.full_filepath], True)` is unfortunately problematic ... this would effectively undo the fix I made in the first place. You can read my PR and the associated discussion for more details, but basically SLA callbacks can fire so often that the file queue never drains, and the airflow scheduler effectively stops processing changes to dag files.
  2. So... what to do next? I'm still thinking on that one (but if you work it out, don't wait for me). There are a couple of options ... I don't yet understand how zipped dags are handled differently to standard dags. Assuming they're not too different, I would say the fix is either:
     i) Add the contents of the zips to the file path queue like we do for normal dags (that's in `prepare_file_path_queue`, at approximately line 1200). Quite what this involves I don't know - I've never used zipped dags, and I'd have to experiment locally as part of the fix.
     ii) Go further with my SLA fix. I originally had a more involved solution (again, if you look at my PR history, you can see what it was) that split the queues and effectively tracked which files had been processed. Then even if the SLA callbacks were spamming the queue, it wouldn't matter, because we'd ensure any dags which hadn't been refreshed after X seconds (by default I think it's 30 or maybe 60) would get priority and be refreshed. SLA callback spam would make the system less responsive to updates, but it wouldn't ignore them altogether. This change was originally abandoned because it was too big a change and people were understandably nervous, but maybe we could try again with as minimal an impl as possible. If we did that, then the proposed fix by @tseruga would be fine.

That said, while I think the impl in (2) would work, it does feel like one of those fix-the-symptoms-not-the-cause type things, and (1) is probably cleaner, for some value of clean.
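
To make the first option above (adding the zip contents to the file path queue) concrete, a hedged sketch of the general idea - this is not actual Airflow code, and the function and variable names are made up:

    import os
    import zipfile

    def expand_packaged_dag_paths(file_path: str) -> list[str]:
        """Return the paths to queue for a DAG file.

        For a regular .py file this is just the file itself; for a packaged
        DAG (.zip) it is one '<zip path>/<member>.py' path per top-level
        Python file in the archive, matching the keys described earlier
        for the callback dictionary.
        """
        if not file_path.endswith(".zip"):
            return [file_path]
        with zipfile.ZipFile(file_path) as zf:
            return [
                os.path.join(file_path, name)
                for name in zf.namelist()
                if name.endswith(".py") and "/" not in name  # top level only
            ]

    # e.g. expand_packaged_dag_paths("/files/dags/example_sla_dag_zipped.zip")
    # -> ["/files/dags/example_sla_dag_zipped.zip/example_sla_dag_zipped.py"]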

As mentioned before, I will probably have an opportunity to look at this in October maybe. But no promises - if you can come up with a good solution before then, go for it. I'm happy to weigh in on PRs.

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.