emilhe / dash-extensions

The dash-extensions package is a collection of utility functions, syntax extensions, and Dash components that aim to improve the Dash development experience.
https://www.dash-extensions.com/
MIT License

Support background callback using Redis & Celery #207

Open cfengai opened 2 years ago

cfengai commented 2 years ago

First, I'd like to thank you for the excellent work. It has saved me so much time, as I wanted to build a centralized notification center for all action elements in Dash, and this extension makes it so much easier to program.

Dash introduced background callbacks in version 2.6, so they are relatively new. This extension works fine with the DiskCache backend, but is not compatible with the Celery+Redis backend. Since Celery+Redis is recommended in production environments, it would be greatly appreciated if dash-extensions could support it. Thanks!

The simple example below reproduces the issue. When running it, Dash reports no errors, but Celery logs the following:

[2022-08-26 22:09:48,042: ERROR/MainProcess] Received unregistered task of type 'long_callback_bef61d19044d6887e957160bf6b6e33321369cf9'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?
...
...
The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    strategy = strategies[type_]
KeyError: 'long_callback_bef61d19044d6887e957160bf6b6e33321369cf9'
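
For context, a Celery worker resolves each incoming task name against an in-process registry that is populated when the worker imports the module defining the task; a name that was never imported lands exactly in the KeyError branch shown in the traceback. Here is a stdlib-only sketch of that lookup (all names are illustrative, not Celery's real API):

```python
# Stdlib-only sketch of how a Celery worker resolves task names; the
# function and variable names below are illustrative, not Celery's real API.
strategies = {}  # task name -> handler, filled in at import time

def register_task(name, fn):
    # In Celery, this happens when the worker imports the module that
    # defines (or registers) the task.
    strategies[name] = fn

def handle_message(task_name, *args):
    # Mirrors `strategy = strategies[type_]` from the traceback above.
    try:
        return strategies[task_name](*args)
    except KeyError:
        return f"Received unregistered task of type {task_name!r}."

register_task("long_callback_abc123", lambda n: f"Clicked {n} times")
print(handle_message("long_callback_abc123", 3))   # Clicked 3 times
print(handle_message("long_callback_deadbeef"))    # Received unregistered task ...
```

The fix the error message itself hints at is making sure the worker process actually imports the module in which the task name was registered.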

A few different tweaks make the code work, which makes me think the wrong callback is registered when using DashProxy from the extension:

  1. Removing the line os.environ['REDIS_URL'] = str(REDIS_URL) makes the code work, which means the Diskcache path is fine.
  2. Using the decorator @dash.callback instead of @app.callback also makes it work, suggesting the issue comes from DashProxy.

Code example:

dash==2.6.1 and dash_extensions==0.1.3

import time
from dash import DiskcacheManager, CeleryManager, html
import os
from dash_extensions.enrich import Input, Output, DashProxy

REDIS_URL = 'redis://localhost:6379'
os.environ['REDIS_URL'] = str(REDIS_URL)
if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(
        __name__,
        broker=os.environ['REDIS_URL'],
        backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)
else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

app = DashProxy(__name__, background_callback_manager=background_callback_manager)

server = app.server
app.layout = html.Div([
    html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
    html.Button(id="button_id", children="Run Job!"),
    html.P(id='test')
])

@app.callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    prevent_initial_call=True,
    background=True,
    running=[
        (Output("button_id", "disabled"), True, False),
    ],
)
def update_clicks(n_clicks):
    time.sleep(1.0)
    return [f"Clicked {n_clicks} times"]

# run_server must come after the callback registration
if __name__ == "__main__":
    app.run_server(debug=True)

emilhe commented 2 years ago

I can confirm that this is indeed an issue. It seems that, for some reason, the Celery worker is not spun up/mapped correctly.

emilhe commented 2 years ago

It looks like Dash normally starts a Celery worker, but for some reason it doesn't happen when using dash-extensions; I'll have to look deeper into why that is. In addition, the way dash-extensions registers callbacks (during the run_server invocation) is a problem, as this function call is typically wrapped in an if __name__ == "__main__": guard.
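
To illustrate why the guard is a problem: a Celery worker imports the application module rather than running it as a script, so anything under the __main__ guard, including callback registration triggered from run_server, never executes in the worker. A stdlib-only sketch (the file name and variables are made up for illustration):

```python
# Stdlib-only sketch: code under the __main__ guard does not run when a
# module is merely imported, which is what a Celery worker does with your
# application module. The file name and variables below are illustrative.
import importlib.util
import os
import tempfile
import textwrap

source = textwrap.dedent("""
    REGISTERED = []
    REGISTERED.append("module_level_callback")   # always runs on import
    if __name__ == "__main__":
        REGISTERED.append("guarded_callback")    # only runs as a script
""")

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "my_app_file.py")
    with open(path, "w") as f:
        f.write(source)
    # Simulate the worker importing the application module.
    spec = importlib.util.spec_from_file_location("my_app_file", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

print(module.REGISTERED)  # ['module_level_callback'] -- the guarded part never ran
```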

For the latter issue, from dash-extensions==0.1.6 (soon to be released), you can add the following line to the end of your main application file,

app.register_celery_tasks()

to ensure that callbacks are registered as intended (where app is the DashProxy object). To address the former issue, you can start the process manually by running the following command in a separate terminal,

celery -A my_app_file.celery_app worker

where my_app_file is the name of the file containing your app definition, and celery_app is the variable name of the celery app.

spookyuser commented 1 year ago

Thank you, the above worked great, and I will submit a PR to add it to the docs :)

Another weird thing I noticed with Celery + dash_extensions is that ServersideOutput seems to result in the error RuntimeError: Working outside of request context, whereas if you remove the ServersideOutput it works fine. It might be due to the specific way I'm using it, but it was working well before I started using Celery.

Doesn't work

@callback(
    Output("main", "children"),
    ServersideOutput(dashboard_state, "data"), 
    Input("submit", "n_clicks"),
    State(excel_df_state, "data"),
    State(options_state, "data"),
    prevent_initial_call=True,
    background=True,
)
def create_dashboard(submit: int, excel_df: pd.DataFrame, options: Options):
    if not submit:
        raise PreventUpdate()

    dashboard = Dashboard(options=options, excel_data=excel_df).process_data()
    return dashboard.get_layout(), dashboard

Works

@callback(
    Output("main", "children"),
    # ServersideOutput(dashboard_state, "data"),
    Input("submit", "n_clicks"),
    State(excel_df_state, "data"),
    State(options_state, "data"),
    prevent_initial_call=True,
    background=True,
)
def create_dashboard(submit: int, excel_df: pd.DataFrame, options: Options):
    if not submit:
        raise PreventUpdate()

    dashboard = Dashboard(options=options, excel_data=excel_df).process_data()
    return dashboard.get_layout(), dashboard

Edit: Ooh, I think I know what's happening now: since the callback function is run by Celery, it doesn't have access to the Flask cache objects. I'll try to refactor my code so the ServersideOutput happens elsewhere, but I'm not sure where that would be off the top of my head.

Edit 2: Ooh, I got it to work; my Celery config wasn't set up properly. After I followed the steps here: https://flask.palletsprojects.com/en/1.1.x/patterns/celery/ it started working. Happy to add an example of everything to the docs, or just write a post on the Plotly forum.

I guess here is basically what you have to do, in case someone from Google sees this and is running into the same error before this reaches the docs:

dash_app = DashProxy(
    __name__,
    transforms=[ServersideOutputTransform()],
    external_stylesheets=[dbc.themes.QUARTZ, dbc_css],
)
celery_app = make_celery(dash_app.server)
background_callback_manager = CeleryManager(celery_app)
dash_app._background_manager = background_callback_manager
dash_app.register_celery_tasks()

Edit 3: Oh, it's actually still not working, but I feel like this has to be close.

Edit 4: I'm not sure Celery is worth the effort for me, even though it's the "production" callback manager. It makes everything so complicated, and you have to launch multiple commands at the same time for testing. Plus, the filesystem version works perfectly with Dash, so I think I'm going to use that for now, but I will still write up the stuff I tried in a Plotly forum post in case anyone has ideas or wants to try this themselves.

Edit 5: Looks like this is a known bug: https://github.com/plotly/dash/issues/2235

reisermn commented 1 year ago

Hey @spookyuser, thanks for your work dissecting this. While this bug is being resolved, did you come up with a workaround? Are you just not using ServersideOutput() for now?

pjaselin commented 1 year ago

Hey guys, the fix I found is that you may need to use Redis as the ServersideOutputTransform backend, or pass a RedisStore as the backend for a long callback. This problem becomes more apparent when you run everything in separate containers.

reisermn commented 1 year ago

Hey @pjaselin, thanks for weighing in - your suggested solution is actually what I already had implemented (see below), and I'm still getting the same error as @spookyuser. Also, I'm running Redis in a Docker container at the IP address listed.

from dash_extensions.enrich import DashProxy, ServersideOutputTransform, FileSystemStore, RedisStore
from dash import DiskcacheManager, CeleryManager
import os
from uuid import uuid4 

launch_uid = uuid4()

if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    # celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    celery_app = Celery(__name__, broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379')
    background_callback_manager = CeleryManager(celery_app, cache_by=[lambda: launch_uid], expire=60*60*24) # Don't expire for current user for 24 hours
    dash_extensions_backend = RedisStore(host='redis://127.0.0.1:6379')
    # dash_extensions_backend = FileSystemStore(cache_dir="cache")

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache, cache_by=[lambda: launch_uid], expire=60*60) # Don't expire for current user for 1 hour
    dash_extensions_backend = FileSystemStore(cache_dir="cache")

app = DashProxy(
    __name__,
    transforms=[ServersideOutputTransform(backend=dash_extensions_backend)],
    background_callback_manager=background_callback_manager
    )
server = app.server
spookyuser commented 1 year ago

Hey @reisermn, no problem. My solution was actually switching to React and Python microservices. This error was the tipping point where I realised: wait, what am I actually doing here? I suddenly saw my future with Dash as being one weird bug in one weird framework, for eternity, and I was like, nah, I'm not making the same mistake as I did with Hugo again.

spookyuser commented 1 year ago

(quoting @reisermn's message and setup from the comment above)

Yup, this was exactly my setup. Sorry I don't have anything better to help you with, but I feel your pain; this is one of the most annoying errors I have ever come across. I felt perpetually like I was on the verge of solving it.

vsisl commented 1 year ago

For the latter issue, from dash-extensions==0.1.6 (soon to be released), you can add the following line to the end of your main application file,

app.register_celery_tasks()

to ensure that callbacks are registered as intended (where app is the DashProxy object). To address the former issue, you can start the process manually by running the following command in a separate terminal,

celery -A my_app_file.celery_app worker

where my_app_file is the name of the file containing your app definition, and celery_app is the variable name of the celery app.

I'm struggling to get this working. I tried to put app.register_celery_tasks() in various places:

1) In app.py, right after the app object is created:

app = DashProxy(__name__, external_stylesheets=external_stylesheets, background_callback_manager=long_callback_manager,
                transforms=[LogTransform()])

app.register_celery_tasks()

2) In index.py, just before the app server is launched:

if __name__ == '__main__':
    app.register_celery_tasks()
    app.run_server(host='0.0.0.0')

But Celery never registers my background callback. Do you have any suggestions, @emilhe? Thanks for the help!

TimChild commented 11 months ago

I was also having this issue, and have possibly made some progress, although my current test does not check whether I maintain the dash_extensions functionality.

Anyway, here is a pytest that works, using dash[testing] for the dash_duo fixture and celery[pytest] for the celery_app and celery_worker fixtures (configured to run with a Redis store):

def test_background_callback_with_dash_proxy2(dash_duo, celery_app, celery_worker):
    """
    Now check with the background callback using DashProxy
    """
    import time
    import dash
    from dash import CeleryManager
    from dash_extensions.enrich import DashProxy, Output, Input, html, dcc

    proxy_app = DashProxy(__name__)
    proxy_app.layout = html.Div(
        children=[
            html.Div(id="example-div", children=["some content"]),
            dcc.Input(id="example-input"),
        ],
    )

    @proxy_app.callback(
        Output("example-div", "children"),
        Input("example-input", "value"),
        background=True,
        prevent_initial_call=True,
    )
    def update_div(text):
        return text

    manager = CeleryManager(celery_app)
    dash_app = dash.Dash(__name__, background_callback_manager=manager)
    proxy_app.hijack(dash_app)

    # This must come after the hijack
    celery_worker.reload()  # So that worker can see the newly registered task

    dash_duo.start_server(dash_app)

    content = dash_duo.wait_for_element_by_css_selector("#example-div")
    assert content.text == "some content"

    dash_duo.find_element("#example-input").send_keys("new content")
    # For the next 20 seconds, keep checking whether the content has changed
    for _ in range(20):
        content = dash_duo.wait_for_element_by_css_selector("#example-div")
        if content.text == "new content":
            break
        time.sleep(1)

    content = dash_duo.wait_for_element_by_css_selector("#example-div")
    assert content.text == "new content"

Basically: use the DashProxy app to hijack a regular dash.Dash app that has the background_callback_manager, then run the regular app.

I haven't thoroughly checked, but I believe the hijack should retain the dash-extensions functionality.

This is not an ideal solution, but hopefully, it is a step towards a better one.