celery / django-celery-results

Celery result back end with django

Unable to retrieve results across workers #417

Open sdrabblescripta opened 6 months ago

sdrabblescripta commented 6 months ago

I have two queues, call them A and B, although only one is actively processing tasks. Each is deployed in its own Docker deployment consisting of an app container and a Celery worker. Both queues share one RabbitMQ instance running in its own container.

I can successfully start tasks in B.celery from A.app. I can watch the task in B's log and see it completes. In B.app I am able to retrieve the task's state and result.

If, however, I try to retrieve the task's result/state in A.app, the result is always empty, the state is always PENDING, and get() / wait() just hang.

This is problematic because A is where, for the most part, all tasks are started, and A needs to send the same task to each of B, C, D, ... then wait for all tasks to complete before moving on. I can't use chained tasks or similar because the tasks all need to run in parallel.

Is what I'm doing just not possible? If not, how would I go about obtaining the results in A for all tasks run in other queues?
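One possible explanation, offered as an assumption rather than a confirmed diagnosis: Celery documents the PENDING state as "waiting for execution or unknown", so a task ID that the queried result backend has never seen is reported as PENDING. If A.app reads results from a different store than the one B's worker writes to (for example, separate Django databases behind django-celery-results), A would see exactly the symptoms described. The toy model below uses plain dictionaries, not Celery; the class `ToyResultStore` and the task ID are hypothetical names used only for illustration.

```python
# Toy model only: plain dicts stand in for two separate result stores.
# Nothing here is Celery API; ToyResultStore is a hypothetical helper.
PENDING = "PENDING"


class ToyResultStore:
    """Minimal stand-in for a result backend keyed by task ID."""

    def __init__(self):
        self._rows = {}

    def store(self, task_id, state, result):
        self._rows[task_id] = (state, result)

    def state(self, task_id):
        # An ID this store has never seen is reported as PENDING,
        # mirroring how Celery treats unknown task IDs.
        return self._rows.get(task_id, (PENDING, None))[0]

    def result(self, task_id):
        return self._rows.get(task_id, (PENDING, None))[1]


store_a = ToyResultStore()  # what A.app reads from (assumed to be separate)
store_b = ToyResultStore()  # what B's worker writes to

task_id = "hypothetical-task-id"
store_b.store(task_id, "SUCCESS", 42)  # B's worker finishes the task

print(store_b.state(task_id), store_b.result(task_id))  # B sees the outcome
print(store_a.state(task_id), store_a.result(task_id))  # A sees nothing
```

If this assumption holds, the thing to check is whether A and B are configured with the same result backend, since the shared broker only carries the task messages.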

50-Course commented 6 months ago

Hi @sdrabblescripta,

Could you provide environment/OS configuration to help replicate the issue you are facing?

sdrabblescripta commented 6 months ago

Python: 3.8
celery: 5.3.4 (emerald-rush)
d-c-r: 2.4.0

Please let me know what else you need.

sdrabblescripta commented 5 months ago

@50-Course any update?

50-Course commented 5 months ago

Hi @sdrabblescripta,

Unfortunately, no. It's been a hectic week for me. However, I should be able to take a look at this over the weekend.

sdrabblescripta commented 5 months ago

No worries, any attention you can give the matter would be great!

50-Course commented 5 months ago

@sdrabblescripta, could you please provide a simplified description of your architecture? I would like to clarify if:

50-Course commented 5 months ago

That aside, chained tasks run sequentially, in the order in which they are arranged. To run tasks in parallel, you have to explicitly use the group primitive; applying it returns a GroupResult instance that you can call .get() on.

The difference here is that you call apply_async() on the group instead of using the conventional wait() method.

from celery import group

# Assuming you have celery_worker_B and celery_worker_C configured
your_task_group = group(
    your_task.s(*args, **kwargs).set(queue='B'),
    your_task.s(*args, **kwargs).set(queue='C'),
    ...
)

# Apply the group of tasks asynchronously
result_group = your_task_group.apply_async()

Again, if you have tried the above and it still doesn't work, please provide the details requested above to help diagnose the issue.

You may track the progress here upon providing the above: https://github.com/50-Course/dj-celery-results-multiple-worker-failure

sdrabblescripta commented 5 months ago

Hi @50-Course,

I have two apps, A and B - they share some code but models are mostly NOT shared. There's a single A with its own celery worker, multiple Bs each with their own celery worker, and a single MQ.

A has to start a task in all B workers, which is why I thought group couldn't be used - the task is not importable in A, so I use send_task. If this is in error please let me know and I'll try the group approach.