apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow Scheduler continuously restarts when using the CeleryKubernetesExecutor #24538

Closed meetri closed 2 years ago

meetri commented 2 years ago

Apache Airflow version

2.3.2 (latest released)

What happened

The scheduler crashes with the following exception. Once it has crashed, every restart makes it crash again immediately. To get the scheduler working again, all DAGs must be paused and every running task must have its state changed to up_for_retry. We only started noticing this after switching to the CeleryKubernetesExecutor.

[2022-06-16 20:12:04,535] {scheduler_job.py:1350} WARNING - Failing (3) jobs without heartbeat after 2022-06-16 20:07:04.512590+00:00
[2022-06-16 20:12:04,535] {scheduler_job.py:1358} ERROR - Detected zombie job: {'full_filepath': '/airflow-efs/dags/Scanner.py', 'msg': 'Detected <TaskInstance: lmnop-domain-scanner.Macadocious manual__2022-06-16T02:27:36.281445+00:00 [running]> as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f96de2fc890>, 'is_failure_callback': True}
[2022-06-16 20:12:04,537] {scheduler_job.py:756} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
    self._run_scheduler_loop()
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.7/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat
    action(*args, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1359, in _find_zombies
    self.executor.send_callback(request)
  File "/pyroot/lib/python3.7/site-packages/airflow/executors/celery_kubernetes_executor.py", line 218, in send_callback
    self.callback_sink.send(request)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/callbacks/database_callback_sink.py", line 34, in send
    db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
  File "<string>", line 4, in __init__
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 437, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 434, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/models/db_callback_request.py", line 44, in __init__
    self.callback_data = callback.to_json()
  File "/pyroot/lib/python3.7/site-packages/airflow/callbacks/callback_requests.py", line 79, in to_json
    return json.dumps(dict_obj)
  File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
[2022-06-16 20:12:04,573] {kubernetes_executor.py:813} INFO - Shutting down Kubernetes executor
[2022-06-16 20:12:04,574] {kubernetes_executor.py:773} WARNING - Executor shutting down, will NOT run task=(TaskInstanceKey(dag_id='lmnop-processor', task_id='launch-xyz-pod', run_id='manual__2022-06-16T19:53:04.707461+00:00', try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 'lmnop-processor', 'launch-xyz-pod', 'manual__2022-06-16T19:53:04.707461+00:00', '--local', '--subdir', 'DAGS_FOLDER/lmnop.py'], None, None)
[2022-06-16 20:12:04,574] {kubernetes_executor.py:773} WARNING - Executor shutting down, will NOT run task=(TaskInstanceKey(dag_id='lmnop-processor', task_id='launch-xyz-pod', run_id='manual__2022-06-16T19:53:04.831929+00:00', try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 'lmnop-processor', 'launch-xyz-pod', 'manual__2022-06-16T19:53:04.831929+00:00', '--local', '--subdir', 'DAGS_FOLDER/lmnop.py'], None, None)
[2022-06-16 20:12:04,601] {scheduler_job.py:768} INFO - Exited execute loop
Traceback (most recent call last):
  File "/pyroot/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/pyroot/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/pyroot/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/pyroot/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
    self._run_scheduler_loop()
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.7/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat
    action(*args, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1359, in _find_zombies
    self.executor.send_callback(request)
  File "/pyroot/lib/python3.7/site-packages/airflow/executors/celery_kubernetes_executor.py", line 218, in send_callback
    self.callback_sink.send(request)
  File "/pyroot/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/callbacks/database_callback_sink.py", line 34, in send
    db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
  File "<string>", line 4, in __init__
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 437, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/pyroot/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 434, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/pyroot/lib/python3.7/site-packages/airflow/models/db_callback_request.py", line 44, in __init__
    self.callback_data = callback.to_json()
  File "/pyroot/lib/python3.7/site-packages/airflow/callbacks/callback_requests.py", line 79, in to_json
    return json.dumps(dict_obj)
  File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
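The last frames of the traceback show the standard library's json.dumps rejecting a datetime value. A minimal, stand-alone sketch of that failure, assuming a hypothetical payload whose nested "start_date" holds a datetime (a stand-in for the SimpleTaskInstance __dict__ that to_json embeds; the real field names and contents may differ):

```python
import json
from datetime import datetime, timezone

# Hypothetical stand-in for a callback payload: the real to_json() copies
# SimpleTaskInstance.__dict__, which may contain datetime values.
payload = {
    "dag_id": "example_dag",
    "simple_task_instance": {
        "task_id": "example_task",
        "start_date": datetime(2022, 6, 16, 20, 7, 4, tzinfo=timezone.utc),
    },
}


def try_dumps(obj):
    """Return the JSON string, or the error message if serialization fails."""
    try:
        return json.dumps(obj)
    except TypeError as exc:
        return f"TypeError: {exc}"


print(try_dumps(payload))
# TypeError: Object of type datetime is not JSON serializable
```

Plain json.dumps has no encoding for datetime, so any datetime anywhere in the nested dict is enough to trigger the same message as in the log.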

What you think should happen instead

The error itself seems minor and should be easy to fix. The bigger issue is that the scheduler was not able to recover on its own and was stuck in an endless restart loop.
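The recovery concern can be illustrated with a small isolation pattern: one callback that cannot be serialized should not take down the whole loop. This is a minimal sketch in plain Python with hypothetical names (send_all, sink) that are not Airflow APIs, and it is not how Airflow actually handles callbacks; each request is serialized and sent on its own, and a failure is logged and skipped instead of propagating:

```python
import json
import logging
from datetime import datetime
from typing import Callable, Iterable, List

log = logging.getLogger("callback_sketch")


def send_all(requests: Iterable[dict], sink: Callable[[str], None]) -> List[dict]:
    """Serialize and send each request; return the ones that failed.

    Hypothetical helper: a serialization failure for one request is logged
    and skipped, so the remaining requests are still delivered.
    """
    failed = []
    for request in requests:
        try:
            sink(json.dumps(request))
        except (TypeError, ValueError):
            log.exception("Could not send callback %r; skipping", request)
            failed.append(request)
    return failed


sent = []
bad = {"msg": "zombie", "start_date": datetime(2022, 6, 16)}
good = {"msg": "zombie"}
failures = send_all([bad, good], sent.append)
```

Whether silently skipping a failed callback is acceptable for a scheduler is a separate design question; the sketch only shows that a per-item try/except keeps one bad payload from stopping the rest.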

How to reproduce

I'm not sure of the simplest step-by-step way to reproduce this, but the conditions of my Airflow deployment were: about 4 active DAGs chugging through, each with about 50 max active runs and a concurrency of 50, plus one DAG set to 150 max active runs and a concurrency of 50 (not really that much).

The DAG with 150 max active runs uses the KubernetesExecutor to create a pod in the local Kubernetes environment. I think this is the reason we're seeing this issue all of a sudden.

Hopefully this helps in potentially reproducing it.

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==3.4.0
apache-airflow-providers-celery==2.1.4
apache-airflow-providers-cncf-kubernetes==4.0.2
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-postgres==4.1.0
apache-airflow-providers-redis==2.0.4
apache-airflow-providers-sqlite==2.1.3

Deployment

Other Docker-based deployment

Deployment details

We create our own Airflow base images using the instructions provided on your site. Here is a snippet of the code we use to install Airflow:

RUN pip3 install "apache-airflow[statsd,aws,kubernetes,celery,redis,postgres,sentry]==${AIRFLOW_VERSION}" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION/constraints-$PYTHON_VERSION.txt"

We then use this Docker image for all of our Airflow workers, the scheduler, the DAG processor and the Airflow webserver. This is managed through a custom Helm script. We have also incorporated pgbouncer to manage DB connections, similar to the publicly available Helm charts.

Anything else

The problem seems to occur quite frequently. It makes the system completely unusable.

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

meetri commented 2 years ago

Not sure how, or whether it's worth making a pull request for this, but here is the change I made to get the Airflow scheduler working again.

diff --git a/airflow/callbacks/callback_requests.py b/airflow/callbacks/callback_requests.py
index 8112589cd..ecf0ae1c2 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -16,6 +16,7 @@
 # under the License.

 import json
+from datetime import date, datetime
 from typing import TYPE_CHECKING, Optional

 if TYPE_CHECKING:
@@ -76,7 +77,13 @@ class TaskCallbackRequest(CallbackRequest):
     def to_json(self) -> str:
         dict_obj = self.__dict__.copy()
         dict_obj["simple_task_instance"] = dict_obj["simple_task_instance"].__dict__
-        return json.dumps(dict_obj)
+
+        def datetime_serializer(obj):
+            if isinstance(obj, (datetime, date)):
+                return obj.isoformat()
+            raise TypeError(f"Type {type(obj)} not serializable")
+
+        return json.dumps(dict_obj, default=datetime_serializer)

     @classmethod
     def from_json(cls, json_str: str):
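With the patch above, to_json emits ISO 8601 strings, so whatever reads the payload back receives strings rather than datetime objects. A stand-alone sketch of both directions, assuming hypothetical field names ("start_date", "end_date") for the values that need converting back; Airflow's actual from_json body is not shown in the diff and may handle this differently:

```python
import json
from datetime import date, datetime, timezone


def datetime_serializer(obj):
    # Same fallback as in the diff: encode dates/datetimes as ISO 8601 strings.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


# Hypothetical: keys assumed to hold datetimes in the payload.
DATETIME_KEYS = ("start_date", "end_date")


def to_json(data: dict) -> str:
    return json.dumps(data, default=datetime_serializer)


def from_json(json_str: str) -> dict:
    data = json.loads(json_str)
    for key in DATETIME_KEYS:
        if isinstance(data.get(key), str):
            # datetime.fromisoformat (Python 3.7+) parses isoformat() output,
            # including the +00:00 UTC offset.
            data[key] = datetime.fromisoformat(data[key])
    return data


original = {
    "task_id": "launch-xyz-pod",
    "start_date": datetime(2022, 6, 16, 19, 53, 4, 707461, tzinfo=timezone.utc),
    "end_date": None,
}
encoded = to_json(original)
decoded = from_json(encoded)
```

The default= hook is only called for objects json cannot encode natively, so strings, numbers and None pass through unchanged; the decode step is needed only if consumers expect real datetime objects.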
potiuk commented 2 years ago

Could you please make a PR + test with it so that we could discuss it there @meetri ?

nicolamarangoni commented 2 years ago

I have the same issue using the CeleryExecutor (not the KubernetesExecutor); the issue still persists after upgrading to 2.3.3.

potiuk commented 2 years ago

@nicolamarangoni - It's very hard to comment on your "I have the same issue" without any details (because I can only guess what kind of error you have). You say "the same" but it might very well be a different issue with some similarities. Commenting "I have the same issue" generally adds exactly 0 value if not accompanied by some evidence. It brings no-one any closer to explaining the mistakes people make or to diagnosing and solving them.

It actually serves no purpose whatsoever, except slightly annoying the people looking at it, because there is someone who could help with diagnosing and fixing an issue, and yet the only thing that person does is complain that they have the same issue.

potiuk commented 2 years ago

Please - if you want any other action from your comment - provide some logs, and circumstances where it happened for you @nicolamarangoni

nicolamarangoni commented 2 years ago

@potiuk I have several pods with Airflow 2.3.3. In some of them I set the KubernetesExecutor, in others the CeleryExecutor with 2 workers. Some of the pods with Celery look fine, but they have at most 100 DAGs and very few concurrently running DAGs (maybe 2-3 at most). The pods with the CeleryExecutor and many DAGs (> 150), on the other hand, have the scheduler crashing with the same error message and the same stack trace as @meetri posted. I cannot tell how many concurrent DAGs/jobs would be running on those pods because the scheduler crashes right after importing the DAGs. What other information would be useful for analysis?

potiuk commented 2 years ago

As usual: DAGs that fail, logs, what investigation you have done so far, which DAGs are failing. And most of all - HOW DO YOU KNOW IT'S THE SAME ERROR? We aren't even sure what the root cause of this one is. I have been working on Airflow for 4 years and I would definitely not be able to assess that something is "the same" even if the error message is the same. Did you compare the stack trace? Is it the same, down to a single line, as the one reported here? Are the DAGs identical?

Almost by definition, if you have a different version of Airflow, the error cannot be "the same", because there is very likely different code and a different path the code follows. If you based your assessment (I guess) on seeing "datetime is not serializable", there are about 50 places in the code (I am wild guessing) where such an error might happen, and you probably have to multiply that by the number of various inputs.

By not providing that evidence (ideally in a separate issue) you seem to "know better" and give the people who know Airflow no chance to assess whether it is the same error, a different error, or maybe your mistake. The worst thing that can happen is that we mark it as a duplicate. The best thing that happens when you just write "I have the same issue" without any evidence is that you annoy people, but it brings no-one any closer to helping you solve your issue (yes, you have to remember this is YOUR issue, and people here - often in their free time - are trying to help you solve your problem). This is not a helpdesk. You don't demand answers here or urge people. You provide helpful information that people looking here might use to help you.

Please watch my talk here - you might understand more about Exercising your empathy

jensenity commented 2 years ago

Going through the same issue with 2.3.2 and the CeleryKubernetesExecutor. Even after replacing every "from datetime import datetime" import in our DAGs, this issue still occurs every now and then.

potiuk commented 2 years ago

going thru the same issue with 2.3.2 and CeleryKubernetesExecutor. Even after switching all our from datetime import datetime from all our dags, this issue still occurred every now and then.

Same comment here. If you don't provide any evidence, logs, or details, adding a comment adds no real value and does not bring us any closer to solving the problem @jensenity. Please. Pretty please.

github-actions[bot] commented 2 years ago

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 2 years ago

This issue has been closed because it has not received response from the issue author.