celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

Pass task results from preceding tasks. #1563

Open faisalshahbaz opened 2 years ago

faisalshahbaz commented 2 years ago

Hello,

Here is my code. I want to pass the result of the `myname` task to the `reverse` task as an argument in its signature.

```python
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result

MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)


@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
    logger.info('reverse order {0}'.format(text))
    return {"reversename": str(text[::-1])}


@app.task(queue=MyQUEUE, ignore_result=True)
def add(a, b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a, b))
    return {"addition": str(a + b)}


@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}


@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a, b, n):
    resultchain = chain([
        group([
            signature(add, args=(a, b), queue=MyQUEUE),
            signature(myname, args=(n,), queue=MyQUEUE),
        ]),
        signature(reverse, args=(), queue=MyQUEUE),
    ]).apply_async()

    with allow_join_result():
        results = resultchain.join()
    return results
```

open-collective-bot[bot] commented 2 years ago

Hey @faisalshahbaz :wave:, Thank you for opening an issue. We will get back to you as soon as we can. Also, check out our Open Collective and consider backing us - every little helps!

We also offer priority support for our sponsors. If you require immediate assistance please consider sponsoring us.