apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.62k stars 14.18k forks source link

Tasks being marked as failed even after running successfully #34339

Closed argibbs closed 1 year ago

argibbs commented 1 year ago

Apache Airflow version

2.7.1

What happened

When running a dag, a task's logs will show that it ran successfully, and completed without error, but the task is marked as failed.

See (slightly redacted, sorry!) image an for example:

image

As you can see, the log ends with the messages:

[2023-09-13, 12:26:30 BST] {subprocess.py:97} INFO - Command exited with return code 0
[2023-09-13, 12:26:31 BST] {taskinstance.py:1398} INFO - Marking task as SUCCESS. dag_id=STOXX600_Index_1.ANC_110.AEE_110.QTR1.Realtime, task_id=Shared-Data.split-adjustments-combined, execution_date=20230913T090500, start_date=20230913T112612, end_date=20230913T112631
[2023-09-13, 12:26:31 BST] {local_task_job_runner.py:228} INFO - Task exited with return code 0

And yet the task has been marked as failed. 😱

Even more "interesting" (aka worrying), it's been marked as failed on it's first attempt - and yet, the task is configured with retries=2. The retry attempts setting has been ignored. image

My version history has been:

  1. 2.3.3
  2. 2.7.0
  3. 2.7.1

(I was naughty and did not keep up with releases). This issue is very definitely present on 2.7.0 and 2.7.1; I never saw it in 2.3.3

What you think should happen instead

I mean, stating the obvious, but:

  1. The task should not have been marked as failed.
  2. Even if it had been seen as failed, it should have retried.

How to reproduce

I have no idea (yet) how to reproduce this problem outside of our existing environments.

Things I am looking at / have ruled out:

  1. The problem exists in both our dev and uat envs. I have not upgraded prod, for obvious reasons.
  2. Memory - the workers and schedulers are given access to the full memory of the box; it's shared with other processes, but even the smallest box has 5 Gb free at all times (including during the failure window)
  3. We currently run 3 schedulers; I am trying to run with a single scheduler to see if that improves the problem (since I am wondering if it's because the schedulers are somehow conflicting).

Operating System

centos 7 running a docker image of 2.7.1

Versions of Apache Airflow Providers

These should be consistent with the constraints set by 2.7.1

apache-airflow-providers-celery==3.3.3
apache-airflow-providers-common-sql==1.7.1
apache-airflow-providers-docker==3.7.4
apache-airflow-providers-ftp==3.5.1
apache-airflow-providers-http==4.5.1
apache-airflow-providers-imap==3.3.1
apache-airflow-providers-microsoft-mssql==3.4.2
apache-airflow-providers-odbc==4.0.0
apache-airflow-providers-postgres==5.6.0
apache-airflow-providers-redis==3.3.1
apache-airflow-providers-slack==8.0.0
apache-airflow-providers-sqlite==3.4.3

Deployment

Docker-Compose

Deployment details

We build the airflow image ourselves (because we've been building it since 1.10, and it's less hassle to bump the version number than trying to switch to the official image).

It's using python 3.10 as a base.

Our deployment runs postgres 13.1 + bundled celery + redis 6.0.9 + workers on-prem. We have historically run 3 HA schedulers, but as noted I'm trying to run with a single scheduler to see if that improves performance.

The other (possibly related change) is that in 2.3.3 we had max_tis_per_query = 512 and in 2.7.x we are using max_tis_per_query = 16 as per the 2.7.0 release notes.

We also have the following config overrides which are unchanged since 2.3.3:

      - AIRFLOW__CORE__PARALLELISM=1000
      - AIRFLOW__SCHEDULER__PARSING_PROCESSES=3
      - AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=120

Anything else

Our install has ~200 dags. Most dags are small, with only a handful of dags with >10 tasks per run.

The problem seems to happen fairly frequently (we run ~6 similar-looking dags 4 times a day, I see failures in multiple dags.

The behaviour sounds quite similar to #33155. I have also noticed tasks which don't appear to have any logs, but I can trace the task in flower (celery) and I can see that the worker ran the task successfully; it's even marked as success in flower. There's just no logs in the airflow gui, and the task has been marked as failed. e.g. here are some logs from the worker for a task that is marked as failed in the GUI, with no log file, but clearly ran ok.

[2023-09-13 09:40:12,903: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[723d73ae-323f-421b-8350-44a0c08c82d3] received
[2023-09-13 09:40:12,981: INFO/ForkPoolWorker-7] [723d73ae-323f-421b-8350-44a0c08c82d3] Executing command in Celery: ['airflow', 'tasks', 'run', '__redacted_dag_name__', 'Notebooks.notebook', 'scheduled__2023-09-13T07:05:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/pipeline_estimates.py']
[2023-09-13 09:41:45,673: INFO/ForkPoolWorker-7] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[723d73ae-323f-421b-8350-44a0c08c82d3] succeeded in 92.75607865909114s: None

The upgrade to 2.7.1 was performed in isolation. Our dev and UAT envs are failing consistently, our prod env (which is still on 2.3.3) is running almost the exact same set of dags, and has no errors.

Are you willing to submit PR?

Code of Conduct

Taragolis commented 1 year ago

Just wondering, are you execute Prefect flows/tasks inside of bash operator which run in celery worker?

argibbs commented 1 year ago

As is always the way: after sitting on this problem for days before raising the issue, I have just noticed that we are getting occasional timeouts in the dag processor manager on the dags that are most frequently exhibiting the problem. (Why we're hitting the timeout is a separate problem, but baby steps)

After dropping down to a single scheduler, this was manifesting as the dags dropping out of the gui then reappearing, which is how I discovered it. (Aside: given how critical the dag processor manager's core loop is to airflow reliability, I feel like it gets nowhere near as much error reporting as it should do. Really, the GUI should be flagging up process timeouts).

When running with multiple schedulers, we never noticed this flickering in and out of existence in the GUI. Total guess, but maybe this was because there was always at least one of the schedulers which had recently processed the dag ok... 🤷

My working hypothesis is now that a scheduler would timeout processing the dags, and this would somehow cause all the active tasks in the affected dags to be blatted as failed. (Insert suitable jazz hand waving over the specifics). I checked a few of the failures I've seen, and I do see timeouts in the processor at roughly the same time that the gantt shows the tasks being blatted as failed, despite actually running ok.

Anyhoo, I am now running two experiments:

  1. Multiple schedulers + increased timeout.
  2. Single scheduler + increased timeout.

Note: Obviously, the third experiment is:

  1. Single scheduler + default timeout.

I have been running this (a single scheduler + default timeout) for several days in one env, and the problem seemed to have gone away (which is why I was suspicious of multiple schedulers), but I have just checked the dag processor logs for that env, and it simply seems to have not been experiencing timeouts, so I guess it's possible I simply picked a less contested box for that sole scheduler. Or maybe timeouts and single scheduler is fine, and it's timeouts + multiple schedulers that's the problem. Or maybe I'm chasing a red herring.

I'll update if I find more.

argibbs commented 1 year ago

Just wondering, are you execute Prefect flows/tasks inside of bash operator which run in celery worker?

LOL, I wondered if anyone would pick up on that. Bonus internet points to you, sir.

I won't bore you with the minutiae (in this chain at least), but yes, we're using prefect in a limited fashion. The bash operator spins up a docker container, which contains code that builds a graph of tasks (which are quite fine-grained, think along the lines of dask, they're not something you'd want to try to create as an airflow dag) and then sends that to prefect. We don't use prefect's workers against pre-existing deployments, the graph is created dynamically and executed locally in the docker container. We just mostly use prefect for its pretty gui, and other out-of-the-box functionality.

FWIW, I think prefect and airflow are miles apart in terms of functionality and target audience. (Yes, I know other people will disagree, but let's keep this issue focused on airflow ... he says, trying to avoid a lengthy off-topic discussion)

Taragolis commented 1 year ago

I just thinking is it possible that combination of Airflow + Prefect + Celery could breaks occasionally something in multiprocessing if run in the same process.

My initial point not about starts "Holy War" lets keep it for reddit.

potiuk commented 1 year ago

That story is better than a horror movie :)

hussein-awala commented 1 year ago

For your last guess, where do you think the timeout is?

jscheffl commented 1 year ago

++ some guess on my side - we recently had such issue also related to infrastructure instability. Logs showed that tasks executed successfully but updates in DB failed due to network connection problems. We could find exceptions in connectivity in the worker stdout. So question:

argibbs commented 1 year ago

For your last guess, where do you think the timeout is?

  • is it in the dag processing job, and because of this timeout some other jobs (mainly the executor job) are killed?
  • or it's in the executor's job that has to load the dagbag from the dag file to get the dag, and it fails when the processing time exceeds the timeout?

The error is very explicitly coming from here: https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L1215-L1219 and is inside the dag processing job. i.e. in dag_processor_manager.log.

We have actually already set AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT quite high (not like hours, but multiple minutes)... so the latter is very definitely not failing. The fix I'm attempting is similarly crude, but just to increase AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT to several minutes.

argibbs commented 1 year ago

++ some guess on my side - we recently had such issue also related to infrastructure instability. Logs showed that tasks executed successfully but updates in DB failed due to network connection problems. We could find exceptions in connectivity in the worker stdout. So question:

  • Is it possible for you to reproduce this and capture the worker logs for the time?
  • Can you share these logs and potentially the scheduler logs in the same timeframe?

I mean, I have been able to reliably reproduce this since upgrading to 2.7.0 - however, I have scoured the worker logs, and there's never any obvious errors. The same goes for the db logs as well. I haven't seen anything in the scheduler logs, but they're noisy. The only obvious error I've found so far is the dag processor timeout errors.

Also, worth noting that I really only changed one thing: upgraded from 2.3.3->2.7.0->2.7.1; if we'd been experiencing network issues, I'd have expected that to be version agnostic, rather than manifesting only once I'd upgraded to 2.7. I'm not discounting it (I'm working on removing some config errors that are causing log errors in the scheduler, so I can better grep for fails), but it's not my primary suspect right now.

argibbs commented 1 year ago

I just thinking is it possible that combination of Airflow + Prefect + Celery could breaks occasionally something in multiprocessing if run in the same process.

Ah, yeah, in that case, then no - it's run inside a docker container, so effectively completely independent from the airflow process.

Taragolis commented 1 year ago

Ah, yeah, in that case, then no - it's run inside a docker container, so effectively completely independent from the airflow process.

About timeouts, it seems a bit suspicious when 200 dags can't be processed in reasonable time, so I guess this some dynamic DAGs, which created from single python file.

The might be a good reason why some bad things happen, just because Airflow could decide that DAG might be deleted. I'm not sure in this case it is a bug or it's "by design". Some links which might improve your DAG parsing time

Just out of my curiosity The flow something like: AIrflow -> CeleryExecutor -> BashOperator -> Docker -> Prefect ?
jscheffl commented 1 year ago

Oh, did not catch the error details above that the DAG processor ran into timeout.

We are also hitting this issue and we are also looking for the root cause. Times in our case vary very much and the problematic DAGs have a very high volume of ~50.000 items in the queue.

So if the DAG parser runs into a timeout, you can check the stdout of this POD to see which DAG parses in which time as stats. This might give a hint if there is any specific DAG having a bad performance. If DAGs are not parsed because of timeout, they are removed after some grace time and then added later again. In the case a task executes during the DAG getting removed for moment, status can not be reported back (because DAG is gone) and thus the task will end up failed.

Taragolis commented 1 year ago

We build the airflow image ourselves

I hope it is not based on old puckel/airflow image or at least run entrypoint through tiny or dumb-init

Building the image, it also might be useful, at least it cover how to extend or customise official image

argibbs commented 1 year ago

I hope it is not based on old puckel/airflow image

"But doctor", cried the clown, "I am puckel!"

Just joking. There are a couple of other reasons why we build our own, but that may change in the future. I take your point, and maybe some day we'll adopt the official image. For now it works, and that's super good-enough.

argibbs commented 1 year ago

Just out of my curiosity The flow something like: AIrflow -> CeleryExecutor -> BashOperator -> Docker -> Prefect ?

Correct. Although I re-emphasise that we're not using prefect as any kind of distribution system (which would be dumb, given celery). It just happens to have a very convenient out-of-the-box set of features for executing large, highly dynamic graphs of small lightweight tasks.

argibbs commented 1 year ago

About timeouts, it seems a bit suspicious when 200 dags can't be processed in reasonable time, so I guess this some dynamic DAGs, which created from single python file.

The might be a good reason why some bad things happen, just because Airflow could decide that DAG might be deleted. I'm not sure in this case it is a bug or it's "by design". Some links which might improve your DAG parsing time

Processing time is really variable. Sometimes it's 4 seconds, sometimes it's 25, sometimes it's 50+. And before you ask, no the box CPU is pretty constant. However, I'm doing my best to eliminate exogenous factors before I start claiming the problem is 100% in airflow.

I'm not really sure I believe that a dag flickering in and out of existence could ever be a good thing. OK, yes, it led me to discover the dag processor timeout, but only once I'd dropped down to a single scheduler. There must be better ways. Edit: The new cluster activity overview is the obvious home for such information, but for some reason my dag processor state is "unknown".

Thank you for the links, there are a couple of new features in there since 2.3.3 that I wasn't aware of, especially the parsing context. It would be good to get parse time down (rather than just upping the timeout) because that'll have other benefits. That said, given that I can just increase the timeout as a crude fix, I think getting my parse time down is an orthogonal problem (i.e. off-topic) for the original issue.

argibbs commented 1 year ago

Anyhoo, I am now running two experiments:

  1. Multiple schedulers + increased timeout.
  2. Single scheduler + increased timeout.

Early indications are that increasing the timeout hasn't fixed the problem. I don't see any errors in the logs any more, but my "single scheduler + increased timeout" environment is still having problems. I'm double checking some things though, so reserve the right to an embarrassed cough and self-correction.

Taragolis commented 1 year ago

There are a couple of other reasons why we build our own, but that may change in the future.

If not yet, you should consider to use any process daemon/supervisor behind your shell entrypoint. As I mention before it could be tiny or dumb-init both of them presented in debian repo. dumb-init uses in official Airflow image, tiny uses in official Prefect image, as user I can't find any differences between this two init managers both of them do great job - no more zombie process.

highly dynamic graphs of small lightweight tasks.

It also could be a reason of fantom failures, if DAG graph could be change during execution, e.g.: task deleted, dependency between tasks changed and etc, then a lot of funny side effects could happen.

Processing time is really variable. Sometimes it's 4 seconds, sometimes it's 25, sometimes it's 50+. And before you ask, no the box CPU is pretty constant. However, I'm doing my best to eliminate exogenous factors before I start claiming the problem is 100% in airflow.


The more I look at this issue the more filling that is not an issue rather then discussion and troubleshooting 🤔

argibbs commented 1 year ago

The more I look at this issue the more filling that is not an issue rather then discussion and troubleshooting 🤔

I have a valid airflow install, that's not doing anything illegal or dodgy, that worked fine in 2.3.3 and upon upgrading to 2.7.0 (and subsequently 2.7.1) it stopped working reliably. Seems like the definition of an issue to me. 🤷

Fixing it might mean finding a new set of config flags without an actual code change, but that's doesn't change the fact that right now, it no worky.

This issue's reply-thread feels very discussion-y, yes, but partly because I'm fielding related topics like "what I use prefect for" and "how do you build your docker image". I don't mind, I love talking about this stuff in as much detail as is relevant, and I'm also learning stuff as a result (so win-win), but it does detract from the core issue, yes. 😄

argibbs commented 1 year ago

If not yet, you should consider to use any process daemon/supervisor behind your shell entrypoint.

Yes, we already do (not just in airflow, in almost all our docker containers. It's a no-brainer).

argibbs commented 1 year ago

It also could be a reason of fantom failures

My component is not failing. I am seeing failures in other task that don't run prefect. To be clear, and to reiterate the initial issue explanation, the logs end with:

[2023-09-13, 12:26:30 BST] {subprocess.py:97} INFO - Command exited with return code 0
[2023-09-13, 12:26:31 BST] {taskinstance.py:1398} INFO - Marking task as SUCCESS. dag_id=STOXX600_Index_1.ANC_110.AEE_110.QTR1.Realtime, task_id=Shared-Data.split-adjustments-combined, execution_date=20230913T090500, start_date=20230913T112612, end_date=20230913T112631
[2023-09-13, 12:26:31 BST] {local_task_job_runner.py:228} INFO - Task exited with return code 0

Those are messages from the airflow code, after my component has exited, saying "yup, all ok, this ran fine", and yet, it's still marked as failed in the gui.

argibbs commented 1 year ago

Early indications are that increasing the timeout hasn't fixed the problem

I'm still seeing the problem. The timeout is now very generous, there are no longer any errors in the dag processor manager, and yet there are still issues. I have some other ideas of things to look at, will update as/when I make progress.

Or of course, if you need specific logs, or even have a fix, please let me know.

argibbs commented 1 year ago

DAG generated thought the DB? DAG placed on some network share, e.g. NFS?

There's no db queries (other than the airflow db that the airflow code queries, natch). It's on a file share, but as mentioned above I have two envs, they are sharing the same fileserver; one works fine, the other not. I don't think it's NFS issues.

argibbs commented 1 year ago

Ok, I enabled the secrets cache, as per one of the links from @Taragolis - this reduced the dag processing time to a very consistent ~4-5 seconds, so I guess the delay and variance was due to load on the airflow db? We do use a fair few airflow vars in the dag construction.

This unfortunately has not fixed the issue.

jscheffl commented 1 year ago

Then besides the logs from an executed task which are positive and all the discussions about the image in between: Before I had the suspect (from own experience) that task logs were OK but because of anything in the infrastructure the signalling after writing the logs towards celery and metadata DB might still break things. As other problems are sorted out - are there any exceptions printed in the worker or scheduler logs at times where the successful tasks are reported failed to DB? (Sorry this might be a bit fiddeling in the stdout of the containers though)

argibbs commented 1 year ago

Ok, I have found the problem, and fixed it, for some value of fixed. I have Thoughts™️ on the problem, so apologies in advance for what might be quite a long update. I'll do my best to make it readable.

Edit: I listed some potential improvements at the bottom. Even if you don't read the big bit in the middle, I would appreciate thoughts on those.

The fix

Smoking gun

I found the following message in my scheduler logs: Marked the following 85 task instances stuck in queued as failed., and then there followed a list of all the tasks which had been taken um die Ecke and shot.

That message comes from scheduler_job_runner.py:1574.

Solution

If you follow through where the timeout is set, it's the config property task_queued_timeout.

I upped this to 7200 (2 hours) in my config, and the problem instantly went away.

Q&A

Why now?

That's easy: this config was added in 2.6.0; as noted above I jumped from 2.3.3 -> 2.7.0. This is an argument for not falling too far behind else when you do update, there's a lot of changes to work through when debugging. Mea culpa.

What was going wrong?

(I'm changing some numbers here to keep things round and simple)

The way our system is configured, we have a celery queue called data. There are 20 workers serving this queue. At various times during the day, some data arrives, and several dags all start running at the same time as a result. There are 100 tasks which get dumped onto the data queue. These tasks take 6 minutes each to run. So that's (optimistically) 100 tasks * 6 minutes / 20 queue workers = 30 minutes until all tasks are processed.

Notably, this means that after 10 minutes, we'll have completed 20 tasks, another 20 will be running, and the remaining 60 tasks will still be queued. 10 minutes is the default value for this new task_queued_timeout, so after 10 minutes, the scheduler kills all remaining 60 tasks. (Note by default the scheduler only checks every 2 minutes, but that's immaterial given the huge wait time on the queue - but remember this fact, I'll come back to it below).

Why did you decide increasing the timeout was the right fix?

Ah, I'm glad you asked.

The (slightly simplified) lifecycle of a task is typically NEW -> SCHEDULED -> QUEUED -> RUNNING -> SUCCESS|FAILED. The main things that prevent a task getting queued are 1. upstream dependencies, and 2. available pool slots. So, an alternative solution I briefly considered was throwing a pool over these data tasks to prevent too many of them being queued at once. However, I then applied the following reasoning:

  1. "I need to get the size of this pool right. Too many, and there's a risk things will be stuck on the queue and get killed. But too few, and I'm going to be under-utilising my pool. 25 seems like a safe number. Then in theory, no more than 5 tasks will be sat on the queue at any time, and those for no more than 6 minutes."
  2. "Hmm, but hang on, what about other dags that happen to run at the same time on unrelated data? They'll also be queuing at the same time, and I run the risk of them also being killed."
  3. "So, I guess I need to put every task that uses the data queue into the same pool? That's a lot of work, and brittle if someone adds more tasks in the future and forgets/doesn't know about this problem."
  4. "Oh, and if down the line I get more workers, I'm going to have to remember to increase my pool, else I'll not be taking advantage of my new workers."
  5. "Hang on again, what if someone makes a code change and the tasks each start taking 12 minutes? Then if even one task gets queued, it'll be on the queue for 12 minutes and will get killed. I need to set the pool to be no larger than the worker slots. I'm basically guaranteed to be under-utilising the workers now, given airflow scheduler overheads. Not all of my tasks take 6 minutes, and the shorter the tasks, the worse it is"
  6. "Seems like I'm just putting a slightly smaller queue in front of my first queue."
  7. "Why am I doing this? It's daft. Everything worked fine in 2.3.3 without this setting, let's just effectively turn it off"

So, 7200. Does this mean I think this config option is pointless for our setup? So far, I have not seen any benefit. But YMMV.

Weren't there timeouts in the old code?

Yes, there were. I have checked, and the old setting [celery].stalled_task_timeout was its default value of 300. I haven't done a archaeological dive into the code, but the implementation has obviously changed somehow. It's kind of moot - all that really matters is what the current code does.

Wait, isn't the title of this issue "Tasks being marked as failed even after running successfully"? They're not queued at that point.

Another good question. I do not know why, but for whatever reason the scheduler is seeing both actively running and recently completed tasks as queued, and killing them. Maybe this is a classification thing - i.e. "queued" for the purposes of this config means "task-state-is-queued-or-running-or-recently-finished". Maybe this is an issue with database load, and although the task has started running it hasn't been recorded as such in the db. I dunno.

This manifested as tasks being marked as failed despite having run successfully as outlined in this issue description. I also found examples of tasks that were killed while running. Their logs contained messages like:

[2023-09-15, 08:24:21 BST] {subprocess.py:93} INFO - Starting up and doing some stuff
[2023-09-15, 08:25:17 BST] {subprocess.py:93} INFO - I'm still doing stuff
[2023-09-15, 08:25:17 BST] {local_task_job_runner.py:294} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2023-09-15, 08:25:17 BST] {process_utils.py:131} INFO - Sending 15 to group 3257. PIDs of all processes in the group: [3258, 3257]
[2023-09-15, 08:25:17 BST] {process_utils.py:86} INFO - Sending the signal 15 to group 3257
[2023-09-15, 08:25:17 BST] {taskinstance.py:1630} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-09-15, 08:25:17 BST] {subprocess.py:104} INFO - Sending SIGTERM signal to process group
[2023-09-15, 08:25:17 BST] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/operators/bash.py", line 201, in execute
    result = self.subprocess_hook.run_command(
  File "/usr/local/lib/python3.10/site-packages/airflow/hooks/subprocess.py", line 91, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1632, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2023-09-15, 08:25:17 BST] {taskinstance.py:1398} INFO - Marking task as UP_FOR_RETRY. dag_id=My_Problem_Dag, task_id=My_Problem_Task, execution_date=20230914T130500, start_date=20230915T072420, end_date=20230915T072517
[2023-09-15, 08:25:17 BST] {standard_task_runner.py:104} ERROR - Failed to execute job 36648131 for task My_Problem_Task (Task received SIGTERM signal; 3257)
[2023-09-15, 08:25:17 BST] {process_utils.py:79} INFO - Process psutil.Process(pid=3257, status='terminated', exitcode=1, started='07:24:20') (3257) terminated with exit code 1
[2023-09-15, 08:25:17 BST] {process_utils.py:79} INFO - Process psutil.Process(pid=3258, status='terminated', started='07:24:21') (3258) terminated with exit code None

What I can tell you is that empirically, every task I found that a) was killed while running, or b) was killed after completing successfully, was killed less than 2 minutes after it started. Remember I mentioned above that the queue timeout checks once every 2 minutes? It's almost as if the code only considers the state of the tasks as they were before it started its 2 minute wait.

I haven't dug into the logic however, as this is moot: even if it wasn't whacking the tasks that were running / complete, I have enough tasks-that-are-really-actually-queued that it would still be killing them, and my solution would be the same, The only material impact is that killing running / completed tasks as well was a massive misdirect and sent me after quite a few red herrings.

Potential improvements

There are several things that have come out of this process that I feel are worth doing something about. However, I thought I'd list them here first, rather than run off and raise them as separate issues. If it's clear that I'm in a minority of 1 on their worth, we can just close this issue and everyone is happy.

  1. Display warnings about deprecated configs in the GUI somewhere. (Maybe the new cluster activity page?). This would have alerted me to the fact that the old celery timeout config had changed. Maybe it wouldn't have got me to the solution any faster, but it couldn't have hurt.
  2. Likewise, display warnings in the GUI if the dag processor is experiencing timeouts. Yes, as it turned out this was unrelated to my actual problem, but it's a pretty critical component; if you are getting timeouts, system performance can degrade in subtle ways, e.g. task schedule latency. Hat tip to @Taragolis - the new secrets cache in 2.7.0 that he linked to is a game changer here.
  3. I like metrics - maybe add a statsd counter to capture the total number of queued tasks that have been killed? Obviously someone has to monitor that metric, but they can't do that if it's not there...
  4. Make it clear in the GUI when tasks have timed out. a. Display warnings somewhere (maybe that cluster activity page again?) b. Differentiate tasks that have been killed due to task timeout, e.g. a different state such as queue_timed_out rather than plain old errored?) c. Maybe a different message in the task logs?
  5. And of course, ideally once a task is actually running or finished, it won't get whacked by a queue timeout.

Fin

I think that's everything. Well done if you made it this far. Thanks as always to everyone who contributed in the thread. I plan on leaving this open for a bit in case there's more discussion on any of the above, but if one of the maintainers wants to close it unilaterally, I'm cool with that.

jscheffl commented 1 year ago

Thanks for the result. (I made it to the end! Yeah). I assume we need to digest for a moment, I tend to split this up into multiple sub-bug tickets/improvements but would like to have other scheduler expert opinion on this ... @ashb ?

Taragolis commented 1 year ago

Let's convert it in discussion, if we need to create a separate issue(s) we could always do that later