plotly / dash

Data Apps & Dashboards for Python. No JavaScript Required.
https://plotly.com/dash
MIT License

Integrate Celery Tasks better with Long Callbacks #2124

Open achimgaedke opened 2 years ago

achimgaedke commented 2 years ago

Motivation

As dash introduced me to Celery, I am using it to organise compute-intensive calculations behind a dashboard. So, I'd love to create groups / chains / chords... i.e. use a library of tasks which I can call inside a callback and maintain independently of dash as pure Celery tasks.

My first approach was to use AsyncResult.get(), which led me into the "anti-patterns" described in https://docs.celeryq.dev/en/stable/userguide/tasks.html#avoid-launching-synchronous-subtasks - including setting disable_sync_subtasks=False in order to get the task's result value back to dash.

The dash community has been wondering about this for a while as well, see https://community.plotly.com/t/triggering-callback-from-within-python/23321

Proposed change

After poking around for a while, I'd like to suggest amending the CeleryLongCallbackManager's task creation function _make_job_fn (code as of release 2.5.1; it seems to be in the middle of a refactor on the develop branch) - file https://github.com/plotly/dash/blob/v2.5.1/dash/long_callback/managers/celery_manager.py:

```python
import hashlib
import inspect
import json

# import paths as used around dash 2.5.1 / celery 5
from _plotly_utils.utils import PlotlyJSONEncoder
from celery.app.task import Task
from celery.canvas import Signature


def _make_job_fn_async(fn, celery_app, progress, args_deps):
    cache = celery_app.backend

    # Hash the function source to create a unique (but stable) celery task name
    fn_source = inspect.getsource(fn)
    fn_hash = hashlib.sha1(fn_source.encode("utf-8")).hexdigest()

    @celery_app.task(name=f"long_callback_output_{fn_hash}")
    def job_result_fn(user_callback_output, result_key):
        cache.set(result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder))

    @celery_app.task(name=f"long_callback_{fn_hash}")
    def job_fn(result_key, progress_key, user_callback_args, fn=fn):
        def _set_progress(progress_value):
            cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder))

        maybe_progress = [_set_progress] if progress else []
        if isinstance(args_deps, dict):
            user_callback_output = fn(*maybe_progress, **user_callback_args)
        elif isinstance(args_deps, (list, tuple)):
            user_callback_output = fn(*maybe_progress, *user_callback_args)
        else:
            user_callback_output = fn(*maybe_progress, user_callback_args)

        # Added cases for celery Task and Signature:
        # schedule the user-defined work and link/chain a task that
        # writes the result value once the work is done
        if isinstance(user_callback_output, Task):
            user_callback_output.apply_async(link=job_result_fn.s(result_key))
        elif isinstance(user_callback_output, Signature):
            (user_callback_output | job_result_fn.s(result_key))()
        # Otherwise do everything within this callback as before
        else:
            cache.set(result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder))

    return job_fn
```

The main additions are:

If the user callback returns a Celery Task or Signature, the result will be written to the cache by a separate task (job_result_fn) that runs once the user-defined tasks are done. The callback task returns immediately after those tasks are created and scheduled, instead of being blocked by a busy wait; job_result_fn eventually delivers the result to dash.

This change allows the long callback to work as before, while Celery power users can leverage the task and workflow (canvas) system.

Example Project

I've uploaded an example project at https://github.com/achimgaedke/dash-celery-integration:

The class CeleryManagerTasks in the module celery_manager_tasks can easily be used as a drop-in replacement for the CeleryLongCallbackManager.

The file celery_integration.py is a working example, utilising a chord of 100 embarassing_parallel tasks merged by the collect task - it simply counts the number of workers used, identified by their process ids.

achimgaedke commented 2 years ago

This proposal doesn't deal with the progress callback in detail. I believe it should be straightforward to integrate - if it makes sense to have a progress indicator on a distributed/parallel computing system.