celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev

Celery task does not get sent to broker #6661

Closed. Krogsager closed this issue 3 years ago.

Krogsager commented 3 years ago

Checklist

Mandatory Debugging Information

Optional Debugging Information

Related Issues and Possible Duplicates

Related Issues

#5969

Possible Duplicates

My post https://stackoverflow.com/questions/66462079/celery-task-does-not-get-send-to-broker

Environment & Settings

Celery version: 5.0.5

celery report Output:

```
software -> celery:5.0.5 (singularity) kombu:5.0.2 py:3.7.8
            billiard:3.6.3.0 py-amqp:5.0.5
platform -> system:Linux arch:64bit
            kernel version:4.15.0-20-generic imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:db+postgresql://docker:**@pg_db:5432/

broker: 'pyamqp://guest@my-rabbit//'
celery_accept_content: ['json']
environ: {
    'CELERY_BROKER': 'pyamqp://guest@my-rabbit//',
    'CELERY_BROKER_URL': 'amqp://guest:********@my-rabbit:5672//',
    'GPG_KEY': '********',
    'HOME': '/home/snake',
    'LANG': 'C.UTF-8',
    'LS_COLORS': 'rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.Z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:',
    'PATH': '/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
    'PWD': '/home/snake',
    'PYTHONDONTWRITEBYTECODE': '1',
    'PYTHONUNBUFFERED': '1',
    'PYTHON_VERSION': '3.7.8',
    'SHLVL': '1',
    'TERM': 'xterm',
    '_': '/usr/local/bin/celery'}
result_backend: 'db+postgresql://docker:********@pg_db:5432/'
result_persistent: False
result_serializer: 'json'
task_serializer: 'json'
worker_concurrency: 16
deprecated_settings: None
```

Steps to Reproduce

Required Dependencies

Python Packages

pip freeze Output:

```
alembic==1.5.5
amqp==5.0.5
Beaker==1.11.0
billiard==3.6.3.0
celery==5.0.5
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
click==7.1.2
click-didyoumean==0.0.3
click-plugins==1.1.1
click-repl==0.1.6
cryptography==3.4.6
defusedxml==0.7.0
et-xmlfile==1.0.1
Flask==1.1.1
Flask-Migrate==2.5.3
Flask-pyoidc==3.4.0
Flask-SQLAlchemy==2.4.1
Flask-Testing==0.8.1
Flask-WTF==0.14.3
future==0.18.2
gunicorn==20.0.4
idna==2.10
importlib-metadata==3.7.0
importlib-resources==5.1.2
itsdangerous==1.1.0
jdcal==1.4.1
Jinja2==2.11.3
kombu==5.0.2
Mako==1.1.4
MarkupSafe==1.1.1
oic==1.1.2
openpyxl==3.0.3
prompt-toolkit==3.0.16
psycopg2==2.8.6
pycparser==2.20
pycryptodomex==3.10.1
pyjwkest==1.4.2
pyodbc==4.0.30
python-dateutil==2.8.1
python-editor==1.0.4
python-Levenshtein==0.12.2
pytz==2021.1
requests==2.25.1
six==1.15.0
SQLAlchemy==1.3.23
typing-extensions==3.7.4.3
urllib3==1.26.3
vine==5.0.0
wcwidth==0.2.5
Werkzeug==1.0.1
WTForms==2.3.3
zipp==3.4.1
```

Other Dependencies

N/A

Broker details

Listeners

Interface: [::], port: 15672, protocol: http, purpose: HTTP API
Interface: [::], port: 15692, protocol: http/prometheus, purpose: Prometheus exporter API over HTTP
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Expected Behavior

.delay and .apply_async tasks are sent to the RabbitMQ broker.

Actual Behavior

When I try to send my task to broker (RabbitMQ) it hangs.

# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.

If I run the task synchronously it works as expected.

# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2
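
Note that .apply() executes the task inline in the current process and never touches the broker, while .apply_async() has to publish a message first, so only the latter needs the broker connection. A minimal sketch of the difference (assuming the foo task from celery_tasks.py shown further down):

# python shell
from celery_tasks import foo                 # task module shown further down
eager = foo.s(first_arg="2").apply()         # runs inline, no broker involved
print(eager.get())                           # -> "OK"
queued = foo.s(first_arg="2").apply_async()  # must reach RabbitMQ first; this is the call that hangs for me
print(queued.id)                             # would print the task id if publishing succeeded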

If I interrupt .apply_async() with ctrl+c I get a traceback with some clues:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
    host, port, family, socket.SOCK_STREAM, SOL_TCP)
  File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -9] Address family for hostname not supported

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 866, in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 801, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py", line 128, in establish_connection
    conn.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/connection.py", line 323, in connect
    self.transport.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 113, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 184, in _connect
    "failed to resolve broker hostname"))
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 197, in _connect
    self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/site-packages/celery/canvas.py", line 225, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 565, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 749, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 532, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 178, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 525, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 184, in _publish
    channel = self.channel
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 206, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 221, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 884, in default_channel
    self._ensure_connection(**conn_opts)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 439, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 339, in retry_over_time
    sleep(1.0)

The broker connection string looks like this in the system:

~$ env | grep BROKER
CELERY_BROKER=pyamqp://guest@172.23.0.3//

The broker connection string in python:

# python shell
from src.celery import app
app.pool.connection
>>> Connection: amqp://guest:**@localhost:5672//
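
A related check (just a sketch; src.celery is my app module as above) is to print the broker URL the app itself resolved:

# python shell
from src.celery import app
print(app.conf.broker_url)                   # the broker_url setting the app ended up with
print(app.connection_for_write().as_uri())   # the URL kombu will actually use when publishing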

Before you suggest that RabbitMQ is not running or that the connection string is bad: my Celery worker (consumer) process is able to connect with the same connection string.

-------------- celery@f9ab48fc6b63 v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-4.15.0-20-generic-x86_64-with-debian-9.12 2021-03-05 07:56:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_statst_api:0x7f15b6de0450
- ** ---------- .> transport:   amqp://guest:**@my-rabbit:5672//
- ** ---------- .> results:     postgresql://docker:**@pg_db:5432/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . foo_task
  . (long list of tasks)

[2021-03-05 07:56:30,564: INFO/MainProcess] Connected to amqp://guest:**@my-rabbit:5672//
[2021-03-05 07:56:30,581: INFO/MainProcess] mingle: searching for neighbors
[2021-03-05 07:56:31,622: INFO/MainProcess] mingle: all alone
[2021-03-05 07:56:31,647: INFO/MainProcess] celery@f9ab48fc6b63 ready.

This is how I connect the app/producer to the broker. The file celeryconfig.py contains the setup for the broker URL, backend, concurrency, etc.

# celery_tasks.py
# imports...
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file

@app.task(name="foo")
def foo(first_arg: str) -> str:
    print(f"thanks for {first_arg}")
    return "OK"

thedrow commented 3 years ago

It seems like we can't connect to the broker. This doesn't strike me as a bug.

Krogsager commented 3 years ago

Hi thedrow. I need some help debugging this - I cannot decipher the source code. It might not be a bug per se, but the Celery worker process can connect, and I can telnet to the broker at port 5672.

I debugged this line in the traceback

File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):

and the host and port are '127.0.0.1' and 5672. Why then should the code throw socket.gaierror: [Errno -9]? I would at least like to do a PR with some info for other developers on how to fix the problem.

Krogsager commented 3 years ago

Kombu does not seem to be the exact problem. I'm running this simple script and the broker receives the message, albeit as "unroutable".

from kombu import Connection
from os import getenv
x = getenv('CELERY_BROKER')
conn = Connection(x)
conn.connect()
producer = conn.Producer()
y = producer.publish({'hello':'world'})
print(y.failed)

False

thedrow commented 3 years ago

This is strange. socket.gaierror: [Errno -9] means Address family for hostname not supported. Since telnet is responding, this may happen if something else is listening instead of RabbitMQ. Can you please try to publish something that is routable with kombu and see what happens?

Krogsager commented 3 years ago

@thedrow I ran the sample code here and it ran perfectly: it created the video queue and posted a message to it.

from kombu import Connection, Exchange, Queue
from os import getenv

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections
env_broker = getenv('CELERY_BROKER')
print(f"connect to {env_broker}")

with Connection(env_broker) as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

print("done.")

(screenshot attached)

thedrow commented 3 years ago

In the configuration you supplied in the original post, you're not pointing to localhost. Are you sure there's a broker listening there? Can you call a task with celery call using that configuration? What about using the RabbitMQ Docker container, mapping 5672 to a different port, and trying that?

Can you try running that code example in a clean VM? Maybe it's your system that is misconfigured?

Krogsager commented 3 years ago

In the configuration you supplied in the original post, you're not pointing to localhost.

The system configuration is as follows:
Container A: Celery producer and consumer.
Container B: RabbitMQ (official Docker image).
Container C: PostgreSQL backend (official Docker image).
The containers are on the same Docker network, where the broker is named my-rabbit and resolves to IP 172.23.0.3.

In my original post I wrote CELERY_BROKER=pyamqp://guest@172.23.0.3// but that was just for clarification. It has always been my-rabbit. (I did try to manually set the IP as well for testing, but that made no difference.)

I have noticed that the different components interpret the Celery broker string differently. I supply the environment variable pyamqp://guest@my-rabbit//
The worker process sees: amqp://guest:**@my-rabbit:5672//
The producer (app) sees: amqp://guest:**@localhost:5672// <-- this stands out.
The Kombu snippet sees: amqp://guest:**@my-rabbit:5672//

Are you sure there's a broker listening there?

I am sure that the broker is listening because my Kombu tests from yesterday (here and here) are running in container A. Not to mention the Celery worker process.

Can you celery call a task using that configuration?

I will get back to you ASAP.

thedrow commented 3 years ago

That's strange and it may definitely be a bug.

Krogsager commented 3 years ago

Can you celery call a task using that configuration?

Yes, I can call a task from container A.

~$ celery call -a "[3]" foo_task
74c69e81-7903-4efe-be02-d864c68756bd
thedrow commented 3 years ago

And from outside the container?

Krogsager commented 3 years ago

Can you give an example?

thedrow commented 3 years ago

The correct environment variable is not CELERY_BROKER but CELERY_BROKER_URL.

If there's a mistake in the documentation, please let us know.

Krogsager commented 3 years ago

@thedrow I disagree that this issue is invalid: I do not rely on the default variable CELERY_BROKER_URL. It is a coincidence that the two env vars look so similar. I instantiate the app like this:

#celery_tasks.py
# ...import
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file

and my config file looks like this:

#celeryconfig.py
from os import environ
broker = environ.get('CELERY_BROKER', 'default')
result_backend = 'db+postgresql://docker:************@pg_db:5432'

task_serializer= 'json'
result_serializer= 'json'
celery_accept_content = ['json']
# ...
thedrow commented 3 years ago

Why do you need a special environment variable? If you set the correct one, everything works right?

thedrow commented 3 years ago

Also, it's 'broker_url'
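
In other words, in your celeryconfig.py it should be (a sketch; everything else unchanged):

#celeryconfig.py
from os import environ
broker_url = environ.get('CELERY_BROKER', 'default')  # was: broker = ...
result_backend = 'db+postgresql://docker:************@pg_db:5432'

task_serializer = 'json'
result_serializer = 'json'
celery_accept_content = ['json']
# ...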

Krogsager commented 3 years ago

That was the issue! A simple misconfiguration: I wrote broker instead of broker_url. From a usability standpoint it is nice that the code defaults to amqp://guest:**@localhost:5672// when no broker URL is set, but I think it is too aggressive. Would you consider showing a warning when no broker is explicitly set or available to the producer?

Krogsager commented 3 years ago

Why do you need a special environment variable? If you set the correct one, everything works right?

Sometimes I test several systems in the same environment, and therefore I need to separate the run settings.

thedrow commented 3 years ago

Would you consider showing a warning when no broker is explicitly set or available to the producer?

Yes, that would be nice.