apache/airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow worker: if `_store_result` fails because the DB is not accessible, the task stays in running #37382

Closed raphaelauv closed 9 months ago

raphaelauv commented 9 months ago

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

In CeleryExecutor, if the DB is not reachable from the forked process that runs the task inside the Airflow worker process (because of any invalid network configuration, such as a restriction on the allowed source ports of connections), tasks that save an XCom at the end of their run are never failed by the worker and stay in the running state.
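
(For context: the `Failed operation _store_result.  Retrying 2 more times.` lines in the log below come from the retry wrapper around Celery's database result backend, the `_inner` frame visible in the tracebacks. A simplified paraphrase of that logic, not the exact upstream code:)

```python
# Simplified paraphrase of the retry decorator in
# celery/backends/database/__init__.py (the `_inner` frame seen in
# the tracebacks below); not the exact upstream code.
import logging
from functools import wraps

logger = logging.getLogger(__name__)


def retry(fun):
    @wraps(fun)
    def _inner(*args, **kwargs):
        max_retries = kwargs.pop("max_retries", 3)  # 3 attempts by default
        for retries in range(max_retries):
            try:
                return fun(*args, **kwargs)  # e.g. _store_result(...)
            except Exception:  # upstream catches DB-specific exceptions
                # Emits: "Failed operation _store_result.  Retrying N more times."
                logger.warning(
                    "Failed operation %s.  Retrying %s more times.",
                    fun.__name__, max_retries - retries - 1, exc_info=True,
                )
                if retries + 1 >= max_retries:
                    raise
    return _inner
```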

2024-02-12T17:25:35.779427891+01:00 [2024-02-12 16:25:35,779: INFO/ForkPoolWorker-15] [1f534850-6521-4877-8911-ba2a65c4f649] Executing command in Celery: ['airflow', 'tasks', 'run', 'YYYYYYYYYYYYYYYYYY', 'PPPPPPPPPPPP', 'manual__2024-02-12T16:25:18+00:00', '--local', '--subdir', 'DAGS_FOLDER/dags/examples/XXXXXXXXXXXXXXXXXXXXXXX.py']
2024-02-12T17:25:35.878154924+01:00 [2024-02-12 16:25:35,875: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/repo/src/dags/examples/XXXXXXXXXXXXXXXXXXXXXXX.py
2024-02-12T17:25:37.759271345+01:00 [2024-02-12 16:25:37,759: WARNING/ForkPoolWorker-15] /home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py:343: AirflowProviderDeprecationWarning: This module is deprecated. Please use `airflow.providers.cncf.kubernetes.operators.pod` instead.
2024-02-12T17:25:37.759292770+01:00   loader.exec_module(new_module)
2024-02-12T17:25:37.759296059+01:00 
2024-02-12T17:25:37.903993679+01:00 [2024-02-12 16:25:37,903: INFO/ForkPoolWorker-15] Running <TaskInstance: YYYYYYYYYYYYYYYYYY.PPPPPPPPPPPP manual__2024-02-12T16:25:18+00:00 [queued]> on host outils-ia-airflow-worker-6fbcc76d4c-vncx8
2024-02-12T17:46:54.213685324+01:00 [2024-02-12 16:46:54,213: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[9c8870f7-30bc-47b5-b1f1-ca7a2b366c56] received
2024-02-12T17:46:54.219838619+01:00 [2024-02-12 16:46:54,217: WARNING/ForkPoolWorker-16] Failed operation _store_result.  Retrying 2 more times.
2024-02-12T17:46:54.219846277+01:00 Traceback (most recent call last):
2024-02-12T17:46:54.219849600+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
2024-02-12T17:46:54.219853372+01:00     self.dialect.do_execute(
2024-02-12T17:46:54.219856396+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
2024-02-12T17:46:54.219859493+01:00     cursor.execute(statement, parameters)
2024-02-12T17:46:54.219862542+01:00 psycopg2.OperationalError: server closed the connection unexpectedly
2024-02-12T17:46:54.219865702+01:00     This probably means the server terminated abnormally
2024-02-12T17:46:54.219868598+01:00     before or while processing the request.
2024-02-12T17:46:54.219871562+01:00 
2024-02-12T17:46:54.219874433+01:00 
2024-02-12T17:46:54.219877473+01:00 The above exception was the direct cause of the following exception:
2024-02-12T17:46:54.219880218+01:00 
2024-02-12T17:46:54.219883294+01:00 Traceback (most recent call last):
2024-02-12T17:46:54.219886379+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/celery/backends/database/__init__.py", line 47, in _inner
2024-02-12T17:46:54.219904965+01:00     return fun(*args, **kwargs)
2024-02-12T17:46:54.219910091+01:00            ^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219913370+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/celery/backends/database/__init__.py", line 117, in _store_result
2024-02-12T17:46:54.219916638+01:00     task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
2024-02-12T17:46:54.219919655+01:00            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219930170+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2901, in __iter__
2024-02-12T17:46:54.219933291+01:00     result = self._iter()
2024-02-12T17:46:54.219938504+01:00              ^^^^^^^^^^^^
2024-02-12T17:46:54.219941521+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
2024-02-12T17:46:54.219944536+01:00     result = self.session.execute(
2024-02-12T17:46:54.219947506+01:00              ^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219950504+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
2024-02-12T17:46:54.219953624+01:00     result = conn._execute_20(statement, params or {}, execution_options)
2024-02-12T17:46:54.219956602+01:00              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219959686+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
2024-02-12T17:46:54.219962623+01:00     return meth(self, args_10style, kwargs_10style, execution_options)
2024-02-12T17:46:54.219965582+01:00            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219968444+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
2024-02-12T17:46:54.219971374+01:00     return connection._execute_clauseelement(
2024-02-12T17:46:54.219974354+01:00            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219977332+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
2024-02-12T17:46:54.219980285+01:00     ret = self._execute_context(
2024-02-12T17:46:54.219983244+01:00           ^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:46:54.219986230+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
2024-02-12T17:46:54.219989189+01:00     self._handle_dbapi_exception(
2024-02-12T17:46:54.219992194+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
2024-02-12T17:46:54.219995168+01:00     util.raise_(
2024-02-12T17:46:54.219998181+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
2024-02-12T17:46:54.220001588+01:00     raise exception
2024-02-12T17:46:54.220004543+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
2024-02-12T17:46:54.220008911+01:00     self.dialect.do_execute(
2024-02-12T17:46:54.220011835+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
2024-02-12T17:46:54.220014744+01:00     cursor.execute(statement, parameters)
2024-02-12T17:46:54.220017694+01:00 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
2024-02-12T17:46:54.220020598+01:00     This probably means the server terminated abnormally
2024-02-12T17:46:54.220023497+01:00     before or while processing the request.
2024-02-12T17:46:54.220026271+01:00 
2024-02-12T17:46:54.220029517+01:00 [SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback 
2024-02-12T17:46:54.220032706+01:00 FROM celery_taskmeta 
2024-02-12T17:46:54.220035633+01:00 WHERE celery_taskmeta.task_id = %(task_id_1)s]
2024-02-12T17:46:54.220041003+01:00 [parameters: {'task_id_1': '9c8870f7-30bc-47b5-b1f1-ca7a2b366c56'}]
2024-02-12T17:46:54.220043947+01:00 (Background on this error at: https://sqlalche.me/e/14/e3q8)
2024-02-12T17:46:54.261551783+01:00 [2024-02-12 16:46:54,261: INFO/ForkPoolWorker-16] [9c8870f7-30bc-47b5-b1f1-ca7a2b366c56] Executing command in Celery: ['airflow', 'tasks', 'run', 'YYYYYYYYYYYYYYYYYY', 'PPPPPPPPPPPP', 'manual__2024-02-12T16:46:48+00:00', '--local', '--subdir', 'DAGS_FOLDER/dags/examples/XXXXXXXXXXXXXXXXXXXXXXX.py']
2024-02-12T17:46:54.356993545+01:00 [2024-02-12 16:46:54,354: INFO/ForkPoolWorker-16] Filling up the DagBag from /opt/airflow/dags/repo/src/dags/examples/XXXXXXXXXXXXXXXXXXXXXXX.py
2024-02-12T17:46:56.168483211+01:00 [2024-02-12 16:46:56,168: WARNING/ForkPoolWorker-16] /home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py:343: AirflowProviderDeprecationWarning: This module is deprecated. Please use `airflow.providers.cncf.kubernetes.operators.pod` instead.
2024-02-12T17:46:56.168495203+01:00   loader.exec_module(new_module)
2024-02-12T17:46:56.168498475+01:00 
2024-02-12T17:46:56.557276868+01:00 [2024-02-12 16:46:56,557: INFO/ForkPoolWorker-16] Running <TaskInstance: YYYYYYYYYYYYYYYYYY.PPPPPPPPPPPP manual__2024-02-12T16:46:48+00:00 [queued]> on host outils-ia-airflow-worker-6fbcc76d4c-vncx8
2024-02-12T17:47:54.301569796+01:00 [2024-02-12 16:47:54,301: INFO/ForkPoolWorker-15] Using connection ID 'RRRRRRRRRR' for task execution.
2024-02-12T17:47:54.302394963+01:00 [2024-02-12 16:47:54,302: INFO/ForkPoolWorker-15] AWS Connection (conn_id='RRRRRRRRRR', conn_type='aws') credentials retrieved from login and password.
2024-02-12T17:47:55.619500791+01:00 [2024-02-12 16:47:55,616: WARNING/ForkPoolWorker-15] Failed operation _store_result.  Retrying 2 more times.
2024-02-12T17:47:55.619526413+01:00 Traceback (most recent call last):
2024-02-12T17:47:55.619530193+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
2024-02-12T17:47:55.619533524+01:00     self.dialect.do_execute(
2024-02-12T17:47:55.619536670+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
2024-02-12T17:47:55.619540010+01:00     cursor.execute(statement, parameters)
2024-02-12T17:47:55.619543535+01:00 psycopg2.OperationalError: server closed the connection unexpectedly
2024-02-12T17:47:55.619546920+01:00     This probably means the server terminated abnormally
2024-02-12T17:47:55.619549962+01:00     before or while processing the request.
2024-02-12T17:47:55.619552826+01:00 
2024-02-12T17:47:55.619555647+01:00 
2024-02-12T17:47:55.619558613+01:00 The above exception was the direct cause of the following exception:
2024-02-12T17:47:55.619561438+01:00 
2024-02-12T17:47:55.619564510+01:00 Traceback (most recent call last):
2024-02-12T17:47:55.619567610+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/celery/backends/database/__init__.py", line 47, in _inner
2024-02-12T17:47:55.619570577+01:00     return fun(*args, **kwargs)
2024-02-12T17:47:55.619573496+01:00            ^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619576519+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/celery/backends/database/__init__.py", line 117, in _store_result
2024-02-12T17:47:55.619579938+01:00     task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
2024-02-12T17:47:55.619582880+01:00            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619585897+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2901, in __iter__
2024-02-12T17:47:55.619589006+01:00     result = self._iter()
2024-02-12T17:47:55.619606611+01:00              ^^^^^^^^^^^^
2024-02-12T17:47:55.619609764+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
2024-02-12T17:47:55.619612748+01:00     result = self.session.execute(
2024-02-12T17:47:55.619615643+01:00              ^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619618651+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
2024-02-12T17:47:55.619621551+01:00     result = conn._execute_20(statement, params or {}, execution_options)
2024-02-12T17:47:55.619624564+01:00              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619627578+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
2024-02-12T17:47:55.619630551+01:00     return meth(self, args_10style, kwargs_10style, execution_options)
2024-02-12T17:47:55.619633432+01:00            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619636407+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
2024-02-12T17:47:55.619639382+01:00     return connection._execute_clauseelement(
2024-02-12T17:47:55.619642624+01:00            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619645651+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
2024-02-12T17:47:55.619648544+01:00     ret = self._execute_context(
2024-02-12T17:47:55.619651672+01:00           ^^^^^^^^^^^^^^^^^^^^^^
2024-02-12T17:47:55.619655232+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
2024-02-12T17:47:55.619659495+01:00     self._handle_dbapi_exception(
2024-02-12T17:47:55.619663493+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
2024-02-12T17:47:55.619667643+01:00     util.raise_(
2024-02-12T17:47:55.619671788+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
2024-02-12T17:47:55.619676192+01:00     raise exception
2024-02-12T17:47:55.619680257+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
2024-02-12T17:47:55.619684564+01:00     self.dialect.do_execute(
2024-02-12T17:47:55.619688132+01:00   File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
2024-02-12T17:47:55.619690962+01:00     cursor.execute(statement, parameters)
2024-02-12T17:47:55.619693833+01:00 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
2024-02-12T17:47:55.619696774+01:00     This probably means the server terminated abnormally
2024-02-12T17:47:55.619699641+01:00     before or while processing the request.
2024-02-12T17:47:55.619702389+01:00 
2024-02-12T17:47:55.619709185+01:00 [SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback 
2024-02-12T17:47:55.619713685+01:00 FROM celery_taskmeta 
2024-02-12T17:47:55.619717874+01:00 WHERE celery_taskmeta.task_id = %(task_id_1)s]
2024-02-12T17:47:55.619723564+01:00 [parameters: {'task_id_1': '1f534850-6521-4877-8911-ba2a65c4f649'}]
2024-02-12T17:47:55.619727930+01:00 (Background on this error at: https://sqlalche.me/e/14/e3q8)
2024-02-12T17:47:55.629449630+01:00 [2024-02-12 16:47:55,629: INFO/ForkPoolWorker-15] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[1f534850-6521-4877-8911-ba2a65c4f649] succeeded in 1339.8570237780223s: None

What do you think should happen instead?

The worker should fail the task and not retry indefinitely.

How to reproduce

Break the connection between the worker and the DB (or the PgBouncer).

Tasks that need to save an XCom will stay in the running state.
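
A minimal DAG like the hypothetical one below can serve as the trigger (all names are made up): the task's return value is pushed as an XCom when the task body finishes, which is exactly the metadata-DB write that hangs. Cut DB connectivity during the sleep.

```python
# Hypothetical reproduction DAG (dag_id/task_id made up). The return
# value is pushed as an XCom at the end of the task, which is the
# metadata-DB write that never succeeds once connectivity is cut.
import time

from pendulum import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="xcom_db_outage_repro",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def xcom_db_outage_repro():
    @task
    def slow_task_with_xcom() -> str:
        time.sleep(120)  # window in which to break DB connectivity
        return "some-xcom-value"  # pushed as XCom after the task body

    slow_task_with_xcom()


xcom_db_outage_repro()
```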

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response


potiuk commented 9 months ago

I believe just shutting down the task in this case will not solve the problem, and retrying by Celery is a good strategy, since it accounts for possibly intermittent causes of the problem.

Have you checked whether there is a Celery configuration that behaves differently here? Will it retry infinitely, or does the result backend you chose have a setting for the maximum number of retries? Have you exhausted all the Celery configuration options for that?

If there is an easy way for Celery itself to recover via a configuration setting for max retries or similar (for the chosen backend), then the solution is to use that configuration.
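
If such a setting exists for the chosen backend, it could presumably be wired in through the executor's `[celery] celery_config_options` setting. A sketch of what that might look like, using Celery's generic result-backend retry options; whether they bound the database backend's internal `_store_result` retry loop is an assumption to verify:

```python
# Sketch of a custom Celery config for the Airflow Celery executor;
# point [celery] celery_config_options (or the env var
# AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS) at this dict. Whether the
# result_backend_* retry settings apply to the database backend's
# internal _store_result loop is an assumption to verify.
from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    "result_backend_always_retry": True,  # retry recoverable backend errors
    "result_backend_max_retries": 5,      # but give up after a bounded count
}
```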

Also, I think the issue is wrongly stated. If the problem is of the nature you described, none of the processes on the same worker will have a DB connection for any of the tasks they are running, and the same holds for any other forked process on that worker. So there is a much bigger problem that should be dealt with at the deployment level.

I think there is no expectation that Airflow components (like the Celery worker) will handle all such exceptions on their own and self-heal from every situation like that. Just stopping a worker will not solve the REAL issue, which is that the deployed Celery worker component stopped working properly.

Do you have healthcheck/liveness probes running that would automatically detect such situations and shut down/restart the faulty component in place?

Other than that, I think it's quite unreasonable to expect that an application (a Python process) will recover from a situation where, suddenly during execution, one of the basic assumptions the process had (the ability to connect to the metadata DB without interruption) no longer holds. Airflow on its own is not able to self-heal the environment; this is why many deployments have healthcheck and liveness probes to verify that the software they control is still "alive", and can react in those cases (most of the time by killing the process that shows problems and failing over to another machine).

I would love to hear what kind of deployment-level setup you have in place for that; it might also help others here to see what "deployment managers" should look at when experiencing similar problems and what their monitoring should include.

raphaelauv commented 9 months ago

Airflow is deployed with the official Apache Airflow Helm chart, but with a restriction on the PgBouncer (it only accepts a precise source-port range for incoming connections), while the forked processes in the Airflow worker use a "random" port range. The solution is to soften the port-range limitation on the PgBouncer.

(The health check of the worker was not failing because the worker process itself was able to connect to the PgBouncer.)
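
A probe that exercises the same path as the task processes, i.e. one that forks a child (as the Celery prefork pool does) and opens a brand-new DB connection from it, so that it gets a fresh ephemeral source port, would presumably have caught this. A hypothetical sketch; the `AIRFLOW_PROBE_DB_DSN` env var is made up:

```python
# Hypothetical liveness-probe helper (not part of Airflow or the
# Helm chart): fork a child, as the Celery prefork pool does for
# tasks, and open a brand-new DB connection from it so the probe
# uses a fresh ephemeral source port rather than reusing the
# worker's established connection. Non-zero exit = unhealthy.
import os
import sys

import psycopg2

DB_DSN = os.environ["AIRFLOW_PROBE_DB_DSN"]  # made-up env var

pid = os.fork()
if pid == 0:  # child: connect the way a forked task process would
    try:
        psycopg2.connect(DB_DSN, connect_timeout=5).close()
        os._exit(0)
    except Exception:
        os._exit(1)

_, status = os.waitpid(pid, 0)
sys.exit(os.waitstatus_to_exitcode(status))  # Python >= 3.9
```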

Since this is a network-policy-specific thing and, as you explained, we don't want Airflow to manage that case, I'm closing the issue.

Thank you for your explanation and your responsiveness on the issue :+1: