Open tony-lefebvre-ardian opened 6 months ago
We found a workaround : Setting CELERY_BROKER_POOL_LIMIT = 1
After some more testing, Setting CELERY_BROKER_POOL_LIMIT = 1
does not fix the issue.
Is there anyone else using Az Service Bus as a broker with Django and Celery?
I am using Azure Service Bus with Django Celery and facing an error as well. A bit different tho. After an idle timeout 10 mins, when the link is detached and then the broker reconnects, the celery worker fails to pick up the tasks from the Azure service bus. So now my queue is getting enqueued with tasks but Celery is not working on it.
celery==5.3.6 azure-servicebus==7.12.1 django==4.2.3
Terminal logs when I enqueue a task after 10 idle minutes:
2024-04-08T18:53:57.564891590Z: [ERROR] INFO 2024-04-08 18:53:57,564 session 53 128940370794240 Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.END_SENT: 4> 2024-04-08T18:53:57.564936791Z: [ERROR] INFO 2024-04-08 18:53:57,564 _connection 53 128940370794240 An error occurred when closing the connection: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not send frame out due to exception: EOF occurred in violation of protocol (_ssl.c:2426)') 2024-04-08T18:53:57.565034792Z: [ERROR] INFO 2024-04-08 18:53:57,564 _connection 53 128940370794240 Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.END: 13> 2024-04-08T18:53:57.687868787Z: [ERROR] INFO 2024-04-08 18:53:57,687 _connection 53 128940370794240 Connection state changed: None -> <ConnectionState.START: 0> 2024-04-08T18:53:57.700798508Z: [ERROR] INFO 2024-04-08 18:53:57,700 _connection 53 128940370794240 Connection state changed: <ConnectionState.START: 0> -> <ConnectionState.HDR_SENT: 2> 2024-04-08T18:53:57.700907610Z: [ERROR] INFO 2024-04-08 18:53:57,700 _connection 53 128940370794240 Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.HDR_SENT: 2> 2024-04-08T18:53:57.701355317Z: [ERROR] INFO 2024-04-08 18:53:57,701 _connection 53 128940370794240 Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.OPEN_PIPE: 4> 2024-04-08T18:53:57.701714523Z: [ERROR] INFO 2024-04-08 18:53:57,701 session 53 128940370794240 Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.BEGIN_SENT: 1> 2024-04-08T18:53:57.702507337Z: [ERROR] INFO 2024-04-08 18:53:57,702 link 53 128940370794240 Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:53:57.702673340Z: [ERROR] INFO 2024-04-08 18:53:57,702 management_link 53 128940370794240 Management link receiver state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:53:57.703045946Z: [ERROR] INFO 2024-04-08 18:53:57,702 link 53 128940370794240 Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:53:57.703171548Z: [ERROR] INFO 2024-04-08 18:53:57,703 management_link 53 128940370794240 Management link sender state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:53:57.706647708Z: [ERROR] INFO 2024-04-08 18:53:57,706 _connection 53 128940370794240 Connection state changed: <ConnectionState.OPEN_PIPE: 4> -> <ConnectionState.OPEN_SENT: 7> 2024-04-08T18:53:57.757513375Z: [ERROR] INFO 2024-04-08 18:53:57,757 _connection 53 128940370794240 Connection state changed: <ConnectionState.OPEN_SENT: 7> -> <ConnectionState.OPENED: 9> 2024-04-08T18:53:57.808306241Z: [ERROR] INFO 2024-04-08 18:53:57,807 session 53 128940370794240 Session state changed: <SessionState.BEGIN_SENT: 1> -> <SessionState.MAPPED: 3> 2024-04-08T18:53:57.859091107Z: [ERROR] INFO 2024-04-08 18:53:57,858 link 53 128940370794240 Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:53:57.859257310Z: [ERROR] INFO 2024-04-08 18:53:57,859 management_link 53 128940370794240 Management link receiver state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:53:57.910387682Z: [ERROR] INFO 2024-04-08 18:53:57,910 link 53 128940370794240 Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:53:57.910553185Z: [ERROR] INFO 2024-04-08 18:53:57,910 management_link 53 128940370794240 Management link sender state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:54:02.926372405Z: [ERROR] INFO 2024-04-08 18:54:02,926 _pyamqp_transport 53 128940370794240 AMQP error occurred: (AMQPLinkError("Error condition: amqp:link:detach-forced\n Error Description: The link 'G38:104649414:f17e93c2-cd96-4214-968a-95912a81d4c4' is force detached. Code: publisher(link29163562). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00.")), condition: (b'amqp:link:detach-forced'), description: (b"The link 'G38:104649414:f17e93c2-cd96-4214-968a-95912a81d4c4' is force detached. Code: publisher(link29163562). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00."). 2024-04-08T18:54:02.926918915Z: [ERROR] INFO 2024-04-08 18:54:02,926 link 53 128940370794240 Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4> 2024-04-08T18:54:02.927428223Z: [ERROR] INFO 2024-04-08 18:54:02,926 management_link 53 128940370794240 Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4> 2024-04-08T18:54:02.927813630Z: [ERROR] INFO 2024-04-08 18:54:02,927 link 53 128940370794240 Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4> 2024-04-08T18:54:02.927989633Z: [ERROR] INFO 2024-04-08 18:54:02,927 management_link 53 128940370794240 Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4> 2024-04-08T18:54:02.928305038Z: [ERROR] INFO 2024-04-08 18:54:02,928 session 53 128940370794240 Session state changed: <SessionState.MAPPED: 3> -> <SessionState.END_SENT: 4> 2024-04-08T18:54:02.928612944Z: [ERROR] INFO 2024-04-08 18:54:02,928 _connection 53 128940370794240 Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.CLOSE_SENT: 11> 2024-04-08T18:54:02.928799147Z: [ERROR] INFO 2024-04-08 18:54:02,928 _connection 53 128940370794240 Connection state changed: <ConnectionState.CLOSE_SENT: 11> -> <ConnectionState.END: 13> 2024-04-08T18:54:02.928973250Z: [ERROR] INFO 2024-04-08 18:54:02,928 session 53 128940370794240 Session state changed: <SessionState.END_SENT: 4> -> <SessionState.DISCARDING: 6> 2024-04-08T18:54:02.929193254Z: [ERROR] INFO 2024-04-08 18:54:02,929 link 53 128940370794240 Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0> 2024-04-08T18:54:02.929395657Z: [ERROR] INFO 2024-04-08 18:54:02,929 management_link 53 128940370794240 Management link sender state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0> 2024-04-08T18:54:02.929593161Z: [ERROR] INFO 2024-04-08 18:54:02,929 link 53 128940370794240 Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0> 2024-04-08T18:54:02.929724463Z: [ERROR] INFO 2024-04-08 18:54:02,929 management_link 53 128940370794240 Management link receiver state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0> 2024-04-08T18:54:04.533610231Z: [ERROR] INFO 2024-04-08 18:54:04,533 _base_handler 53 128940370794240 'servicebus.pysdk-6567bbdd' has an exception (ServiceBusConnectionError("The link 'G38:104649414:f17e93c2-cd96-4214-968a-95912a81d4c4' is force detached. Code: publisher(link29163562). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. Error condition: amqp:link:detach-forced.")). Retrying... 2024-04-08T18:54:04.572259901Z: [ERROR] INFO 2024-04-08 18:54:04,571 _connection 53 128940370794240 Connection state changed: None -> <ConnectionState.START: 0> 2024-04-08T18:54:04.585557231Z: [ERROR] INFO 2024-04-08 18:54:04,585 _connection 53 128940370794240 Connection state changed: <ConnectionState.START: 0> -> <ConnectionState.HDR_SENT: 2> 2024-04-08T18:54:04.585677533Z: [ERROR] INFO 2024-04-08 18:54:04,585 _connection 53 128940370794240 Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.HDR_SENT: 2> 2024-04-08T18:54:04.586191642Z: [ERROR] INFO 2024-04-08 18:54:04,586 _connection 53 128940370794240 Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.OPEN_PIPE: 4> 2024-04-08T18:54:04.586648850Z: [ERROR] INFO 2024-04-08 18:54:04,586 session 53 128940370794240 Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.BEGIN_SENT: 1> 2024-04-08T18:54:04.587664467Z: [ERROR] INFO 2024-04-08 18:54:04,587 link 53 128940370794240 Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:54:04.587841270Z: [ERROR] INFO 2024-04-08 18:54:04,587 management_link 53 128940370794240 Management link receiver state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:54:04.588264178Z: [ERROR] INFO 2024-04-08 18:54:04,588 link 53 128940370794240 Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:54:04.588452081Z: [ERROR] INFO 2024-04-08 18:54:04,588 management_link 53 128940370794240 Management link sender state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:54:04.591341631Z: [ERROR] INFO 2024-04-08 18:54:04,591 _connection 53 128940370794240 Connection state changed: <ConnectionState.OPEN_PIPE: 4> -> <ConnectionState.OPEN_SENT: 7> 2024-04-08T18:54:04.642106810Z: [ERROR] INFO 2024-04-08 18:54:04,641 _connection 53 128940370794240 Connection state changed: <ConnectionState.OPEN_SENT: 7> -> <ConnectionState.OPENED: 9> 2024-04-08T18:54:04.693346797Z: [ERROR] INFO 2024-04-08 18:54:04,692 session 53 128940370794240 Session state changed: <SessionState.BEGIN_SENT: 1> -> <SessionState.MAPPED: 3> 2024-04-08T18:54:04.744044475Z: [ERROR] INFO 2024-04-08 18:54:04,743 link 53 128940370794240 Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:54:04.744220278Z: [ERROR] INFO 2024-04-08 18:54:04,744 management_link 53 128940370794240 Management link receiver state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:54:04.795890573Z: [ERROR] INFO 2024-04-08 18:54:04,795 link 53 128940370794240 Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:54:04.796157578Z: [ERROR] INFO 2024-04-08 18:54:04,795 management_link 53 128940370794240 Management link sender state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> 2024-04-08T18:54:04.999994207Z: [ERROR] INFO 2024-04-08 18:54:04,999 link 53 128940370794240 Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1> 2024-04-08T18:54:05.008414753Z: [ERROR] INFO 2024-04-08 18:54:05,008 link 53 128940370794240 Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
These are my settings in Django:
In base.py:
CELERY_BROKER_URL=f"azureservicebus://{env('SERVICE_BUS_SAS_POLICY')}:{env('SERVICE_BUS_SAS_KEY')}@{env('SERVICE_BUS_NAMESPACE')}"
CELERY_TASK_DEFAULT_QUEUE=env('SERVICE_BUS_QUEUE')
CELERY_ACCEPT_CONTENT = ['pickle','json']
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_EXTENDED = True
In celery_app.py :
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
import sys
from pathlib import Path
BASE_DIR = Path(__file__).resolve(strict=True).parent.parent
sys.path.append(str(BASE_DIR / "project"))
if os.environ.get("DJANGO_DEBUG") == False :
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.staging')
else:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')
os.environ.setdefault('C_FORCE_ROOT', 'true')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
In init.py:
from .celery_app import app as celery_app
__all__ = ('celery_app',)
This is the command I execute in start.sh
nohup celery -A config.celery_app worker -Q $SERVICE_BUS_QUEUE -l INFO &
Anyone else having this issue? How many projects are using Celery with Azure Service Bus?
Checklist
main
branch of Celery. Tested with version "main" 5.4.0rc1 ac16f23" and 5.3.6 from pypi.Mandatory Debugging Information
[x] I have included the output of
celery -A proj report
in the issue. (if you are not able to do this, then at least specify the Celery version affected).[x] I have verified that the issue exists against the
main
branch of Celery.[x] I have included the contents of
pip freeze
in the issue.[x] I have included all the versions of all the external dependencies required to reproduce this bug.
Optional Debugging Information
[x] I have tried reproducing the issue on more than one Python version and/or implementation. Tested with 5.10 and 5.11rc1
[x] I have tried reproducing the issue on more than one message broker and/or result backend. Only Az Service Bus
[x] I have tried reproducing the issue on more than one version of the message broker and/or result backend. No bug found with package azure-servicebus==7.8.3. Cannot use different version of Az Service Bus (SaaS).
[x] I have tried reproducing the issue on more than one operating system.
Ubuntu. 22.04.01 (LTS)
Docker image python:3.10-slim-bookworm
Docker image python:3.10-slim-buster
[x] I have tried reproducing the issue on more than one workers pool. N/A. Only affected when producing task messages
[x] I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled. N/A
[x] I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
Possible Duplicates
Environment & Settings
Celery version:
celery report
Output:``` $ celery report software -> celery:5.4.0rc1 (opalescent) kombu:5.3.5 py:3.10.12 billiard:4.2.0 py-amqp:5.2.0 platform -> system:Linux arch:64bit, ELF kernel version:6.5.0-21-generic imp:CPython loader -> celery.loaders.default.Loader settings -> transport:amqp results:disabled deprecated_settings: None ```
Steps to Reproduce
Required Dependencies
Python Packages
pip freeze
Output:``` aiohttp==3.9.3 aiosignal==1.3.1 amqp==5.2.0 asgiref==3.7.2 async-timeout==4.0.3 attrs==23.2.0 azure-core==1.30.0 azure-identity==1.15.0 azure-servicebus==7.11.4 billiard==4.2.0 celery @ git+https://github.com/celery/celery.git@ac16f239985cf9248155b95788c4b6227f7f1b94 certifi==2024.2.2 cffi==1.16.0 charset-normalizer==3.3.2 click==8.1.7 click-didyoumean==0.3.0 click-plugins==1.1.1 click-repl==0.3.0 cryptography==42.0.5 Django==4.2 exceptiongroup==1.2.0 frozenlist==1.4.1 idna==3.6 iniconfig==2.0.0 isodate==0.6.1 kombu==5.3.5 msal==1.27.0 msal-extensions==1.1.0 multidict==6.0.5 -e git+https://github.com/totobest/bug-celery-az-servicebus.git@51b0bc63b57ccde2e7712d46264c922ca50d133c#egg=mysite packaging==23.2 pluggy==1.4.0 portalocker==2.8.2 prettier==0.0.7 prompt-toolkit==3.0.43 pycparser==2.21 PyJWT==2.8.0 pytest==8.0.2 pytest-asyncio==0.23.5 pytest-django==4.8.0 pytest-dotenv==0.5.2 python-dateutil==2.8.2 python-dotenv==1.0.1 requests==2.31.0 six==1.16.0 sqlparse==0.4.4 tomli==2.0.1 typing_extensions==4.10.0 tzdata==2024.1 urllib3==2.2.1 vine==5.1.0 wcwidth==0.2.13 yarl==1.9.4 ```
Other Dependencies
You need access to Azure Service Bus.
Minimally Reproducible Test Case
Project with a minimal Django app to reproduce the bug here https://github.com/totobest/bug-celery-az-servicebus. See included README.md.
```python from celery import shared_task @shared_task def add(x, y): return x + y @never_cache def az(request): for x in range(1000): add.delay(x, x + 1) return HttpResponse("OK") import asyncio import aiohttp import pytest async def multi_calls(url): async with aiohttp.ClientSession() as session: resp_1, resp_2 = await asyncio.gather(session.get(url), session.get(url)) assert resp_1.status == 200 assert resp_2.status == 200 @pytest.mark.asyncio async def test_async_ping(live_server): url = f"{live_server.url}/ping" await multi_calls(url) @pytest.mark.asyncio async def test_async_az(live_server): url = f"{live_server.url}/az" await multi_calls(url) ```
Expected Behavior
No error
Actual Behavior
Logs
Sometimes, the error is different:
Handler failed: 'NoneType' object has no attribute 'client_ready'.