celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev

default task options from signature not propagated to chain #5219

Open shaunakgodbole opened 6 years ago

shaunakgodbole commented 6 years ago

We have a setup with several workers, each executing a different set of tasks. The queue for each task is declared in its task decorator:

@cel.task(bind=True, acks_late=True, queue='queue-A')
def taskA(self):
  ...

@cel.task(bind=True, acks_late=True, queue='queue-B')
def taskB(self):
  ...

The queue is picked up correctly when we create a group of tasks, but not when we create a chain. In a chain, the first task goes to its declared queue, but the second task is sent to the default celery queue instead of queue-B.

ch = celery.chain(taskA.si(), taskB.si())

However, when we explicitly set the queue option, everything works correctly:

ch = celery.chain(taskA.s().set(queue='queue-A'), taskB.s().set(queue='queue-B'))
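The explicit .set(queue=...) workaround can be wrapped in a small helper so the queue name is not repeated at every call site. This is a minimal sketch, assuming the queue passed to the task decorator is readable as a queue attribute on the task object. si_on_declared_queue is a hypothetical name, not part of Celery.

```python
def si_on_declared_queue(task, *args, **kwargs):
    """Build an immutable signature pinned to the queue declared on the task.

    Hypothetical helper, not a Celery API. Assumes the value given as
    ``queue=...`` in the task decorator is exposed as ``task.queue``.
    """
    sig = task.si(*args, **kwargs)
    queue = getattr(task, "queue", None)
    if queue is not None:
        # Signature.set() updates the signature's options and returns it,
        # so the queue travels with the signature inside the chain.
        sig = sig.set(queue=queue)
    return sig
```

With the tasks above, the chain would then be built as celery.chain(si_on_declared_queue(taskA), si_on_declared_queue(taskB)). Tasks without a declared queue are left untouched and fall back to normal routing.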

Steps to reproduce

Worker A

Run using: celery -A worker_a worker -l info --concurrency=4 --queues=worker.a

import celery

cel = celery.Celery(
  'experiments',
  backend='redis://localhost:6379',
  broker='amqp://localhost:5672'
)

cel.conf.update(
  CELERYD_PREFETCH_MULTIPLIER=1,
  CELERY_REJECT_ON_WORKER_LOST=True,
  CELERY_TASK_REJECT_ON_WORKER_LOST=True,
)

@cel.task(bind=True, acks_late=True, name='tasks.worker_a', queue='worker.a')
def task_on_worker_a(self):
  print("Running task on worker A")

Worker B

Run using: celery -A worker_b worker -l info --concurrency=4 --queues=worker.b

import celery

cel = celery.Celery(
  'experiments',
  backend='redis://localhost:6379',
  broker='amqp://localhost:5672'
)

cel.conf.update(
  CELERYD_PREFETCH_MULTIPLIER=1,
  CELERY_REJECT_ON_WORKER_LOST=True,
  CELERY_TASK_REJECT_ON_WORKER_LOST=True,
)

@cel.task(bind=True, acks_late=True, name='tasks.worker_b', queue='worker.b')
def task_on_worker_b(self):
  print("Running task on worker B")

Main Program

Run using: python main.py

import celery

cel = celery.Celery(
  'experiments',
  backend='redis://localhost:6379',
  broker='amqp://localhost:5672'
)
cel.set_default()

cel.conf.update(
  CELERYD_PREFETCH_MULTIPLIER=1,
  CELERY_REJECT_ON_WORKER_LOST=True,
  CELERY_TASK_REJECT_ON_WORKER_LOST=True,
)

@cel.task(bind=True, acks_late=True, name='tasks.worker_b', queue='worker.b')
def task_on_worker_b(self):
  pass

@cel.task(bind=True, acks_late=True, name='tasks.worker_a', queue='worker.a')
def task_on_worker_a(self):
  pass

if __name__ == '__main__':
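  # Both steps use .si(): the tasks take no arguments, so the result of the
  # previous step must not be passed on.
  ch = celery.chain(task_on_worker_a.si(), task_on_worker_b.si())
  # Works if the queue is set explicitly:
  #   ch = celery.chain(task_on_worker_a.si().set(queue='worker.a'), task_on_worker_b.si().set(queue='worker.b'))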
  ch()
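To see which queue each step of the chain in the main program explicitly carries before it is sent, the signatures can be inspected directly. This is a rough sketch that relies on a chain exposing its steps as tasks and each signature exposing task (the task name) and options. queues_by_step is a hypothetical helper name.

```python
def queues_by_step(chain_sig):
    """Return (task name, explicit queue option) pairs for each chain step.

    Hypothetical diagnostic helper. A queue of None means the signature
    carries no explicit queue, so routing for that step depends on whatever
    publishes it resolving the task's declared queue.
    """
    report = []
    for sig in chain_sig.tasks:
        report.append((sig.task, sig.options.get("queue")))
    return report
```

Calling queues_by_step(ch) just before ch() shows None for any step built with a bare .si() or .s(), and the queue name for steps built with .set(queue=...). That makes it easy to confirm which variant of the chain is actually being sent.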

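Expected behavior

tasks.worker_b is routed to the worker.b queue declared on the task and runs on Worker B.

Actual behavior

tasks.worker_a is routed to worker.a, but tasks.worker_b is sent to the default celery queue.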

ohld commented 5 years ago

Any solution for this? I've faced the same issue