apache / superset

Apache Superset is a Data Visualization and Data Exploration Platform
https://superset.apache.org/
Apache License 2.0

[SIP-143] Global Async Task Framework #29839

Open villebro opened 1 month ago

villebro commented 1 month ago

[SIP-143] Proposal for Global Async Task Framework

Motivation

Note: This replaces [SIP-141] Global Async Queries 2.0 which aimed at completing [SIP-39] Global Async Query Support.

Proposed Change

Superset currently has varied and often opaque solutions for executing async operations (all of which require Celery):

Currently, none of the above support deduplication or cancellation, or even viewing which tasks are queued/executing. Especially troublesome is executing long-running queries synchronously in the web workers: this can render a web worker unresponsive if many long-running queries run simultaneously, which has led many orgs to extend their webserver timeouts so that long-running queries don't time out. Moving these to the async workers will both free up the web workers and make it possible to decrease web worker timeouts significantly, while supporting arbitrarily long query execution times.

In addition, beyond sharing Celery as the execution backend, there is limited sharing of code, like utils or ORM models, which has led to significant code duplication. This increases the risk of regressions, limits the reusability of good functionality, and adds a significant maintenance burden.

For this reason, this SIP recommends adding a new Global Async Task Framework (GATF), which will introduce the following:

New or Changed Public Interfaces

Model

A new ORM model will be introduced for async tasks with a string-based identifier. When a new task is created, an entry is added to the table if it's not already there; for instance, thumbnails would use the digest as the identifier, chart queries would use the cache key, and so on. If the entry is already there, we consider the task already locked and don't schedule a new one. The model will look as follows:

from sqlalchemy import Column, DateTime, Enum, Integer, String, Text

class AsyncTask(Base):  # Base is the project's declarative base
    __tablename__ = "async_tasks"

    id = Column(Integer, primary_key=True)
    task_id = Column(String(256), unique=True, nullable=False, index=True)
    task_type = Column(Enum(..., name="task_type"), nullable=False)
    task_name = Column(String(256), nullable=False)
    status = Column(Enum("PENDING", "IN_PROGRESS", "SUCCESS", "REVOKED", "FAILURE", name="task_status"), nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    ended_at = Column(DateTime, nullable=True)  # null until the task ends
    error = Column(String, nullable=True)
    state = Column(Text, nullable=True)  # JSON serializable
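For illustration, the deduplication described above could look roughly like the following sketch, where the unique constraint on task_id serves as the lock (try_schedule_task and the enqueueing call are hypothetical, not part of the proposal):

from datetime import datetime, timezone

from sqlalchemy.exc import IntegrityError

def try_schedule_task(task_id: str, task_type: str, task_name: str) -> bool:
    """Claim the task; return False if an identical task already exists."""
    now = datetime.now(timezone.utc)
    task = AsyncTask(
        task_id=task_id,  # e.g. a thumbnail digest or a chart cache key
        task_type=task_type,
        task_name=task_name,
        status="PENDING",
        created_at=now,
        updated_at=now,
    )
    db.session.add(task)
    try:
        # The unique constraint on task_id acts as the lock: a concurrent
        # insert of the same identifier fails, and we skip scheduling.
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        return False
    # Only enqueue the Celery task after acquiring the lock, e.g.
    # my_task.delay(task_id=task_id)
    return True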

As per SIP-43, we'll introduce at least a DAO for this (and possibly also a set of commands). To abstract the main GATF logic, a new decorator will be introduced that wraps the task in a thread which can be killed as needed (the final implementation will look different; this is just to convey the main logic):

import threading
import time
from functools import wraps
from typing import Any, Callable

TASK_SLEEP_INTERVAL_SEC = 5

def async_task(f: Callable[..., Any]):
    @wraps(f)
    def wrapper(*args, **kwargs):
        task_id = kwargs.get("task_id")
        if not task_id:
            raise ValueError("task_id is required for cancelable tasks")

        task = AsyncTask.query.filter_by(task_id=task_id).one_or_none()
        if task is None:
            raise Exception(f"Task not found: {task_id}")
        if task.status != TaskStatus.PENDING:
            raise Exception(f"Task {task_id} is already in progress, current status: {task.status}")
        task.status = TaskStatus.IN_PROGRESS
        db.session.commit()

        cancel_event = threading.Event()

        def monitor_status():
            # Poll the task row; if it disappears or is revoked, signal the
            # task body to stop via cancel_event.
            while not cancel_event.is_set():
                task = AsyncTask.query.filter_by(task_id=task_id).one_or_none()
                if task is None:
                    cancel_event.set()
                    break
                if task.status == TaskStatus.REVOKED:
                    cancel_event.set()
                    db.session.delete(task)
                    db.session.commit()
                    break
                time.sleep(TASK_SLEEP_INTERVAL_SEC)

        monitor_thread = threading.Thread(target=monitor_status)
        monitor_thread.start()

        try:
            f(*args, cancel_event=cancel_event, **kwargs)
        finally:
            # Remove the task row (unless the monitor already removed it on
            # revocation) and stop the monitor thread.
            task = AsyncTask.query.filter_by(task_id=task_id).one_or_none()
            if task is not None:
                db.session.delete(task)
                db.session.commit()
            cancel_event.set()
            monitor_thread.join()

    return wrapper

and when used, the task will just be decorated as follows:

@celery_app.task(name="my_task")
@async_task
def my_task(task_id: str, cancel_event: threading.Event) -> None:
   # add logic here that checks cancel_event periodically
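For illustration, a task body that honors cancel_event might look like this (a sketch; work_chunks and run_chunk are hypothetical helpers):

@celery_app.task(name="my_cancelable_task")
@async_task
def my_cancelable_task(task_id: str, cancel_event: threading.Event) -> None:
    for chunk in work_chunks():  # hypothetical: split the work into small units
        if cancel_event.is_set():
            return  # the task was revoked; stop cooperatively
        run_chunk(chunk)  # hypothetical: process one unit of work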

Notification method

We propose making WebSockets the sole mechanism for broadcasting task completion. This means that we will remove long polling support from async chart queries, and replace long polling in SQL Lab with WebSockets.
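As a rough sketch of the notification flow, the backend could push completion events onto a Redis stream that the WebSocket server relays to subscribed browser sessions (the stream name and payload shape below are assumptions, not part of the proposal):

import json

import redis

r = redis.Redis()

def emit_task_event(channel_id: str, task_id: str, status: str) -> None:
    # Push the event onto a Redis stream; the WebSocket server relays it
    # to the browser session subscribed to channel_id.
    event = {"channel_id": channel_id, "task_id": task_id, "status": status}
    r.xadd("async-events", {"data": json.dumps(event)})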

Frontend changes

Charts will display the current task status, and have a simple mechanism for cancelling queries if needed:

[image: mockup of a chart displaying task status with a cancel option]

New dependencies

None - however, going forward, the WebSocket server will be mandatory for both SQL Lab and async chart queries.

Migration Plan and Compatibility

Phase 1 - Thumbnails

As a first step, we migrate Thumbnails to GATF, as they tend to be fairly long-running tasks that currently lack deduplication. In addition, the main thumbnail rendering functionality is fairly straightforward and will not require extensive changes. Migrating Thumbnails will require implementing all the interfaces: the GATF ORM model, UI, and decorators/context managers. At the end of this phase, thumbnail rendering will be deduplicated, and Admins will be able to both see and cancel running Thumbnail tasks via the Async task list view.

Phase 2 - GAQ

In the second phase, we clean up GAQ by simplifying the architecture (see details about redundant form data cache keys etc. in SIP-141) and remove long polling support. We will also migrate the GAQ metadata from Redis to the new ORM model.

At the end of this phase, we will have removed long polling from GAQ, and will have both chart query deduplication and cancellation support.

Phase 3 - The rest

In the final phase, we migrate the remaining async tasks to GATF. This mainly covers Alerts & Reports, but also any tasks triggered via Celery Beat that implement the provided context managers/decorators, like cache warmup, log rotation etc. At the end of this phase, it will be possible to cancel any queued or running async task via the UI.

Rejected Alternatives

SIP-39 and SIP-141 were rejected in favor of making a more general purpose Async Task Framework.

villebro commented 1 month ago

Summary from meeting today (@mistercrunch, @michael-s-molina and myself in attendance):

zhaoyongjie commented 1 month ago

@villebro Thanks for documenting the SIP. I want to share a few thoughts here.

I don't think we should replace Celery as the task queue. The reasons are:

Celery is a task queue, whereas Dask is a parallel computing framework. I'm not familiar with Dask, but having had a look at its documentation, it looks like Spark implemented in Python, so it targets a different use case than a task queue.

Lack of active maintenance of Celery

Celery is actively maintained; the latest version was released on Apr 17, 2024.

Lack of advanced features that are available in Dask

Which advanced features would we want to use?

villebro commented 1 month ago

Thanks for the comments @zhaoyongjie. I think you raised a lot of good points; really appreciate the feedback 👍

@villebro Thanks for documenting the SIP. I want to share a few thoughts here.

I don't think we should replace Celery as the task queue. The reasons are:

Celery is a task queue, whereas Dask is a parallel computing framework. I'm not familiar with Dask, but having had a look at its documentation, it looks like Spark implemented in Python, so it targets a different use case than a task queue.

After digesting this a bit more last night, I'm also starting to lean back in the direction of Celery for now. While there may be use cases for a full DAG-style computing framework (e.g. caching chart data before triggering the dashboard, or something of that nature), I think the focus of the Dask project is slightly misaligned with what we're looking for. Regarding the other alternatives (Redis Queue, Dramatiq, Huey, APScheduler et al.), I don't think any of them offer a clear improvement over Celery. So I agree; I'm inclined to stay on Celery for now and postpone the architectural overhaul to a later date when there's a clear need for it.

Lack of active maintenance of Celery

Celery is actively maintained; the latest version was released on Apr 17, 2024.

I was going to say that Celery has had a long-standing bug for many years where the workers silently stop consuming tasks, and this has served as an indicator of stagnation to me, as it has caused significant issues for us and required ugly workarounds: https://github.com/celery/celery/discussions/7276. However, it seems this bug was finally fixed a few weeks ago in Celery 5.5.0b1 (!). See the Redis broker stability notes here: https://github.com/celery/celery/releases/tag/v5.5.0b1. So maybe things aren't as gloomy as I had thought, and things are slowly moving forward.

Lack of advanced features that are available in Dask

Which advanced features would we want to use?

I think one of the main features Celery lacks is proper task cancellation, which is critical for us (Dask has good mechanisms for this; see the sketch below). Also, Dask has some nice autoscaling features which make it possible to handle bursty workloads better than Celery. Finally, Dask is newer, so it incorporates many mechanisms that weren't as relevant at the time of Celery's inception. Granted, many of them may not be critical for us right now (e.g. graph/DAG support), but they could have use cases once available.
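For reference, a minimal sketch of cancellation with Dask's futures API (the scheduler address and run_query are hypothetical):

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # hypothetical scheduler address
future = client.submit(run_query, "SELECT ...")  # run asynchronously on the cluster
# ...later, e.g. when a user clicks a stop button:
future.cancel()  # request cancellation of the computation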

giftig commented 1 month ago

@villebro what do you mean by "proper" task cancellation here? Celery does support "revoking" tasks: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks

It's been a while since I've done a lot of work with Celery, but generally I've found it fairly reliable if properly configured.

I'm not familiar with Dask though, so can't comment on what advantages it might bring.

mistercrunch commented 1 month ago

"proper" task cancellation

I'm guessing it's about running a certain routine when that happens. Some databases don't cancel the query on client disconnection, and would require us to run a routine to kill the query. Something as simple as the STOP button in SQL Lab would have to hit an endpoint that would call celery.revoke(task_id, terminate=True), and rely on some exception handling in the task to run the cleanup.

Conversing with GPT it seems there are ways to configure and catch things:

[screenshot: ChatGPT suggestions for configuring Celery to catch task termination]
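One known pattern along those lines (a sketch of our own, with run_query/stop_query hypothetical; not necessarily what the screenshot showed): revoking with a custom signal raises SoftTimeLimitExceeded inside the task, giving it a chance to clean up.

from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(name="cancelable_query")
def cancelable_query(sql: str) -> None:
    try:
        run_query(sql)  # hypothetical long-running call
    except SoftTimeLimitExceeded:
        stop_query(sql)  # hypothetical cleanup, e.g. issue a KILL on the database
        raise

# Revoking with terminate=True and SIGUSR1 raises SoftTimeLimitExceeded
# inside the task instead of killing the worker process outright:
celery_app.control.revoke(task_id, terminate=True, signal="SIGUSR1")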

In my experience (mostly around Airflow's CeleryExecutor) it's common to end up with things not firing as expected: zombie tasks (Superset thinks the job is dead, but it's still running somewhere on a worker machine), undead tasks (Superset thinks the job is running, but nothing is running), forcing us to double-check the state. It seems like the framework is built for typical fire-and-forget web-scale workloads, as opposed to long-running tasks with good support for cancellation.

mistercrunch commented 1 month ago

Trying to formalize the Model (?):

class AsyncTask(Base):
    __tablename__ = 'async_tasks'

    id = Column(Integer, primary_key=True)
    task_id = Column(String(256), unique=True, nullable=False, index=True)
    task_type = Column(Enum(..., name='task_type'), nullable=False)
    task_name = Column(String(256), nullable=False)
    status = Column(Enum('PENDING', 'IN_PROGRESS', 'SUCCESS', 'REVOKED', 'FAILURE', name='task_status'), nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    ended_at = Column(DateTime, nullable=False)
    error = Column(String, nullable=True)
    extra_json = Column(Text, nullable=True)

mistercrunch commented 1 month ago

About this SIP, I wanted to highlight that there is a significant opportunity to formalize and eventually isolate the "Data Access Layer" (DAL) within Superset, exposing it as a set of APIs and related backend services. The idea would be to evolve it toward a robust, independent service that enables not only Superset but also other apps to access data, while offering mechanisms to authenticate, authorize and limit access (RLS), cache, and audit access to data within your organization.

A first step in that direction is to organize the code in a way that clarifies what is and isn't part of the DAL. Something to keep in mind while working on the async framework.

villebro commented 1 month ago

Trying to formalize the Model (?):

I think this looks generally OK. Maybe we could just rename extra_json to payload or state (JSON-serializable, of course), as I assume all tasks will have some state. Also, I find Enum works pretty badly on ORM tables, so we may just want to use IntEnum in the backend and an INT column on the ORM table (we would call it status_id on the table, and then have a status property on the SQLA model that maps the INT value to a proper Python Enum; see the sketch below). Finally, I think we should add a type field (I assume name will be some human-readable representation, like "Email report of 'My Dashboard'"), with the following Enum-type values (this list is probably non-exhaustive, but you get the idea):
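As a minimal sketch of the INT-backed status pattern mentioned above (all names hypothetical):

from enum import IntEnum

from sqlalchemy import Column, Integer

class TaskStatus(IntEnum):
    PENDING = 1
    IN_PROGRESS = 2
    SUCCESS = 3
    REVOKED = 4
    FAILURE = 5

class AsyncTask(Base):
    # ...other columns as in the model above...
    status_id = Column(Integer, nullable=False)

    @property
    def status(self) -> TaskStatus:
        # Map the raw INT value stored in the table to the Python Enum.
        return TaskStatus(self.status_id)

    @status.setter
    def status(self, value: TaskStatus) -> None:
        self.status_id = int(value)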

mistercrunch commented 1 month ago

Oh right, added task_type. Your feedback is all reasonable; feel free to edit my comment or to promote it to the main body of the issue. About enums: personally, I prefer a human-readable value in the database for things like status that are pretty static/finite, to avoid having to refer to the code or an FK when working directly with the database. I know it's less efficient and all, but that's for databases to figure out how to optimize/compress/dict-encode.

villebro commented 1 month ago

Sounds good - also, given that this table will likely be ephemeral, INT vs VARCHAR for the type shouldn't make any difference. I'll integrate the final proposal into the body 👍

villebro commented 1 month ago

@mistercrunch I've moved the ORM model to the body and made some general updates. Feel free to make updates if needed. I think this is approaching votability - WDYT @rusackas @michael-s-molina?

rusackas commented 1 month ago

I think this SIP is lookin' good! I think the SIP (now numbered 143) is most definitely ready for a [DISCUSS] thread, and likely ready for voting if there aren't any dealbreakers that turn up from that.

villebro commented 1 month ago

Notes from meeting this Monday (@rusackas, @michael-s-molina and myself in attendance):

villebro commented 1 month ago

I think this SIP is lookin' good! I think the SIP (now numbered 143) is most definitely ready for a [DISCUSS] thread, and likely ready for voting if there aren't any dealbreakers that turn up from that.

Thanks @rusackas - I'll do the final changes to the body of the SIP this week and trigger the discuss once those are in place!

villebro commented 3 weeks ago

FYI: DISCUSS sent to Dev list: https://lists.apache.org/thread/ytv9vx7wrk07xxjg4m4kx5rx0zl5whx1

villebro commented 2 weeks ago

FYI: VOTE sent to Dev list: https://lists.apache.org/thread/b39obsb2qmqbo8k9twj9b17pfb42c6qr