Closed JamesKunstle closed 7 months ago
I implemented a toy application that demonstrates that the suggested fix is reasonable:
# tasks.py
import celery
import time

app = celery.Celery(
    __name__,
    backend="redis://localhost:6379",
    broker="amqp://localhost",
)


@app.task
def wait(wtime: int) -> int:
    """Sleep for *wtime* seconds and return the time waited.

    Runs as a Celery task; the return value is stored in the Redis
    result backend so callers can retrieve it via AsyncResult.

    :param wtime: number of seconds to sleep
    :return: the number of seconds actually requested (``wtime``)
    """
    print(f"Waiting: {wtime} seconds")
    time.sleep(wtime)
    print("Finished")  # plain string: no placeholders, no f-string needed
    # Return the value to honor the declared -> int annotation
    # (previously the task implicitly returned None).
    return wtime
# app.py
import time
from tasks import wait, app as celery_app
import dash
from dash import dcc, html, Output, Input, callback, State
import celery
from celery.result import AsyncResult
# Sentinel values the callbacks below feed into the dcc.Interval
# component to reset/arm/disarm the status-polling timer.
TIMER_RESET_N_INTERVALS = 0
TIMER_ENABLED = False
TIMER_DISABLED = True
# Seconds each demo background task sleeps before finishing.
WAIT_TASK_DELAY = 10
# The Dash application instance hosting the frontend.
dash_app = dash.Dash(name=__name__
)
# Page layout: an in-memory store of task ids, a one-shot polling
# timer, two control buttons, and an ordered list showing task status.
_layout_children = [
    dcc.Store(id="task-ids", storage_type="memory", data=[]),
    dcc.Interval(
        id="task-status-timer",
        disabled=True,
        interval=1000,  # in ms
        n_intervals=0,
        max_intervals=1,  # only run once then stop
    ),
    html.Button(id="start-task", children="Start New Task"),
    html.Button(id="clear-tasks", children="Clear History"),
    html.Ol(id="task-status-list", children=[]),
]
dash_app.layout = html.Div(children=_layout_children)
@callback(
    [
        Output("task-ids", "data"),
        Output("task-status-timer", "n_intervals", allow_duplicate=True),
        Output("task-status-timer", "disabled", allow_duplicate=True),
    ],
    Input("start-task", "n_clicks"),
    State("task-ids", "data"),
    prevent_initial_call=True
)
def handleAddTask(n_clicks: int, task_ids: list):
    """
    Creates a new background task.

    Schedules a `wait` task on the Celery queue, appends its id to the
    stored list of task IDs, resets the poll counter, and enables the
    status-polling timer.

    :param n_clicks: button click count (trigger only; value unused)
    :param task_ids: current list of task-id strings from dcc.Store
    :return: (updated task-id list, reset n_intervals, timer-enabled flag)
    """
    print("in handleAddTask")
    result: AsyncResult = wait.apply_async(args=[WAIT_TASK_DELAY])
    # Use the explicit .id attribute instead of relying on
    # AsyncResult.__str__ happening to return the task id.
    tasklist = task_ids + [result.id]
    return tasklist, TIMER_RESET_N_INTERVALS, TIMER_ENABLED
@callback(
    [
        Output("task-ids", "data", allow_duplicate=True),
        Output("task-status-timer", "disabled", allow_duplicate=True),
        Output("task-status-list", "children", allow_duplicate=True)
    ],
    Input("clear-tasks", "n_clicks"),
    State("task-ids", "data"),
    prevent_initial_call=True
)
def handleClearTasks(n_clicks: int, task_ids: list[str]):
    """
    Forgets all tasks in queue.

    Revokes (terminating if running) and forgets every tracked task,
    drops the task-id bookkeeping in the frontend store, disables the
    polling timer, and clears the on-screen task list.

    :param n_clicks: button click count (trigger only; value unused)
    :param task_ids: current list of task-id strings from dcc.Store
    :return: (empty task-id list, timer-disabled flag, empty children)
    """
    print("in handleClearTasks")
    # Plain loop rather than discarded list comprehensions: these calls
    # are pure side effects, and one AsyncResult per id avoids
    # constructing each result handle twice.
    for tid in task_ids:
        res = celery_app.AsyncResult(tid)
        res.revoke(terminate=True)  # stop the task if still running
        res.forget()  # drop its result from the backend
    return [], TIMER_DISABLED, []
@callback(
    [
        Output("task-status-list", "children", allow_duplicate=True),
        Output("task-status-timer", "disabled", allow_duplicate=True),
        Output("task-status-timer", "n_intervals", allow_duplicate=True),
    ],
    Input("task-status-timer", "n_intervals"),
    State("task-ids", "data"),
    prevent_initial_call=True
)
def handleCheckTaskStatus(n_intervals: int, task_ids: list[str]):
    """
    When interval fires, checks the status of tasks in task ID list.

    Renders one list item per tracked task and disables the timer once
    every task has finished (an empty task list also disables it).

    :param n_intervals: timer tick count (trigger only; value unused)
    :param task_ids: current list of task-id strings from dcc.Store
    :return: (status list items, timer-disable flag, reset n_intervals)
    """
    print("in handleCheckTaskStatus")
    results = [celery_app.AsyncResult(tid) for tid in task_ids]
    # Label each entry with the task id itself; previously the
    # AsyncResult object was zipped in as the id and only rendered
    # correctly because its __str__ happens to return the id.
    status_components_list = [
        html.Li(children=f"Task {tid} status: {res.status}")
        for tid, res in zip(task_ids, results)
    ]
    # all() over a generator — no throwaway list; will disable timer if True.
    timer_should_disable = all(res.ready() for res in results)
    return status_components_list, timer_should_disable, TIMER_RESET_N_INTERVALS
# Run the Dash development server when executed directly.
if __name__ == "__main__":
    dash_app.run(debug=True)
This polls the backend every second to check whether the scheduled tasks have finished. If so, the frontend components are updated to reflect the change in status.
When the query tasks are `delay`ed and put into a Celery queue, a sidecar background callback task is also scheduled by Dash to monitor their status, consuming a worker process until all of the query tasks have finished. This is inefficient and can cause deadlocks.

Suggested solution:
Instead of synchronously monitoring the query tasks via a scheduled Dash background callback, asynchronously check statuses with a standard callback triggered by a timer in the frontend, updating frontend components as necessary to share status.