celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev

Allow canvas to specify keyword arguments and result linkage #7471

Status: Open. Molanda opened this issue 2 years ago.

Molanda commented 2 years ago


Brief Summary

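The canvas documentation uses add and mul task examples. Both of these operations are commutative, so chaining them is easy: a chained task receives the previous result as its first positional argument, and for add and mul the argument order doesn't matter.

If one were to use sub and div instead, building the canvas becomes more challenging. Here are the two tasks: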

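```python
from celery import Celery

app = Celery('tasks')  # example app; broker settings omitted


@app.task
def sub(x, y):
    return x - y


@app.task
def div(x, y):
    return x // y  # integer (floor) division
```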

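Consider the following three computations:

1. (7 - 1) // 2
2. 6 // (3 - 1)
3. (7 - 1) // (3 - 1)

The first could be configured as sub.s(7, 1) | div.s(2), but the other two would require additional tasks: div_y_x(y, x), which takes its arguments in reverse order, and div_xy(xy), which takes both group results as a single list.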
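To see why argument order matters, here is a plain-Python model of how a chain passes results: each step's return value is prepended to the next signature's partial arguments, and a group's results arrive as a single list. The helpers `run_chain` and `run_group` are hypothetical illustrations, not Celery APIs, and the model ignores everything else Celery does (serialization, workers, retries).

```python
def sub(x, y):
    return x - y


def div(x, y):
    return x // y


# Extra tasks that would be needed because div's argument order is fixed.
def div_y_x(y, x):
    return x // y


def div_xy(xy):
    # Receives the list of results produced by a preceding group.
    x, y = xy
    return x // y


def run_chain(first, *rest):
    """Simplified chain model: the previous result is prepended to the
    next step's partial positional arguments."""
    func, args = first
    result = func(*args)
    for func, partial_args in rest:
        result = func(result, *partial_args)
    return result


def run_group(*steps):
    """Simplified group model: results are collected into one list."""
    return [func(*args) for func, args in steps]


# 1. (7 - 1) // 2 works directly, like sub.s(7, 1) | div.s(2)
first = run_chain((sub, (7, 1)), (div, (2,)))
# A naive div.s(6) receives (2, 6) and computes 2 // 6 instead of 6 // 2
naive = run_chain((sub, (3, 1)), (div, (6,)))
# 2. 6 // (3 - 1) needs div_y_x to swap the arguments
second = run_chain((sub, (3, 1)), (div_y_x, (6,)))
# 3. (7 - 1) // (3 - 1) needs div_xy to unpack the group's result list
third = div_xy(run_group((sub, (7, 1)), (sub, (3, 1))))
```

Each flow yields 3, while the naive chain yields 0, which is exactly the mismatch that forces task authors to write order-specific variants.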

However, tasks shouldn't be required to know how they will be used in the flow.

Design

To work around this, I created my own custom task class along with a helper class.

I'm not suggesting this should be the final solution, but I want to use it to illustrate one idea.

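```python
from celery import Task


class TaskResult:
    """Helper class for storing the result order."""

    def __init__(self, order):
        self.order = int(order)

    def __json__(self):
        # The JSON serializer calls this, encoding the object as e.g. ':r0'.
        return f':r{self.order}'


class KeywordTask(Task):
    """Custom task class to arrange keyword arguments."""

    # Disable argument type checking: this implementation breaks it, because
    # the parent result is still passed positionally.
    typing = False

    # The before_start handler requires Celery 5.2 or later.
    def before_start(self, task_id, args, kwargs):
        # A group (chord) passes all of its results as one list.
        if len(args) == 1 and isinstance(args[0], list):
            vargs = args[0]
        else:
            vargs = args
        # Replace ':rN' markers with the N-th result of the previous step.
        for k, v in kwargs.items():
            if isinstance(v, str) and v.startswith(':r'):
                kwargs[k] = vargs[int(v[2:])]
        # Drop the positional results so only keyword arguments remain.
        args.clear()
        super().before_start(task_id, args, kwargs)
```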
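The substitution performed in `before_start` can be exercised without a broker or worker. The sketch below copies that logic into a hypothetical standalone function, `resolve_result_markers`, and feeds it the arguments each example's `div` call is assumed to receive once the message is decoded: the parent result prepended as a positional argument, and `TaskResult(n)` already encoded as the string `':rn'`. Those input shapes are assumptions, not captured Celery output.

```python
def resolve_result_markers(args, kwargs):
    """Replace ':rN' keyword values with the N-th result of the previous step.

    Mirrors KeywordTask.before_start; args and kwargs are modified in place.
    """
    # A group (chord) passes its results as one list; a single task, one value.
    if len(args) == 1 and isinstance(args[0], list):
        results = args[0]
    else:
        results = list(args)
    for key, value in kwargs.items():
        if isinstance(value, str) and value.startswith(':r'):
            kwargs[key] = results[int(value[2:])]
    # The positional results have been consumed; only keyword arguments remain.
    args.clear()
    return args, kwargs


def div(x, y):
    return x // y


# sub.s(x=7, y=1) | div.s(x=TaskResult(0), y=2)
args, kwargs = resolve_result_markers([6], {'x': ':r0', 'y': 2})
first = div(*args, **kwargs)

# sub.s(x=3, y=1) | div.s(x=6, y=TaskResult(0))
args, kwargs = resolve_result_markers([2], {'x': 6, 'y': ':r0'})
second = div(*args, **kwargs)

# group(sub.s(x=7, y=1), sub.s(x=3, y=1)) | div.s(x=TaskResult(0), y=TaskResult(1))
args, kwargs = resolve_result_markers([[6, 2]], {'x': ':r0', 'y': ':r1'})
third = div(*args, **kwargs)
```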

With the tasks using this class (for example, registered with @app.task(base=KeywordTask)), the task signatures specify the keyword arguments, each either providing a value or linking to a specific result from the previous step.

Here are the three examples expressed this way:

```python
import celery

# (7 - 1) // 2
sub.s(x=7, y=1) | div.s(x=TaskResult(0), y=2)
# 6 // (3 - 1)
sub.s(x=3, y=1) | div.s(x=6, y=TaskResult(0))
# (7 - 1) // (3 - 1)
celery.group(sub.s(x=7, y=1), sub.s(x=3, y=1)) | div.s(x=TaskResult(0), y=TaskResult(1))
```
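On the wire, each `TaskResult` becomes a plain marker string. The workaround relies on the JSON serializer falling back to `__json__()` for objects it cannot encode natively; the sketch below imitates that with the standard library's `json.dumps(default=...)`, so `encode_kwargs` is a hypothetical stand-in for Celery's serializer, not its actual code path.

```python
import json


class TaskResult:
    """Same helper as in the workaround: marks 'result number N'."""

    def __init__(self, order):
        self.order = int(order)

    def __json__(self):
        return f':r{self.order}'


def _default(obj):
    # Imitates a serializer that falls back to obj.__json__() for unknown types.
    reducer = getattr(obj, '__json__', None)
    if reducer is None:
        raise TypeError(f'{type(obj).__name__} is not JSON serializable')
    return reducer()


def encode_kwargs(kwargs):
    """Hypothetical stand-in for the task message serializer."""
    return json.dumps(kwargs, default=_default, sort_keys=True)


encoded = encode_kwargs({'x': TaskResult(0), 'y': 2})
# A genuine string argument ':r0' produces the very same payload.
lookalike = encode_kwargs({'x': ':r0', 'y': 2})
```

Both calls produce `{"x": ":r0", "y": 2}`, so the worker cannot tell a result link from a literal string that happens to start with `:r`. This is the ambiguity that the special-value approach accepts.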

Architectural Considerations

None

Proposed Behavior

To avoid compatibility issues, this should probably require either a flag on the task or app (e.g. require_kwargs) or separate signature functions (e.g. keyword_signature/ks).

I would also expect the final solution to be better integrated into the canvas.

My workaround relies on special marker values that I don't expect the tasks I have set up to return or receive. It would be better to store the keyword-argument-to-result linkage in the request metadata and resolve it before type checking.
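One way to picture the metadata-based alternative: keep literal keyword arguments and result links in separate fields, resolve the links first, and only then check the arguments against the task's signature. `KeywordSignature`, its `result_links` field, and `call_with_results` are hypothetical names for illustration, not part of Celery's canvas API; a real implementation would carry the links in the task message or request rather than in a Python object.

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class KeywordSignature:
    """Hypothetical keyword-only signature: literal kwargs plus result links."""

    func: Callable[..., Any]
    kwargs: Dict[str, Any] = field(default_factory=dict)
    # Maps keyword argument name -> index of the previous step's result.
    result_links: Dict[str, int] = field(default_factory=dict)

    def call_with_results(self, results: List[Any]) -> Any:
        # Links come from metadata, so literal values are never inspected.
        resolved = dict(self.kwargs)
        for name, index in self.result_links.items():
            resolved[name] = results[index]
        # Check arguments only after links are resolved; raises TypeError
        # if the resolved kwargs don't match the function's parameters.
        inspect.signature(self.func).bind(**resolved)
        return self.func(**resolved)


def sub(x, y):
    return x - y


def div(x, y):
    return x // y


# group(sub(7, 1), sub(3, 1)) | div(x=<result 0>, y=<result 1>)
parent_results = [sub(x=7, y=1), sub(x=3, y=1)]
step = KeywordSignature(div, result_links={'x': 0, 'y': 1})
value = step.call_with_results(parent_results)

# A literal string that looks like a marker passes through untouched.
echo = KeywordSignature(lambda text: text, kwargs={'text': ':r0'})
untouched = echo.call_with_results([42])
```

Keeping the links out of the argument values removes the need for sentinel strings and lets ordinary signature checking run on the fully resolved call.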

Proposed UI/UX

Diagrams

N/A

Alternatives

None
