depoplabs / celery-message-consumer

Tool for using the battle-tested `bin/celery` worker to consume vanilla AMQP messages (i.e. not Celery tasks)
Apache License 2.0
53 stars 12 forks source link

message consumer executing tasks #16

Closed famagusta closed 5 years ago

famagusta commented 5 years ago

Hi,

My custom message consumer is trying to execute tasks which are configured with another celery app.

Any suggestions?

Regards Robin

anentropic commented 5 years ago

we have also seen this issue... if the event handler itself enqueues a celery task, that task will be enqueued on the message-consumer broker rather than the broker of the task's app

so far I have been unable to reproduce this bug in unit tests in order to work on a fix

we only see this for tasks enqueued by the event handler function, but we are using separate brokers, have not tested the same-broker case

famagusta commented 5 years ago

I use two separate brokers actually. And I am not trying to enqueue a task in the event handler. However, my event handler is called from inside a task. All @shared_task @periodic_task decorated functions are also trying to execute via the celery message app instead of the task runner app

Below is the message celery app. On starting the worker for this, it is registering all tasks. I tried disabling it but it didn't work. I set the default queue to None to prevent the tasks from running. That in turn gave a runtime exception 'NoneType' object has no attribute 'pop' celery Somehow the task think they need to run with the messaging app

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
os.environ.setdefault('EVENT_CONSUMER_APP_CONFIG', 'app.settings')

class MessagingQueueConfig():
     BROKER_URL = settings.MESSAGE_BROKER_URL
     CELERY_DEFAULT_QUEUE = None
     CELERY_QUEUES = ()
     CELERY_IMPORTS = ()

messaging_app = Celery('message_app', set_as_current=False)
messaging_app.config_from_object(MessagingQueueConfig)

Below is the celery task app

import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')

app = Celery('task_dj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Any help would be greatly appreciated.

mrbaboon commented 5 years ago

We had issues running celery alongside this (which we use to publish to and consume from our event bus). I think we had the same issue as you with tasks publishing to the wrong broker. Remvoe your usage of @shared_task and replace it with @app.task (your app being your celery app).

famagusta commented 5 years ago

Fixed using above suggestions