apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Tasks stuck in queued state #21225

Closed Chris7 closed 1 year ago

Chris7 commented 2 years ago

Apache Airflow version

2.2.3 (latest released)

What happened

Tasks are stuck in the queued state and are never scheduled for execution. In the logs, these tasks show the message "could not queue task <task details>, as they are currently in the queued or running sets in the executor".

What you expected to happen

Tasks run :)

How to reproduce

We have a dockerized airflow setup, using celery with a rabbit broker & postgres as the result db. When rolling out DAG updates, we redeploy most of the components (the workers, scheduler, webserver, and rabbit). We can have a few thousand Dagruns at a given time. This error seems to happen during a load spike when a deployment happens.

Looking at the code, this is what I believe is happening:

Starting from the initial debug message "could not queue task", I found that these tasks are marked as running in the executor (but in the UI they still appear as queued): https://github.com/apache/airflow/blob/main/airflow/executors/base_executor.py#L85

Tracking through our logs, I see these tasks are recovered by the adoption code, and the state there is STARTED (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L540).

Following the state update code, I see this does not cause any state updates to occur in Airflow (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L465). Thus, if a task is marked as STARTED in the results db but queued in the Airflow task state, it will never be moved out of queued by the scheduler. However, you can get these tasks to finally run by clicking the run button.
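The chain described above can be illustrated with a toy model. This is only a sketch of the behaviour as reported here, not the actual Airflow code; `ToyExecutor` and all of its methods and attributes are hypothetical names chosen for illustration.

```python
class ToyExecutor:
    """Hypothetical stand-in for the executor bookkeeping described above."""

    def __init__(self):
        self.queued_tasks = {}   # key -> command waiting to be sent
        self.running = set()     # keys the executor believes are running
        self.airflow_state = {}  # key -> state as shown in the UI

    def queue_command(self, key, command):
        # Guard: refuse anything already tracked as queued or running.
        # This is the point where "could not queue task" would be logged.
        if key in self.queued_tasks or key in self.running:
            return False
        self.queued_tasks[key] = command
        return True

    def adopt(self, key, celery_state):
        # Adoption adds the task to the running set, then applies its Celery state.
        self.running.add(key)
        self.update_state(key, celery_state)

    def update_state(self, key, celery_state):
        # Only terminal states change anything; STARTED falls through as a no-op.
        if celery_state == "SUCCESS":
            self.running.discard(key)
            self.airflow_state[key] = "success"
        elif celery_state in ("FAILURE", "REVOKED"):
            self.running.discard(key)
            self.airflow_state[key] = "failed"


executor = ToyExecutor()
key = ("dag", "task", 1)
executor.airflow_state[key] = "queued"
executor.adopt(key, "STARTED")  # task adopted after a redeploy
requeued = executor.queue_command(key, ["airflow", "tasks", "run"])
```

In this model the adopted task stays in `running` forever, so every later attempt to queue it is refused while the UI state remains `queued`.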

Operating System

Ubuntu 20.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

I believe this could be addressed by asking celery if a task is actually running in try_adopt_task_instances. There are obvious problems like timeouts, but celery inspect active|reserved can return a json output of running and reserved tasks to verify a STARTED task is actually running
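A rough sketch of that check, assuming the worker reports have the shape Celery's inspect API returns (a mapping of worker name to a list of task dicts with an `id` field). The helper name `task_is_live` is hypothetical, and the timeout concern mentioned above is not handled here.

```python
def task_is_live(task_id, active, reserved):
    """Return True if any worker reports task_id as active or reserved.

    `active` and `reserved` are expected as {worker_name: [{"id": ...}, ...]},
    or None when no worker replied in time.
    """
    for report in (active, reserved):
        for tasks in (report or {}).values():
            if any(task.get("id") == task_id for task in tasks):
                return True
    return False


# With a real Celery app, the reports would come from something like:
#   insp = app.control.inspect(timeout=5)
#   active, reserved = insp.active(), insp.reserved()
# Here they are hard-coded sample data.
active = {"worker1@host": [{"id": "aaa"}]}
reserved = {"worker1@host": [{"id": "bbb"}], "worker2@host": []}
```

A task that is STARTED in the results db but for which `task_is_live` returns False would be a candidate for re-queuing rather than adoption.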

boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

dhananjays commented 2 years ago

We're facing this as well after upgrading to 2.2.3 from 2.1.0

dimon222 commented 2 years ago

Plus one here, noticing this frequently after bump from 2.1.4 to 2.2.3

Chris7 commented 2 years ago

This is not ideal, but for those who want to kick all their queued tasks to running, here's a snippet i've been using:

from airflow import models, settings
from airflow.executors.executor_loader import ExecutorLoader

session = settings.Session()
# Every task instance Airflow currently records as queued.
tis = session.query(models.TaskInstance).filter(models.TaskInstance.state == 'queued')
dagbag = models.DagBag()
for ti in tis:
  dag = dagbag.get_dag(ti.dag_id)
  task = dag.get_task(ti.task_id)
  # Copy task-level settings from the DAG's task onto the task instance.
  ti.refresh_from_task(task)
  executor = ExecutorLoader.get_default_executor()
  executor.job_id = "manual"
  executor.start()
  # Hand the task instance back to the executor, keeping the usual dependency checks.
  executor.queue_task_instance(ti, ignore_all_deps=False, ignore_task_deps=False, ignore_ti_state=False)
  # The heartbeat is what actually triggers sending the queued command.
  executor.heartbeat()

potiuk commented 2 years ago

I think this is being handled in one of the fixes coming to 2.2.4 (@ephraimbuddy ?)

easontm commented 2 years ago

I'm getting the same message but different symptoms on a similar setup. CeleryKubernetesExecutor, RabbitMQ, and MySQL backend. For me, the task gets marked failed with no logs.

A sensor in reschedule mode was doing its periodic run-sleep-run pattern. On one of the later runs, I got the "could not queue task instance" message. I grabbed the external_id from the logs and searched for it in Flower, and it turns out the celery task actually succeeded (that is, performed its check -- the sensor itself didn't "succeed"). However, the Airflow state was still recorded as failed.

I think some kind of desync between the states known by Airflow and the executor is causing this, but I'm not sure why the results of the successfully executed task weren't reported, even though Airflow thought it couldn't be queued (which it was).

ephraimbuddy commented 2 years ago

I think this is being handled in one of the fixes coming to 2.2.4 (@ephraimbuddy ?)

Yes. Here https://github.com/apache/airflow/pull/19769

Chris7 commented 2 years ago

Thanks @ephraimbuddy ,

Looking at the PR, I'm not sure if it will address this problem. Notably, tasks are added to the running set by the abandoned task code, which means this line will skip over all these forever queued tasks: https://github.com/apache/airflow/pull/19769/files#diff-ac6d6f745ae19450e4bfbd1087d865b5784294354c885136b97df437460d5f10R417

arkadiusz-bach commented 2 years ago

I had a similar problem. It was happening because KubernetesExecutor was picking up CeleryExecutor tasks under CeleryKubernetesExecutor.

Celery changes the task state to queued and KubernetesExecutor changes it back to scheduled, and this happens over and over again (depending on how fast the task gets to the running state).

I fixed it by adding an additional filter on the task queue (which defaults to 'kubernetes') for KubernetesExecutor in a couple of places. The one below is probably causing most of the problems: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L455

It takes all of the tasks in the queued state, but it should take only those that should run on Kubernetes, i.e. those whose queue equals: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/config_templates/default_airflow.cfg#L743

That is why there is log entry: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/base_executor.py#L85

Because celery gets the same task that is already inside queued_tasks

If this is the case then you will see following messages in the logs as well: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L494

meetri commented 2 years ago

i'm still having this issue on 2.2.5 any updates on this ticket or when it may get fixed. In the mean time i'm using the code snippet provided by @Chris7

potiuk commented 2 years ago

i'm still having this issue on 2.2.5 any updates on this ticket or when it may get fixed. In the mean time i'm using the code snippet provided by @Chris7

Some parts of this issue are marked as fixed. You are likely experiencing a different issue. But we will never know unless you open a new issue and provide all the details, your logs and the circumstances in which you experience the problem. Unless you do that, there is no chance your issue will even be looked at (and even less likely fixed, except maybe accidentally), because no one has any idea about your issue details.

potiuk commented 2 years ago

Just be aware @meetri that you get the software for free - and a lot of the people contribute to it for free. And the best value you can bring is to spend some time on detailing your problems. providing as much as you can - including the circumstances, details and reproducibility. This way the smallest help you can provide for those who try to help people like you - on a free forum, for - again - software that you paid precisely 0 USD for.

vandanthaker commented 2 years ago

https://github.com/apache/airflow/issues/21225#issuecomment-1035656930 @arkadiusz-bach - any workaround that helped? I tried changing queue name from default kubernetes to kubernetesQ like below but didn't help

[celery_kubernetes_executor]
kubernetes_queue = kubernetesQ

But I still see K8S executor picking up tasks from default queue(celery) which are in queued state

[2022-04-18 11:10:05,800] {kubernetes_executor.py:454} INFO - Found 2 queued task instances
[2022-04-18 11:10:05,823] {kubernetes_executor.py:491} INFO - TaskInstance: <TaskInstance: dbr_concurrent_trigger_poll.gcorp_ccl.trigger_descision_for_source scheduled__2022-04-18T11:05:00+00:00 [queued]> found in queued state but was not launched, rescheduling

I see a PR https://github.com/apache/airflow/pull/23048 to permanently fix this but it might take sometime to come w/ next stable AF release

arkadiusz-bach commented 2 years ago

  1. Download zip with Airflow version you have from pypi

  2. Check in which section inside config kubernetes_queue is defined for your Airflow version. I think that it was moved from kubernetes section to celery_kubernetes_executor, but im not sure, just check that :)

  3. In kubernetes_executor.py:

    • At the beginning import conf and SQLAlchemy 'and_' operator:
      from airflow.configuration import conf
      from sqlalchemy import and_
    • Inside __init__ of KubernetesExecutor class:
      self.is_celery_kubernetes_executor = conf.get('core', 'executor') == 'CeleryKubernetesExecutor'
      self.kubernetes_queue = conf.get('celery_kubernetes_executor', 'kubernetes_queue')
  4. Replace kubernetes_executor.py file with patched one - it will be somewhere inside site-packages directory under python directories

PS: I just wrote that in notepad, so please check whether there are no syntax errors
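The filtering idea behind this patch can be sketched in plain Python. This is not the patched Airflow code: `FakeTaskInstance` and `tasks_for_kubernetes` are hypothetical names, and a list comprehension stands in for the SQLAlchemy query, where the same predicate would be expressed with `and_` over the state and queue columns.

```python
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in carrying only the fields the filter needs."""
    task_id: str
    state: str
    queue: str


def tasks_for_kubernetes(task_instances, kubernetes_queue, is_celery_kubernetes_executor):
    """Select the queued task instances the Kubernetes side should handle."""
    queued = [ti for ti in task_instances if ti.state == "queued"]
    if not is_celery_kubernetes_executor:
        # Plain KubernetesExecutor: every queued task belongs to it.
        return queued
    # CeleryKubernetesExecutor: leave tasks on Celery queues alone.
    return [ti for ti in queued if ti.queue == kubernetes_queue]


tis = [
    FakeTaskInstance("celery_task", "queued", "default"),
    FakeTaskInstance("k8s_task", "queued", "kubernetes"),
    FakeTaskInstance("running_task", "running", "kubernetes"),
]
selected = tasks_for_kubernetes(tis, "kubernetes", True)
```

With the extra condition, `celery_task` is no longer picked up and rescheduled by the Kubernetes side.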

vandanthaker commented 2 years ago

Got it! see similar fix in PR too. so no workaround w/o code patching, which is also evident as the query(inside k8s executor) on task instances is looking only queued state and not queue(celery/k8s). Thanks

meetri commented 2 years ago

Just be aware @meetri that you get the software for free - and a lot of the people contribute to it for free. And the best value you can bring is to spend some time on detailing your problems. providing as much as you can - including the circumstances, details and reproducibility. This way the smallest help you can provide for those who try to help people like you - on a free forum, for - again - software that you paid precisely 0 USD for.

Understood, and thank you to everyone who contributes to this project. If needed I can provide whatever information helps track down this issue. I posted my comment only because I thought it would be useful for anyone passing through to know that even the latest version of the code has this issue, and how I got around it for the time being.

This may or may not be of help but I did since my post tune things to such an extent to where I didn't need to run the script posted by Chris7, https://github.com/apache/airflow/issues/21225#issuecomment-1029060830

Firstly, I'm running this in amazon in an EKS cluster.

Basically, the biggest issue was that I didn't have enough memory allocated to the scheduler, by a lot. I bumped it up to about 8GB minimum. This stopped a lot of the errors related to the scheduler missing heartbeats.

Secondly, I upped the number of cores to a minimum of 2 per container. I am running 2 airflow scheduler replicas.

Lastly, I'm not sure how big an impact this had, but it was probably pretty major. My pods autoscale up and down on demand based on a redis queue, and I believe that pods were getting scaled down and the workers were being force terminated. Once I added dumb-init to the airflow worker docker image, that problem seemed to go away.

I'm only posting this if anyone is having similar issues. This may or may not be a bug in the airflow.

nuclear0wl commented 2 years ago

Hello @meetri, could you please provide more info about your modification to the Docker image? I also noticed that I'm getting "lost" queued tasks when turning on Redis-based autoscaling, but I can't understand how dumb-init can help avoid the issue in this situation.

eladkal commented 2 years ago

Has anyone experienced this issue after 2.3.1? I would imagine that https://github.com/apache/airflow/pull/23690 should make things better... even if it does not fix the underlying issue, it should make sure that tasks don't get stuck in the queued state.

patricker commented 2 years ago

We have been seeing tasks stuck in a queued state on Airflow 2.3.2. Not many of them, but a few just get stuck in queued and I have to go and either clear them or run them.

potiuk commented 2 years ago

@patricker - can you please update to 2.3.4 and see if it solves your problem? You will have to do it sooner or later (and it has dozens of fixes), and it might be the fastest way for you (and us) to fix your problem.

V0lantis commented 2 years ago

Hello there 👋

We experienced the same issue in our deployment cluster, and after a lot of searching, I think I have found why it is happening, as well as some reproducible steps to provoke it. I am very willing to submit a PR if I may.

First off, we experienced this issue very rarely on our deployment cluster. Our deployment cluster is right now on AWS EKS, Airflow version 2.3.3, with a redis message broker and CeleryExecutor of course :). After a lot of searching, I found out that the queued task (or queued tasks, I don't know how many we had because we have 1000+ dags and even more tasks) which didn't want to get executed appeared when our AWS autoscaler was downscaling our pods. At that point, when I looked at the two celery worker pods which were downscaled, I saw a log message in the pod:

worker: Warm shutdown

This log message appeared a couple of milliseconds after the scheduler had sent the task to redis. What if the worker had consumed this message but, at exactly the same time, celery had shut down the consumer loop, so the message never got properly executed? BUT, since the message had been consumed (LPOP), redis no longer had the message in its queue, while the CeleryExecutor still had it! That's why we have this log (after re-clearing the task):

[2022-08-27 21:15:40,100] {base_executor.py:93} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', 'manual__2022-08-27T19:13:04.614822+00:00', '--local', '--subdir', '/Users/arthurvolant/dev/python/airflow/airflow/example_dags/example_bash_operator.py']
[2022-08-27 21:15:40,101] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:40,205] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:41,292] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:42,391] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:43,498] {base_executor.py:217} ERROR - could not queue task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) (still running after 4 attempts)

I guess that's why there is a visibility_timeout, but unfortunately ours is 25 hours and some of our tasks cannot suffer any delay (half an hour at most). Anyway, here are the steps to reproduce it:

  1. Start a basic cluster / docker dev env (these steps still work on the latest commit, Airflow 2.3.4.dev): breeze start-airflow -p 3.10 -b postgres --integration redis --db-reset
  2. Start a dag (in my example, I am starting example_bash_operator).
  3. Tasks will be scheduled and put into the queue by the CeleryExecutor, but not executed, since no worker is running.
  4. Connect to redis and LPOP one or more messages: docker exec -it docker-compose_redis_1 redis-cli and LPOP default
  5. Activate the celery worker.
  6. Boom 💥. All the tasks which are still queued are run, except the one(s) you popped out of the queue.
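The effect of these steps can be mimicked without Redis or Airflow. The sketch below is a toy simulation under stated assumptions: a `deque` stands in for the Redis list named `default`, a set stands in for the executor's record of sent tasks, and all names (`send`, `broker_queue`, `executor_running`) are hypothetical.

```python
from collections import deque

broker_queue = deque()    # stand-in for the Redis list "default"
executor_running = set()  # tasks the executor believes it has already sent
executed = []             # tasks a worker actually ran


def send(task_id):
    """Push a task to the broker unless the executor already tracks it."""
    if task_id in executor_running:
        return False  # comparable to the "could not queue task" situation
    broker_queue.appendleft(task_id)  # producer side pushes on the left (LPUSH)
    executor_running.add(task_id)
    return True


# Step 3: tasks are queued while no worker is running.
for task_id in ("runme_0", "runme_1", "also_run_this"):
    send(task_id)

# Step 4: one message is removed behind the executor's back (LPOP).
lost = broker_queue.popleft()

# Step 5: a worker starts and drains the queue from the right (BRPOP).
while broker_queue:
    task_id = broker_queue.pop()
    executed.append(task_id)
    executor_running.discard(task_id)

# Step 6: the lost task is still tracked, so a retry is refused.
retry_accepted = send(lost)
```

The popped task never reaches a worker, yet the executor's own record keeps blocking any attempt to send it again.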

I will try to think of something. I have some ideas and will submit a PR as soon as possible, since this is very stressful for our deployment.

And here is a picture: [screenshot: Capture d’écran 2022-08-27 à 21 15 21]

christophedcpm commented 2 years ago

I'm using airflow 2.2.5 in cloud-composer 2.0.24 on GCP and I have noticed the same kind of error. My investigation also led me to think that the issue happens when the autoscaler stops worker pods.

arkadiusz-bach commented 2 years ago

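@V0lantis a task might get stuck in the queued state for at least these reasons:

  • Redis crashed after receiving the task and it had no time / was not configured to save its state to disk, so the task was lost, but scheduler thinks that task is waiting to be picked up workers
  • terminationGracePeriodSeconds on your worker PODs is too low or it is not there at all.(default is 60 seconds)

This message worker: Warm shutdown means that celery received the SIGTERM signal and started a graceful shutdown - it is not going to pick any more tasks from the redis queue and it will wait for all of the running tasks to finish

But if you've got some tasks that may be running for longer than terminationGracePeriod then Kubernetes might send SIGKILL first and:

  • Celery will not be able to wait for all of the running tasks to finish(those will end with failed status)
  • it was able to pick the task from queue(before SIGTERM), but not able to change its state to running(After SIGKILL) - maybe your case

Also, some of the celery workers might receive a SIGKILL signal when there is not enough memory allocated, and it may lead to the same behaviour. Unfortunately you may not see OOM events in the kubernetes cluster when it happens, because when there is more than one process running in the container in Kubernetes, it chooses one of the child processes within the container at random and sends it SIGKILL (Celery runs with a main process and child processes (workers)).

If the main process receives SIGKILL you will probably see an OOM event (the container will restart/stop), but if a child does, then the tasks it was processing will fail (in the logs you will be able to see that it received the SIGKILL signal) or get stuck in the queued state if it was able to pick them up. The container will keep running fine and one of the celery processes will be restarted.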
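To make the SIGTERM versus SIGKILL distinction concrete, here is a toy model of a warm shutdown. It is a sketch only, not Celery's implementation; `ToyWorker` and its methods are hypothetical. The key point is that SIGTERM can be handled by the process (so it can stop reserving work and drain), whereas SIGKILL cannot be caught at all.

```python
class ToyWorker:
    """Hypothetical worker that stops reserving tasks once shutdown is requested."""

    def __init__(self, queue):
        self.queue = list(queue)   # tasks still waiting in the broker
        self.in_flight = []        # tasks reserved and being executed
        self.finished = []         # tasks that ran to completion
        self.shutting_down = False

    def handle_sigterm(self, signum=None, frame=None):
        # In a real process this would be registered with
        # signal.signal(signal.SIGTERM, worker.handle_sigterm).
        self.shutting_down = True

    def reserve(self):
        # After SIGTERM, no new task is taken from the queue.
        if self.shutting_down or not self.queue:
            return None
        task = self.queue.pop(0)
        self.in_flight.append(task)
        return task

    def drain(self):
        # Warm shutdown: let everything already reserved finish.
        while self.in_flight:
            self.finished.append(self.in_flight.pop(0))


worker = ToyWorker(["t1", "t2", "t3"])
worker.reserve()            # t1 is picked up
worker.handle_sigterm()     # simulate the SIGTERM arriving
refused = worker.reserve()  # nothing more is reserved
worker.drain()              # t1 completes if the grace period is long enough
```

If the process were killed before `drain()` completed, `t1` would be left reserved but unfinished, which corresponds to the second bullet above.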

V0lantis commented 2 years ago

Hey @arkadiusz-bach,

I am sorry, that's true that I haven't posted any logs, maybe I haven't been clear as well...

@V0lantis task might stuck in the queued state at least due to:

  • Redis crashed after receiving the task and it had no time / was not configured to save its state to disk, so the task was lost, but scheduler thinks that task is waiting to be picked up workers

We use the ElastiCache service. The service is configured to store data on disk and is oversized for the actual number of tasks it receives and, as stated above, celery actually acknowledges when tasks are successfully sent to the queue; otherwise, it fails.

  • terminationGracePeriodSeconds on your worker PODs is too low or it is not there at all.(default is 60 seconds)

The warm shutdown was received 10 minutes before the pod was actually killed. But the task was sent and acknowledged by CeleryExecutor with:

 Setting external_id for <TaskInstance: example_bash_operator.this_will_skip scheduled__2022-08-28T00:00:00+00:00 [queued]> to 979a100a-5012-4135-b01f-b93577b39ede

in the scheduler_log, which actually means that the task was received by redis (or celery would throw a connection error).

This message worker: Warm shutdown means that celery received the SIGTERM signal and started a graceful shutdown - it is not going to pick any more tasks from the redis queue and it will wait for all of the running tasks to finish

Yep, but as I said in my comment :

this log message (worker: Warm shutdown) appeared a couple of milliseconds AFTER the scheduler has sent the task to redis.

⬆️ I still think that Celery consumer exited during the handling of my message here and didn't have time to put it in the unacked hash, which stores unacknowledged tasks.

But if you've got some tasks that may be running for longer than terminationGracePeriod then Kubernetes might send SIGKILL first and:

  • Celery will not be able to wait for all of the running tasks to finish(those will end with failed status)
  • it was able to pick the task from queue(before SIGTERM), but not able to change its state to running(After SIGKILL) - maybe your case

⬆️ I still think that my pods received SIGKILL 10 minutes after the SIGTERM, but I haven't been able yet to check the terminationGracePeriod

Also, some of the celery workers might receive a SIGKILL signal when there is not enough memory allocated, and it may lead to the same behaviour. Unfortunately you may not see OOM events in the kubernetes cluster when it happens, because when there is more than one process running in the container in Kubernetes, it chooses one of the child processes within the container at random and sends it SIGKILL (Celery runs with a main process and child processes (workers)).

Already checked as well. We have a Grafana dash to monitor the hardware resources that the pod is consuming, and the workers were far below the threshold we gave them

If the main process receives SIGKILL you will probably see an OOM event (the container will restart/stop), but if a child does, then the tasks it was processing will fail (in the logs you will be able to see that it received the SIGKILL signal) or get stuck in the queued state if it was able to pick them up. The container will keep running fine and one of the celery processes will be restarted.

The thing is, the task never started. Thanks anyway for your input, it was really enriching :)

V0lantis commented 2 years ago

I have been trying to fix this issue in celery/kombu, which is where I believe(d) the issue we are talking about here lies (namely, that a SIGTERM was sent while the redis queue was being consumed, leading to a message never being acknowledged and lost in the wild).

The issue is, I have been trying to replicate the issue in kombu without any success. I managed to reproduce the issue directly in Airflow by RPOP-ing a msg before starting the celery worker, and it worked exactly as described at the beginning of this issue. But, at least in its latest version (Celery v5.2.7), which is the one we have in production, I can't reproduce the behavior (sending a SIGTERM just after kombu has BRPOP-ed the msg). The msg is rightfully acked, and rightfully put back in the queue after the SIGTERM has been caught by python.

I don't really know where to go from here now.

I am putting the logs msg we have been having in our prod environment. Maybe someone else will have a better idea :

Scheduler logs

```
[2022-08-24 16:23:50,060] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='dag_id', task_id='task_id', run_id='scheduled__2022-08-24T15:00:00+00:00', try_number=1, map_index=-1) to executor with priority 5 and queue default
[2022-08-24 16:23:50,060] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'task_id', 'scheduled__2022-08-24T15:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_id.py']
[2022-08-24 16:23:50,264] {scheduler_job.py:605} INFO - Executor reports execution of dag_id.task_id run_id=scheduled__2022-08-24T15:00:00+00:00 exited with status queued for try_number 1
[2022-08-24 16:23:50,276] {scheduler_job.py:632} INFO - Setting external_id for to 139960c0-263d-4891-b1b0-712b077a0d2b
```

There were two workers which were downscaled with similar logs:

1st worker

```
[2022-08-24 16:23:49,597: WARNING/ForkPoolWorker-254] unknown id type, added to user and store ids columns: create_account_step.fingerprint
worker: Warm shutdown (MainProcess)
[2022-08-24 16:23:49,777: WARNING/ForkPoolWorker-254] unknown id type, added to user and store ids columns: login.fingerprint
```

The log above is not relevant. The relevant information here is that the worker received shutdown SIGTERM signal around 0.3 seconds before the task id was actually sent.

potiuk commented 2 years ago

I think a really good way to start will be to upgrade Airflow to 2.4.0 once it is out (even today possibly?) and install it with the right constraints (so that the right celery/kombu packages get installed). That sounds like it could have been handled already in the meantime by some of the recent Airflow fixes or - more likely - recent celery/kombu updates.

I think if this is confirmed for 2.4.0, we will have in general more capacity to look more closely to such issues and see if this can be reproduced and fixed.

V0lantis commented 2 years ago

We will try this @potiuk, thank you !

potiuk commented 2 years ago

It's been released BTW :)

hterik commented 2 years ago

We got a similar issue (Airflow 2.3.3). It happened right after a worker suffered a connection loss to Redis; the worker then did not consume any tasks for 3 days, despite showing up as healthy in Flower. One thing I suspect is that the reconnect attempt got stuck in an infinite timeout. Python sockets have no timeout by default, and the same default applies in Celery and redis-py. (This would not explain why Flower showed the node as healthy, though.)

Discussed in https://github.com/redis/redis-py/issues/722 and https://github.com/redis/redis-py/issues/2243

In airflow, the celery_broker_transport_options option can be used to set the options discussed there.
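As a small illustration of the point about defaults, the snippet below reads Python's global socket default and shows what a set of transport options with explicit timeouts might look like. The option names are ones the kombu Redis transport accepts; the values are illustrative assumptions, not recommendations from this thread, and how they map onto your Airflow configuration should be checked against your version.

```python
import socket

# In a fresh interpreter the global default is None, i.e. blocking calls
# wait forever unless a timeout is set explicitly.
default_timeout = socket.getdefaulttimeout()

# Illustrative values only (assumptions); tune them for your own broker.
broker_transport_options = {
    "socket_timeout": 30,          # seconds to wait on a read/write
    "socket_connect_timeout": 10,  # seconds to wait when (re)connecting
    "socket_keepalive": True,      # enable TCP keepalive on the connection
    "health_check_interval": 30,   # seconds between connection health checks
}
```

With explicit timeouts, a reconnect attempt against a dead connection should fail and be retried instead of hanging indefinitely.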

nicnguyen3103 commented 2 years ago

Hi, we have a similar issue on Airflow 2.4.0. The issue happened after I set the pool too high, which led to a locking issue on the database (the exact error was "Lock wait timeout exceeded; try restarting transaction"). The tasks could not be marked as failed for a while, and after they were, the pool was kind of corrupted, which left tasks stuck in the queue (the last pool). The other pools are running fine; their tasks are still scheduled and running as normal.

I do not have any direction to troubleshoot this issue at the moment, but if someone could give me pointers on where to check, I am glad to help troubleshoot the issue

Imcodingcc commented 1 year ago

https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution

Imcodingcc commented 1 year ago

https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution In my case, I use the Kubernetes executor and dynamic DAGs. I found that each task was getting stuck on DAG import. I followed the example suggested by the official documentation, skipped the other DAG scans, and finally solved the problem.
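The idea behind that documentation section can be sketched as a small pure function. This is an illustration under assumptions, not the documented example itself: `dag_ids_to_build` is a hypothetical helper, and in Airflow 2.4+ the `current_dag_id` argument would come from `get_parsing_context().dag_id` in `airflow.utils.dag_parsing_context`.

```python
def dag_ids_to_build(all_dag_ids, current_dag_id):
    """Decide which dynamically generated DAGs a given parse should construct.

    current_dag_id is None when the file is parsed by the scheduler (build
    everything); when a task is being executed it names the one DAG needed.
    """
    if current_dag_id is None:
        return list(all_dag_ids)
    return [dag_id for dag_id in all_dag_ids if dag_id == current_dag_id]


all_ids = [f"generated_dag_{n}" for n in range(3)]
scheduler_view = dag_ids_to_build(all_ids, None)
task_view = dag_ids_to_build(all_ids, "generated_dag_1")
```

When a task starts, only its own DAG is built, so the per-task import time no longer grows with the number of generated DAGs.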

fbmatilla commented 1 year ago

Hi, we are facing this issue in production too. Hoping to have the fix too. Thanks.

Walkerazos commented 1 year ago

hi, we encountered this problem in production very fresh. I hope you find a solution to the problem. It's giving us a lot of trouble. Thank you.

patricker commented 1 year ago

I don't think we've seen this since upgrading to 2.4.1.

potiuk commented 1 year ago

hi, we encountered this problem in production very fresh. I hope you find a solution to the problem. It's giving us a lot of trouble. Thank you.

You have not explained which version you have. Did you upgrade to the latest 2.4 version @Walkerazos? Upgrading to a newer version is anyhow the only way a fix can be applied, so you will have to do it anyway. And if you upgrade to 2.4 and you stop seeing the problem, please report it here. We already got a few reports that the error has been solved since (there were multiple hundreds of errors fixed since, and some of them related), so this is absolutely the easiest and best way to verify whether that problem has been fixed: by having several users like you who had this problem see it gone after the upgrade.

Can you help us with that @Walkerazos ?

rodrigoassumpcao commented 1 year ago

Hi guys. I have the same issue. I'm using KubernetesExecutor (on Azure), airflow version 2.4.2 (helm chart 1.7.0), kubernetes 1.24.6

Anyone knows a workaround for KubernetesExecutor?

This issue is causing a lot of troubles

potiuk commented 1 year ago

Hi guys. I have the same issue. I'm using KubernetesExecutor (on Azure), airflow version 2.4.2 (helm chart 1.7.0), kubernetes 1.24.6

Anyone knows a workaround for KubernetesExecutor?

This issue is causing a lot of troubles

Then open a new issue and provide as much input as possible, showing your logs and circumstances (which might be very different from this case despite similarities). This is the only way you can help with diagnosing your issue (and it is all in your hands to provide as much useful information as possible @rodrigoassumpcao).

fbmatilla commented 1 year ago

:( Looks like this is surfing from one milestone to the next one....

eladkal commented 1 year ago

milestone on issues doesn't guarantee solution. Most of the times it's just being bumped automatically when we release new version and issue was not fixed. I'll remove it from this issue.

potiuk commented 1 year ago

And more often than not, the reasons for the observed behaviour MIGHT be different in different versions. The only way to know whether your issue (which might seem similar) is actually the same as someone else's, and to help with investigating it, is for you to spend time on documenting it and providing evidence. You and your company paid exactly 0 USD for the software, and helping the community to investigate your problem is the LEAST you can do to contribute back for the absolutely free software you got, which many people developed in their free time.

Not understanding this, and complaining that you need to spend some time on documenting your issue (and putting thumbs down on comments of those people who actually spend a ton of their free time on making the software available to you for free), shows a lack of respect and empathy on your side @fbmatilla

fbmatilla commented 1 year ago

I was just reading the whole thread and seeing that this particular issue is not being very well managed. That's all. We can live without a fix, for sure. Thank you for all your time.

potiuk commented 1 year ago

I was just reading the whole thread and seeing that this particular issue is not being very well managed. That's all. We can live without a fix, for sure. Thank you for all your time.

No. You were also downvoting comments without understanding the context of the project, which is (again, let me repeat it) a lack of respect and empathy.

fbmatilla commented 1 year ago

Yes, that's because I saw comments confirming this was solved, but at the same time I could see the issue open and going from one milestone to the next. I'll remove my downvotes if that's a problem....

potiuk commented 1 year ago

Apologies for doing that would be more appropriate - because you were downvoting the comments that explained why this was happening in response to your concern.

fbmatilla commented 1 year ago

Sure, apologies to all those working for free in this great and wonderful project! And just remarking what I didn't like on this particular thread. Thanks

dharmendrakariya commented 1 year ago

Hi guys. I have the same issue. I'm using KubernetesExecutor (on Azure), airflow version 2.4.2 (helm chart 1.7.0), kubernetes 1.24.6

Anyone knows a workaround for KubernetesExecutor?

This issue is causing a lot of troubles

did you figure out bro? I am stuck with this!

chenzuoli commented 1 year ago

This is not ideal, but for those who want to kick all their queued tasks to running, here's a snippet i've been using:

from airflow import models, settings
from airflow.executors.executor_loader import ExecutorLoader

session = settings.Session()
tis = session.query(models.TaskInstance).filter(models.TaskInstance.state=='queued')
dagbag = models.DagBag()
for ti in tis:
  dag = dagbag.get_dag(ti.dag_id)
  task = dag.get_task(ti.task_id)
  ti.refresh_from_task(task)
  executor = ExecutorLoader.get_default_executor()
  executor.job_id = "manual"
  executor.start()
  executor.queue_task_instance(ti, ignore_all_deps=False, ignore_task_deps=False, ignore_ti_state=False)
  executor.heartbeat()

hello @Chris7 , I was wondering where do you place the code to run, add to a python script in the loop?

funes79 commented 1 year ago

@chenzuoli @Chris7 this code works but in my case the job_id needs to be an integer, so something like this worked for me:

...
  executor = ExecutorLoader.get_default_executor()
  executor.job_id = 1234567890
  executor.start()
...

Issue on 2.6.3 with a Postgres db backend and hundreds of tasks running at the same time in Kubernetes. No Celery. Usually it happens during those rush periods.

gurmeetsaran commented 10 months ago

Thanks @chenzuoli for providing the script. We use Airflow 2.2.2 and initially used a cronjob to run the provided script automatically to run tasks which were stuck. The script works great for non-sensor tasks, but for sensors (especially in reschedule mode) it becomes the script's responsibility to manage the task on every single run. In a way the script becomes a mini scheduler. This was becoming unmanageable, so we decided to port the fix to Airflow 2.2.2.

The patch is available at https://github.com/gurmeetsaran/airflow/pull/1.patch for Airflow 2.2.2 (Celery executor only). We have not seen tasks stuck in the queued state since applying this patch.

Sharing it here if it helps anyone.