apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow progressive slowness #32928

Open llamageddon83 opened 1 year ago

llamageddon83 commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We are running Airflow on EKS with version 2.5.3. Airflow has been experiencing progressive slowness over a period of 2-3 weeks: DAGs start getting queued without ever executing, which forces us to restart the scheduler pod. After the pod restart, the problem goes away for a few days and then slowly creeps back up.

The pods, the logs and the dashboards all look healthy, the UI shows that no tasks are currently running, and there are no worker pods alive. The resource usage graphs (CPU, memory) also look the way they should if no DAGs are actually executing.

During one such outage, we disabled all the DAGs and marked all the tasks as success just to see if the scheduler was able to spin up new worker pods. The scheduler never recovered and we had to restart the scheduler pod.

However, there is one dashboard that shows the metrics Executor running tasks and Executor open slots, and we noticed that it accurately represents the slowness behavior. Over time, the number of open slots would decrease while the number of running tasks would increase. The two would never reset, even though nothing runs during a long window every day, between 10:00 PM and 8:00 AM.

These metrics come from base_executor:

        Stats.gauge("executor.open_slots", open_slots)
        Stats.gauge("executor.queued_tasks", num_queued_tasks)
        Stats.gauge("executor.running_tasks", num_running_tasks)

and num_running_tasks is defined as num_running_tasks = len(self.running) in base_executor.
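
For illustration, here is a minimal self-contained sketch (not the actual BaseExecutor code) of how stale entries in self.running translate directly into fewer reported open slots, which is exactly the pattern we see on the dashboard:

    # Minimal illustration (not the actual BaseExecutor code): a leftover key in
    # `running` keeps one slot "occupied" forever, so executor.open_slots shrinks.
    parallelism = 10
    running = {("flight_history.py", "load_file", "manual__2023-07-24T01:06:18+00:00", 1, 17)}  # stale entry
    queued_tasks = {}

    open_slots = parallelism - len(running) - len(queued_tasks)
    print("executor.open_slots:", open_slots)          # 9, even though nothing is really running
    print("executor.running_tasks:", len(running))     # 1 phantom task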

[Screenshot from 2023-07-28, 3:11:30 PM: the Executor open slots / running tasks dashboard]

So we enabled some debug logs from KubernetesExecutor under this method to see what was in self.running:

    def sync(self) -> None:
        """Synchronize task state."""
        if self.running:
            self.log.debug("self.running: %s", self.running)  # --> this log
        ...
        self.kube_scheduler.sync()

where self.running is defined as self.running: set[TaskInstanceKey] = set(). The log showed that somehow tasks that had completed successfully in the past still existed in self.running. For example, a snippet of the log output on the 28th is holding on to tasks that had already completed successfully on the 24th and 27th:

time: Jul 28, 2023 @ 15:07:01.784
self.running: {TaskInstanceKey(dag_id='flight_history.py', task_id='load_file', run_id='manual__2023-07-24T01:06:18+00:00', try_number=1, map_index=17), TaskInstanceKey(dag_id='emd_load.py', task_id='processing.emd', run_id='scheduled__2023-07-25T07:30:00+00:00', try_number=1, map_index=-1), ...

We validated that these tasks have been completed without any issue from the UI and Postgres DB (which we use as the metadata backend).

Once the scheduler pod is restarted, the problem goes away, the metrics in Grafana dashboard reset and tasks start executing.

What you think should happen instead

Airflow's scheduler keeps track of currently running tasks and their state in memory, and in some cases that state is not getting cleared. Tasks that have completed should eventually be removed from the running set in KubernetesExecutor once the worker pod exits.
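
As a minimal sketch of the expected lifecycle (illustrative only, assuming the executor tracks running tasks in a plain set the way base_executor does), the key added when a task starts should always be discarded once the task finishes:

    # Illustrative sketch of the expected lifecycle, not Airflow code.
    running: set[tuple] = set()

    def on_task_started(key: tuple) -> None:
        running.add(key)

    def on_task_finished(key: tuple) -> None:
        # This is the step that apparently does not always happen for us.
        running.discard(key)

    key = ("emd_load.py", "processing.emd", "scheduled__2023-07-25T07:30:00+00:00", 1, -1)
    on_task_started(key)
    on_task_finished(key)
    assert not running  # the slot is freed again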

How to reproduce

Beats me. Our initial assumption was that this was a DAG implementation issue and that some particular DAG was misbehaving. But the problem has occurred with all sorts of DAGs, happens for both scheduled and manual runs, and is sporadic. There is some edge scenario that causes this to happen, but we are unable to nail it down any further.

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

aiofiles==23.1.0 aiohttp==3.8.4 airflow-dbt>=0.4.0 airflow-exporter==1.5.3 anytree==2.8.0 apache-airflow-providers-ftp==2.0.1 apache-airflow-providers-http>=2.0.3 apache-airflow-providers-microsoft-mssql==2.1.3 apache-airflow-providers-snowflake>=4.0.4 apache-airflow-providers-hashicorp==3.3.0 apache-airflow-providers-cncf-kubernetes==5.2.2 apache-airflow>=2.2.3 asgiref==3.5.0 Authlib==0.15.5 dbt-snowflake==1.5.2 flatdict==4.0.1 hvac==0.11.2 jsonschema>=4.17.3 pandas==1.3.5 psycopg2-binary==2.9.3 pyOpenSSL==23.1.1 pysftp==0.2.9 pysmbclient==0.1.5 python-gnupg==0.5.0 PyYAML~=5.4.1 requests~=2.26.0 smbprotocol==1.9.0 snowflake-connector-python== 3.0.4 snowflake-sqlalchemy==1.4.7 statsd==3.3.0 py7zr==0.20.5

Deployment

Official Apache Airflow Helm Chart

Deployment details

Airflow is deployed via helm charts on EKS in AWS. There are two scheduler pods with AIRFLOW__CORE__PARALLELISM set to 10.

Anything else

N/A

Are you willing to submit PR?

Code of Conduct

ephraimbuddy commented 1 year ago

We have had a minor release since 2.5.3, could you try this with Airflow 2.6.3?

llamageddon83 commented 1 year ago

Hi @ephraimbuddy thank you for taking a look at this. Yes I will bump the version and see if the issue persists. It will take about 2-3 weeks to push the change through to production.

llamageddon83 commented 1 year ago

Hi @ephraimbuddy. We pushed 2.6.3 to prod last week. And the progressive slowness has started to creep up again. Version bump didn't seem to remedy the issue.

potiuk commented 1 year ago

Is it possible that you check which tasks are in the "running" state for some time when it happens, and maybe try to find some correlated logs from the scheduler and task execution - and ideally also find the logs from another task instance of the same task that was not found in "running", so that we could compare them and see the difference?

I presume that there is some kind of race happening that somehow skips removal of the task even if it has been completed - but seeing the logs and comparing them could narrow down the search and might allow us to come up with a plausible hypothesis.

Question - do you run deferrable tasks / triggerer? Maybe somehow there is a problem with deferrable code? (wild guessing now after looking into some possible paths).

llamageddon83 commented 1 year ago

Hi @potiuk Please find my answers in line below:

Is it possible that you check which tasks are in the "running" state for some time when it happens, and maybe try to find some correlated logs from the scheduler and task execution - and ideally also find the logs from another task instance of the same task that was not found in "running", so that we could compare them and see the difference?

Yes, we have spent quite a bit of time trying to compare logs from different tasks and logs of the same tasks from different executions, but there is no oddity we could spot. As far as the running state goes, since this happens very sporadically, we don't know what happens to the task state. Whenever we take a look, the tasks have completed and are no longer in the running state.

I presume that there is some kind of race happening that somehow skips removal of the task even if it has been completed - but seeing the logs and comparing them could narrow down the search and might allow us to come up with a plausible hypothesis.

Sorry, I wish I could help. We have spent weeks trying to narrow it down to some DAG implementation or some kind of pattern we may have... but we got nothing.

Question - do you run deferrable tasks / triggerer? Maybe somehow there is a problem with deferrable code? (wild guessing now after looking into some possible paths).

Yes, we have quite a few of them. The deferrable tasks run in a never-ending loop. I know you are very busy and go through many discussions, but if you remember, I previously mentioned how we use triggers for streaming here. The triggers run in a never-ending loop and poll an HTTP endpoint for messages. The schedule for the triggers is stacked and they ramp up to their max_active_runs. Another thing I should point out is that the tasks that do not get removed don't necessarily have any deferrable operators.

Thank you

patrick-shakudo commented 1 year ago

I am having the same issue with Airflow 2.6.3. The database and Airflow UI show no tasks running, but the Kubernetes executor thinks there are 64 (parallelism=64) running tasks and skips queuing/running any more tasks.

My initial thought is that within kubernetes_executor.py, process_watcher_task sometimes fails to add a finished pod to the result_queue. Because of that, _change_state is never called for the task instance, so it is never removed from the self.running set of running task instances, and in turn the "critical section" of queueing tasks is never attempted.
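
To illustrate what I mean (a simplified, self-contained model, not the actual executor code): if one completion event never makes it onto the result queue, the corresponding key stays in the running set and one slot is lost until the scheduler restarts.

    # Simplified model of the suspected failure mode (not Airflow code).
    from queue import Empty, SimpleQueue

    parallelism = 64
    running = {("dag", f"task_{i}", "run_1", 1, -1) for i in range(3)}
    result_queue = SimpleQueue()

    # The watcher reports only two of the three finished pods (one event is lost).
    finished = sorted(running)
    for key in finished[:2]:
        result_queue.put((key, "success"))

    # sync(): drain the queue; only reported keys are removed from `running`.
    while True:
        try:
            key, state = result_queue.get_nowait()
        except Empty:
            break
        running.discard(key)

    print("open slots:", parallelism - len(running))  # 63, one slot leaked for good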

I have posted more details about it here in the airflow slack: https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1694033145572849

Cricktom commented 1 year ago

I am also facing the same issue with Airflow 2.6.2. The Airflow UI shows no tasks running, but the Celery executor's open slots are stuck at zero and its running slots are stuck at 32, which results in skipping over queuing/running any more tasks.

AIRFLOW__CORE__PARALLELISM is 32 and the number of workers running is 2, with AIRFLOW__CELERY__WORKER_CONCURRENCY set to 8.

I'm not really sure how to reproduce it, because it develops over a period of time, and only sporadically.

ephraimbuddy commented 1 year ago

I am also facing the same issue with Airflow 2.6.2. The Airflow UI shows no tasks running, but the Celery executor's open slots are stuck at zero and its running slots are stuck at 32, which results in skipping over queuing/running any more tasks.

AIRFLOW__CORE__PARALLELISM is 32 and the number of workers running is 2, with AIRFLOW__CELERY__WORKER_CONCURRENCY set to 8.

I'm not really sure how to reproduce it, because it develops over a period of time, and only sporadically.

Looks like you have a different issue, can you create a separate issue with more information?

Cricktom commented 1 year ago

Thank you @ephraimbuddy for taking a look at this. Here are more details that support the same behavior as mentioned in this bug.

The below image shows the number of tasks running for the celery executor. It slowly goes up and when it reaches the limit, sometimes it doesn't come down until we restart the scheduler.

The below image shows the number of open slots for the celery executor, which follows the same pattern as above because it's inversely proportional to the number of running tasks.

[image: Celery executor open slots over time]

The below image shows the number of tasks running per pool in the Airflow webserver UI. Here we can see that the number of running tasks is not in proportion with the Celery executor slots, and we have observed that after a point Airflow stops scheduling any tasks at all because it doesn't find any open executor slots.

To mitigate this issue, we are restarting the scheduler every hour. Please suggest a better approach if there is one.

adrianrego commented 1 year ago

I'm also seeing this issue on our setup...Airflow 2.7.1, KubernetesExecutor. Running 3 scheduler pods. Looks like things start going downhill after 5 days. Restarting the schedulers gets things moving again.

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

sa2413 commented 1 year ago

any update on this issue? facing this on multiple k8s clusters

potiuk commented 1 year ago

No, but if someone would like to spend time trying to analyse it, that would be great. Until then - if you experience it, I recommend restarting Airflow periodically. It seems like a super-simple solution that has no real bad side-effects and can be implemented in 5 minutes, rather than volunteers spending days trying to understand the root cause (plus it could be coming from a deployment issue).

Pragmatically, it seems like an easy way to solve your problem, at least for now, if you are eager to get your multiple clusters unblocked.

Another option might be to fund an effort for someone to help analyse it. @sa2413 - maybe your company would like to sponsor some maintainers, for example, to investigate it deeply? We've never done anything like that - but if you are eager to get a solution, maybe that's the right way of solving it, I wonder?

harshg0910 commented 1 year ago

Hi @potiuk and @ephraimbuddy, I see the "Airflow 2.7.3" milestone added to this bug, so I was under the impression that it is being fixed in the next bugfix release. Is that not the case (sorry, I'm not aware of the milestone convention we follow here)?

potiuk commented 1 year ago

Hi @potiuk and @ephraimbuddy, I see the "Airflow 2.7.3" milestone added to this bug, so I was under the impression that it is being fixed in the next bugfix release. Is that not the case (sorry, I'm not aware of the milestone convention we follow here)?

Marking something as x.y.z means that we think it's important enough to be targeted to be looked at when we prepare the release. If a fix is ready, the issue is closed and the fix gets cherry-picked to the release branch, it finally lands in the release and release notes.

But it's not guaranteed - and in this case we can move things to the follow-up release so that unsolved issues like this do not block the release - because maybe someone will have some time to take a look and will focus on finding the problem. But since we have no idea of the reason yet, we can't fix it. It needs to be diagnosed deeply, hypotheses drawn and proven, and a fix implemented (and possibly tested by those who raised and observed it - usually we ask the people who raise the issue to verify the hypotheses we have). There are no "guarantees" such issues will be fixed - even if we think we found some reasons, it could be that they are not the real cause.

Since this is an open-source project, there are no guarantees that all such issues will be fixed, or even deeply looked at. Everyone contributing here does it on a voluntary basis (including maintainers, PMC members and release managers) - even if some of us (like me) have funding to keep doing it full time, we are not paid to solve all issues and we do not provide SLAs for bugs being fixed (especially as the software is delivered for free, without any guarantees).

And if they have the time and focus to fix things, they might (or might not) be able to fix them. Also, anyone - including reporters and other contributors - is welcome to attempt to diagnose and fix such issues. Many issues are solved by contributors here, and it's actually one of the best ways to give back to the community - by helping to diagnose and solve such issues, or even paying someone to do so if you need something fixed but have no time to spend on it. Many of our contributors (including those fixing bugs) who are not even maintainers get paid for their contributions by someone - their employers, companies who hire them, etc.

BTW, indeed I think it would be a nice idea to explain this somewhere in more detail. Currently we just have a very short description of this in the release process: https://github.com/apache/airflow/blob/main/dev/README_RELEASE_AIRFLOW.md#close-the-milestone

Before closing the milestone on Github, make sure that all PR marked for it are either part of the release (was cherry picked) or postponed to the next release, then close the milestone. Create the next one if it hasn't been already (it probably has been). Update the new milestone in the https://github.com/apache/airflow/issues/10176 make sure to update the last updated timestamp as well.

But we should likely be clearer about what it means. I will propose a PR based on this answer.

potiuk commented 1 year ago

I prepared PR #35245 and started a proposal on the devlist to add a description of the process/approach we are using. See https://lists.apache.org/thread/05njmmqvwl0gn20f2go9d420xhzptrw2 - feel free to chime in there as well - in the PR or on the devlist @harshg0910

llamageddon83 commented 1 year ago

Hi all,

Thank you to everyone who provided input and to the Airflow team for following up on this. While going through some old posts from @potiuk on Stack Overflow answering an Airflow v1 question, I noticed he mentioned the AIRFLOW__SCHEDULER__NUM_RUNS configuration. It makes the scheduler exit (and get restarted) after the specified number of scheduler loops.

We have added this configuration to our Airflow platform and set it to `100000`, which results in a restart roughly every 2 hours. We are still performing validations since it usually takes 2-3 weeks for the slowness to creep up, but I'm sharing it in case anyone else wants to give it a shot as well.
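
Conceptually (a paraphrase, not the actual SchedulerJobRunner code), num_runs just makes the scheduler exit after the given number of scheduling loops, and the restart that follows clears whatever has accumulated in the executor's in-memory running set:

    # Paraphrased sketch of what AIRFLOW__SCHEDULER__NUM_RUNS does, not the real code.
    import itertools

    def run_scheduler_loop(num_runs: int) -> None:
        for loop_count in itertools.count(start=1):
            pass  # ... one scheduling iteration ...
            if 0 < num_runs <= loop_count:
                break  # exit; the restart resets the executor's in-memory state

    run_scheduler_loop(num_runs=100_000)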

llamageddon83 commented 12 months ago

Hi all,

Configuring AIRFLOW__SCHEDULER__NUM_RUNS has fixed the problem for us.

Thank you

droppoint commented 11 months ago

Hi, everyone!

I think I found the root cause of the problem.

Short answer: The KubernetesExecutor._adopt_completed_pods function is not compatible with concurrently running schedulers.

Long answer: I encountered the issue described here after updating my Airflow instance from 2.4.3 to 2.7.3. After the update, the executor.open_slots metric started decreasing, as shown in the picture below.

[Screenshot from 2023-11-21 10:04:04: the executor.open_slots metric decreasing]

After restarting the schedulers, the open_slots metric resets, but then it starts to decline again. After some investigation, I discovered two things:

  1. My KubernetesExecutor.running set has TaskInstances that were completed a while ago.
  2. These TaskInstances should not be in this scheduler's running set because they were completed by my other scheduler.

After digging through the logs, I found that pods belonging to those task instances were adopted. The log entry "Attempting to adopt pod" (with a capital "A") corresponds to this line in the code.

[Screenshot from 2023-11-20 17:40:21: scheduler log entries "Attempting to adopt pod"]

The first problem in the function occurs here. Even if the scheduler fails to adopt a pod, the pod is still added to the KubernetesExecutor.running set. This piece of code was changed in #28871 and was merged into 2.5.2.

In my case, the pod failed to be adopted because it had already been deleted (error 404). I decided to fix it by simply adding continue in the except block. After the fix, the situation improved a lot, but I still saw some TaskInstances in the KubernetesExecutor.running set that didn't belong to that scheduler.

Then I found the second problem with this function. KubernetesExecutor._adopt_completed_pods is called unconditionally in KubernetesExecutor.try_adopt_task_instances, which is called by SchedulerJobRunner. SchedulerJobRunner sends a list of TaskInstances that need to be adopted because their Job.last_healthcheck missed the timeout. KubernetesExecutor.try_adopt_task_instances iterates through that list and tries to adopt (patch one of the labels of) the pods that belong to these TaskInstances; if adoption is successful, it adds the TaskInstance to the KubernetesExecutor.running set. However, KubernetesExecutor._adopt_completed_pods, which is called during try_adopt_task_instances, does no such filtering - it just gets the list of all completed pods and tries to adopt every completed pod that is not bound to the current scheduler. This results in the constant adoption of completed pods between schedulers: scheduler 1 adopts the completed pods of scheduler 2 and vice versa.
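
To make the second problem easier to see, here is a deliberately simplified simulation (not the real _adopt_completed_pods code): because completed pods are "adopted" unconditionally, two healthy schedulers keep relabelling each other's finished pods and keep adding long-finished task instances to their running sets.

    # Simplified simulation of the unconditional adoption of completed pods (not Airflow code).
    completed_pods = [
        {"key": ("dag_a", "task_1", "run_1", 1, -1), "worker": "scheduler-2"},
        {"key": ("dag_b", "task_2", "run_7", 1, -1), "worker": "scheduler-1"},
    ]

    def adopt_completed_pods(my_id: str, running: set) -> None:
        for pod in completed_pods:
            if pod["worker"] != my_id:
                pod["worker"] = my_id      # "adoption" = patch the airflow-worker label
                running.add(pod["key"])    # added even though the task finished long ago

    running_1, running_2 = set(), set()
    for _ in range(3):                     # a few try_adopt_task_instances cycles
        adopt_completed_pods("scheduler-1", running_1)
        adopt_completed_pods("scheduler-2", running_2)

    print(len(running_1), len(running_2))  # both now hold completed tasks they never ran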

So, how can we fix this?

I think that the _adopt_completed_pods function needs to be removed, because the presence of completed pods after a scheduler failure is pretty harmless, and the airflow cleanup-pods CLI command can take care of that. Plus, this function has already caused problems before (#26778). But I might be wrong, so if some maintainer could give advice on this situation, that would be great.

potiuk commented 11 months ago

I pinged the #development channel of Airflow's Slack, as we are now gearing up for the 2.8.0 release and that might be a good opportunity to have some maintainers take a close look. Thanks a lot for the detailed analysis - I think it might be super-helpful for a quick diagnosis and remedy for this issue. https://apache-airflow.slack.com/archives/CCPRP7943/p1700574306533159

droppoint commented 11 months ago

@potiuk Thank you. Should I submit a PR with the _adopt_completed_pods removal, or is it better to wait for the maintainers' decision on how to fix this problem?

potiuk commented 11 months ago

@potiuk Thank you. Should I submit a PR with the _adopt_completed_pods removal, or is it better to wait for the maintainers' decision on how to fix this problem?

Submitting a PR is ALWAYS good. This way it might get people to the right code faster and their comments might be better - the closer to the code and the change you are, the better!

potiuk commented 11 months ago

Worst case, the PR will be closed and another one will supersede it - and everyone will learn from the comments in the first one :)

github-actions[bot] commented 11 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

dirrao commented 11 months ago

Hi @llamageddon83, @droppoint, we are facing this similar issue more often. Here is another ticket for the same problem: #35675. I have created an MR for it: https://github.com/apache/airflow/pull/36240

droppoint commented 11 months ago

@dirrao This is cool, but I already submitted PR #35800 almost a month ago. I think we are fixing different problems. Your PR addresses the issue where adoption is performed on a live scheduler that just skipped a heartbeat. We encountered this problem, but we resolved it by simply increasing scheduler_health_check_threshold. However, there is another problem in the adoption cycle: the "adoption" of completed pods is unconditional, so even if a scheduler didn't skip a heartbeat, another scheduler will try to adopt "completed" pods from it. This results in a bloated running set. For more information, you can read our analysis of the situation here (https://github.com/apache/airflow/issues/32928#issuecomment-1820413530). Please feel free to check out our PR.

P.S. Can you share the secret of how you managed to get a review from the maintainer so fast?

potiuk commented 11 months ago

P.S. Can you share the secret of how you managed to get a review from the maintainer so fast?

I can share a secret. Be kind, be considerate, but also... be persistent. The thing is that maintainers sometimes have multiple tens of PRs to look at DAILY, and - more often than not - they do it in their free time, or things like MySQL breaking all images released during the last 3 years force them to scramble to fix that... Or they have a new release coming and they want to make sure it's good.

And your PR might simply be one of those many PRs - easy to forget even though it is important.

But on the other hand, you have one PR - important, useful, and solving a real problem - and you are eager to get it merged. You definitely remember it and are a bit disappointed it has not been reviewed so far. And well, you are the driving force there, if you understand the asymmetry.

So... keep on reminding us if you see no response in a few days. It's far easier for you to notice "nothing has happened yet to the one PR in Airflow I care about" than for maintainers to look through 170+ open PRs daily and decide where to spend what free time they have to spare on Airflow, just to pick this one.

I think that's the best recipe.

potiuk commented 11 months ago

I will ping in development again.

potiuk commented 11 months ago

Reopening, as #35800 seems to also address part of the issue.

ephraimbuddy commented 11 months ago

Did the PR actually fix this issue? I think we need to benchmark it before closing. We have seen this issue with Celery, and the PR was for Kubernetes.

dirrao commented 11 months ago

I see multiple problems reported in the same issue. We might need to create a separate issue for the Celery executor.

potiuk commented 11 months ago

Yeah. I think there could be many similar behaviours for various reasons. We should keep gathering info from people reporting similar issues after they upgrade to the latest version of Airflow, once we merge and release some fixes, to see if the issue is still occurring. These kinds of issues are difficult to reproduce and diagnose with certainty.

dirrao commented 11 months ago

I have created a new issue #36335 for the Celery executor. Let's keep this one for the Kubernetes executor.

dirrao commented 10 months ago

@llamageddon83 We have faced a similar issue in production, identified the root cause, and provided the fix in #36240. If you are looking for an immediate fix, you can try the patch in #36240. As a workaround, you can increase the parallelism to a large number (10K).

dirrao commented 10 months ago

Hi @llamageddon83, the Kubernetes executor slots leak fix is available in version 7.12.0 of the cncf-kubernetes provider. You can try it and see if it fixes your issue: https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/changelog.html

ephraimbuddy commented 10 months ago

FYI, I updated [scheduler] max_tis_per_query to 512 in 2.8.0 and saw a good improvement in the number of tasks run per minute.

github-actions[bot] commented 9 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 9 months ago

This issue has been closed because it has not received response from the issue author.

github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

potiuk commented 1 week ago

Airflow 2.10.3 is now out and it has fix #42932, which is likely to fix the problems you reported. Please upgrade, check if it fixed your problem, and report back, @llamageddon83?