apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.41k stars 14.36k forks source link

Race condition Airflow's Celery executor timeout and import redis leave a broken import #41359

Open quangvnm opened 3 months ago

quangvnm commented 3 months ago

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Hello,

This is a copy-paste of an issue we also post on celery/kombu and redis/redis-py. We include a MCVE example at the end of this post.

TL;DR: The with timeout(seconds=OPERATION_TIMEOUT): in airflow.executors.celery_executor.send_task_to_executor might leave a very broken import of redis & may affect another package that we haven't discovered yet. This is a race condition and very hard to debug at first. To reproduce this bug, we have to

Relates:

Our environment:

airflow: this happens with both 2.6 and latest 2.9.3 version
helm chart: 1.9, 1.14 or 1.15
python: 3.11.9
redis: 4.6.0 (airflow 2.6) and 5.0.7 (airflow 2.9)
kombu: 5.3.1 (airflow 2.6) and 5.3.7 (airflow 2.9)

We 've observed this issue since at least several months ago with our airflow deployment using official helm chart, we have the same issue as in related issues/discussion:

Aug 8 08:29:02 airflow-XXX-scheduler-XXX-zhtb8 scheduler ERROR {timeout.py:68} ERROR - Process timed out, PID: 7
Aug 8 08:29:02 airflow-XXX-scheduler-XXX-zhtb8 scheduler {celery_executor.py:279} INFO - [Try 1 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='XXX', task_id='XXX', run_id='manual__2024-06-19T14:06:39+00:00', try_number=51, map_index=-1)).
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler {celery_executor.py:290} ERROR - Error sending Celery task: module 'redis' has no attribute 'client'
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler Celery Task ID: TaskInstanceKey(dag_id='XXX', task_id='XXX', run_id='manual__2024-06-19T14:06:39+00:00', try_number=51, map_index=-1)
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 220, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
           ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 797, in send_task
    with self.producer_or_acquire(producer) as P:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 932, in producer_or_acquire
    producer, self.producer_pool.acquire, block=True,
              ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 1354, in producer_pool
    return self.amqp.producer_pool
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/amqp.py", line 591, in producer_pool
    self.app.connection_for_write()]
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 829, in connection_for_write
    return self._connection(url or self.conf.broker_write_url, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 880, in _connection
    return self.amqp.Connection(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kombu/connection.py", line 201, in __init__
    if not get_transport_cls(transport).can_parse_url:
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/__init__.py", line 90, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/__init__.py", line 75, in resolve_transport
    return symbol_by_name(transport)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kombu/utils/imports.py", line 59, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._XXX>", line 1204, in _gcd_import
  File "<frozen importlib._XXX>", line 1176, in _find_and_load
  File "<frozen importlib._XXX>", line 1147, in _find_and_load_unlocked
  File "<frozen importlib._XXX>", line 690, in _load_unlocked
  File "<frozen importlib._XXX_external>", line 940, in exec_module
  File "<frozen importlib._XXX>", line 241, in _call_with_frames_removed
  File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/redis.py", line 285, in <module>
    class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):
                                                      ^^^^^^^^^^^^
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler AttributeError: module 'redis' has no attribute 'client'

We've verified and we have neither redis folder nor redis.py file from our dev, this is a very sporadic error where most of the time it works, then it stops working for unknown reason, and once if happens, the scheduler is broken and couldn't schedule anything (same error message) until we restart the scheduler process (restart the pod)

This happens quite randomly (one in tens or fifty deployments of helm chart), and we couldn't reproduce it for sure for debugging purpose.

What we found out is that if this happens, this bug won't disappear until we restart (kill) the scheduler pod. We could reproduce randomly with these steps in a test airflow:

At first, we suspect that this is a case of race condition in importing redis package, because we inject debug code before the line class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline): with print(sys.path), print(redis), print(redis.__version__), ... and everything is okay, except print(dir(redis)) gives a different result:

sys.path=['/home/airflow/.local/bin', '/usr/local/lib/python311.zip', '/usr/local/lib/python3.11', '/usr/local/lib/python3.11/lib-dynload', '/home/airflow/.local/lib/python3.11/site-packages', '/opt/airflow/dags/repo/', '/opt/airflow/config', '/opt/airflow/plugins']

redis=<module 'redis' from '/home/airflow/.local/lib/python3.11/site-packages/redis/__init__.py'>

redis.__version__=5.0.7

dir(redis)=['AuthenticationError', 'AuthenticationWrongNumberOfArgsError', 'BlockingConnectionPool', 'BusyLoadingError', 'ChildDeadlockedError', 'Connection', 'ConnectionError', 'ConnectionPool', 'CredentialProvider', 'DataError', 'InvalidResponse', 'OutOfMemoryError', 'PubSubError', 'ReadOnlyError', 'Redis', 'RedisCluster', 'RedisError', 'ResponseError', 'SSLConnection', 'Sentinel', 'SentinelConnectionPool', 'SentinelManagedConnection', 'SentinelManagedSSLConnection', 'StrictRedis', 'TimeoutError', 'UnixDomainSocketConnection', 'UsernamePasswordCredentialProvider', 'VERSION', 'WatchError', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', 'asyncio', 'cluster', 'default_backoff', 'from_url', 'int_or_str', 'metadata', 'sentinel', 'sys']

compared to a python shell session inside the same container:

Python 3.11.9 (main, Jul 23 2024, 07:22:56) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import redis
>>> redis
<module 'redis' from '/home/airflow/.local/lib/python3.11/site-packages/redis/__init__.py'>
>>> redis.__version__
'5.0.7'
>>> dir(redis)
['AuthenticationError', 'AuthenticationWrongNumberOfArgsError', 'BlockingConnectionPool', 'BusyLoadingError', 'ChildDeadlockedError', 'Connection', 'ConnectionError', 'ConnectionPool', 'CredentialProvider', 'DataError', 'InvalidResponse', 'OutOfMemoryError', 'PubSubError', 'ReadOnlyError', 'Redis', 'RedisCluster', 'RedisError', 'ResponseError', 'SSLConnection', 'Sentinel', 'SentinelConnectionPool', 'SentinelManagedConnection', 'SentinelManagedSSLConnection', 'StrictRedis', 'TimeoutError', 'UnixDomainSocketConnection', 'UsernamePasswordCredentialProvider', 'VERSION', 'WatchError', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', '_parsers', 'asyncio', 'backoff', 'client', 'cluster', 'commands', 'compat', 'connection', 'crc', 'credentials', 'default_backoff', 'exceptions', 'from_url', 'int_or_str', 'lock', 'metadata', 'retry', 'sentinel', 'sys', 'typing', 'utils']

We noted that dir(redis) inside the troublesome scheduler lacks several attributes, notably redis.client

Another thing we discovered is that in every case, there is always a Timeout (as you could see the log above), and sure enough, we found out later that the bug always happens while the process of importing redis is interrupted by Timeout (we print line number in redis/__init__.py and the importing didn't run till the end). In very rare case, airflow.utils.timeout doesn't work as inteded, the timeout error is printed out in the middle of import redis but the import redis still run till the end, in this case, the bug couldn't happens. But most of the time, the timeout interrupt the import.

With this idea, we injected a sleep at the end of redis/__init__.py and sure enough, we could reproduce this bug every time.

So an interrupted import give a different import than a normal import, it seems that the broken import doesn't import not-public member in package, such as redis.client in this case, Redis, StrictRedis are exposed explicitly but redis.client is set "impliciteley"

# redis/__init__.py
from redis.client import Redis, StrictRedis

I've found one comment from discuss.python.org:

import asyncio
original = id(asyncio)
from asyncio.base_events import BaseEventLoop
assert id(asyncio) == original
assert asyncio.base_events.BaseEventLoop is BaseEventLoop

From which it should be clear that asyncio.base_events is indeed guaranteed to be set after the from-import.

-- discuss.python.org

In our case, if we try to reimport an interrupted import, this isn't true anymore, the submodule isn't set at all. We didn't dig further in internal python to find out why this happens.

We see at least four options to fix this bug:

We opt for the second method in our dev at the moment

We aren't sure that this bug happens enough to be taken into consideration in upstream? But at least other dev won't loose days of debugging session as us ^^

This raise another question: Could the Timeout or another mechanism break the import and introduce this bug in another package or another hard-to-catch race condition bug?

What you think should happen instead?

The airflow timeout should not leave a broken import. We are not sure if this should be addressed to redis/redis-py though, we post the same issue in celery/kombu and redis/redis-py.

How to reproduce

This is a race condition which is hard to duplicate in local machine, so what we did is to introduce a delay to the very first import of redis to reproduce this bug for sure.

Please find below a compressed file of three file:

python -m mock_airflow gives the result:

FunctionTimedOut

After failed import redis
'redis' in sys.modules: True
'redis.client' in sys.modules: True

Reimport mock_kombu_transport_redis
After reimport redis
'redis' in sys.modules: True
'redis.client' in sys.modules: True
'client' in dir(redis): False
getattr(redis, 'client', None)=None

After reimport redis.client
'client' in dir(redis): False
getattr(redis, 'client', None)=None

After reimport: from redis import client
client=<module 'redis.client' from '***/site-packages/redis/client.py'>
'client' in dir(redis): False
getattr(redis, 'client', None)=None
client.Pipeline=<class 'redis.client.Pipeline'>

The important line is 'client' in dir(redis): False event after a reimport of failed import and if we uncomment the line of print(redis.client), it will raise an error:

AttributeError: module 'redis' has no attribute 'client'

If we inject from redis import client to redis/__init__.py, the mcve gives the diffent output:

...
After reimport redis
'redis' in sys.modules: True
'redis.client' in sys.modules: True
'client' in dir(redis): True
getattr(redis, 'client', None)=<module 'redis.client' from 'XXX/site-packages/redis/client.py'>
...

(Changes: client' in dir(redis): True and getattr(redis, 'client', None)=)

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.7.2 apache-airflow-providers-redis==3.7.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

Helm: 3.11.3 Helm chart: apache/airflow:1.15.0 k8s: 1.28

Anything else?

A race condition which happen very rarely at the start of scheduler service, but as we deploy to dev/staging alot each day, this happens several time a day. Once it happens, it won't go away until restart of this scheduler service. Condition:

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

quangvnm commented 3 months ago

Crossed-post: redis/redis-py#3353 celery/kombu#2096

And a very helpful response from discuss.python.org

DartVeDroid commented 2 months ago

Happens to us for some time, too. The same AttributeError, the same inability to work until you restart the pod.

That's quite a coincidence: there was an another bug in kombu itself that affected redis deployments (https://github.com/celery/kombu/pull/2007). IIRC, it affected a couple Airflow versions rendering them unstable.

Now, IMO, the usability problem is the same in both cases: container doesn't even crash, thus k8s can't auto-solve it by restarting the pod.

alimoezzi commented 2 months ago

I also have experience this in k8s deployment. Would only be fix by manual intervention.

eladkal commented 3 weeks ago

According to https://github.com/celery/kombu/pull/2007#issuecomment-2248133561 The issue is fixed in Celery v5.5.0b1 so we might want to bump min version for Celery to avoid this issue for everyone.

Can anyone confirm if upgrading to Celery 5.5.0 solves the problem? (Celery 5.5.0 not released yet, but rc1 is averrable so if anyone can confirm this solves the issue by checking it in test enviroment that would be great)

DartVeDroid commented 2 weeks ago

If I'm not mistaken, I did try it when kombu 5.4.0 was released, and it didn't help. As far as I can tell, this issue is related to Airflow's celery provider only: when I upped the timeout value from 1 to 300, it stopped occurring.

Anyway, it's rather strange to me that this value is so small by default: scheduler container does not only schedule tasks, it also runs DAG analyzer, and in a real environment with cpu limit, or with badly written DAGs, analysis could easily take more than 1s, making send/fetch operations unsuccessful every now and then.

UPD: Please don't blindly believe me, though. We run custom build/deploy configuration, so it's very much possible that it's our local problem in this instance.