apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Introduce TaskGroups to take advantage of shared memory between task slots #332

Open thinkharderdev opened 1 year ago

thinkharderdev commented 1 year ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently, the Ballista scheduler treats each task slot as effectively its own machine, but in practice an executor may have one task slot per CPU core. So in a cluster of 12 executor nodes with 12 cores each, the scheduler would treat them as 144 independent machines. This adds a lot of unnecessary inefficiency to some common queries.

To take a simple example, a query like SELECT * FROM table WHERE foo = 'bar' LIMIT 1000 over 144 partitions would be broken into a two-stage plan:

Stage 2:
GlobalLimitExec: fetch=1000
  ShuffleReaderExec

Stage 1:
ShuffleWriterExec
  LocalLimitExec
    FilterExec
      ParquetExec

Each stage 1 partition would read up to 1000 rows, and then we would schedule the second stage, which would stop after reading just one of the 144 output partitions from the previous stage. So we've effectively read 144k rows when we only need 1k.
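As a back-of-envelope check (a hypothetical helper for illustration, not Ballista code), the worst case is just the number of independently scheduled tasks times the fetch limit:

```rust
/// Worst-case rows scanned in stage 1 when each of `tasks` independently
/// scheduled tasks reads up to `fetch` rows before stopping.
fn worst_case_rows(tasks: usize, fetch: usize) -> usize {
    tasks * fetch
}

fn main() {
    // 144 task slots treated as independent machines, LIMIT 1000:
    assert_eq!(worst_case_rows(144, 1000), 144_000);
    // If the 12 slots on each executor shared a single limit, only
    // 12 coalesced tasks would each read up to 1000 rows:
    assert_eq!(worst_case_rows(12, 1000), 12_000);
}
```

So coalescing the per-executor slots would cut the worst-case read amplification here by a factor of 12.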

While it's not possible to eliminate this issue entirely in a distributed query, I think we can do better than the current approach.

Describe the solution you'd like

If the scheduler is able to schedule multiple partitions on the same node (e.g. multiple task slots on a single executor), then it can rewrite the plan to coalesce those tasks into a TaskGroup which can be executed more efficiently with existing constructs in DataFusion. For our example query above, if all 12 task slots are available on a given executor, then we can take those 12 tasks and schedule them as a single execution plan:

GlobalLimitExec: fetch=1000
  CoalescePartitionsExec
    LocalLimitExec: fetch=1000
      FilterExec
        ParquetExec

This can be wrapped in a TaskGroup data structure that (simplified) looks something like:

struct TaskGroup {
  partitions: Vec<usize>,
  plan: Arc<dyn ExecutionPlan>
}
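A sketch of how the scheduler might form these groups, assuming a hypothetical helper that simply chunks a stage's partitions by the number of free slots on each executor (the names here are illustrative, not the actual Ballista API, and the plan field is omitted):

```rust
/// Illustrative stand-in for the proposed TaskGroup; the real struct would
/// also carry the (rewritten) `Arc<dyn ExecutionPlan>`.
#[derive(Debug, PartialEq)]
struct TaskGroup {
    partitions: Vec<usize>,
}

/// Chunk a stage's partitions into one TaskGroup per executor, sized by how
/// many free task slots that executor currently has.
fn group_tasks(num_partitions: usize, free_slots_per_executor: &[usize]) -> Vec<TaskGroup> {
    let mut next = 0;
    let mut groups = Vec::new();
    for &slots in free_slots_per_executor {
        if next >= num_partitions || slots == 0 {
            continue;
        }
        let end = (next + slots).min(num_partitions);
        groups.push(TaskGroup { partitions: (next..end).collect() });
        next = end;
    }
    // Any partitions left over wait for slots to free up.
    groups
}

fn main() {
    // 24 partitions over two executors with 12 free slots each become two
    // task groups of 12 partitions instead of 24 single-partition tasks.
    let groups = group_tasks(24, &[12, 12]);
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[0].partitions, (0..12).collect::<Vec<_>>());
    assert_eq!(groups[1].partitions, (12..24).collect::<Vec<_>>());
}
```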

Then the stage 2 plan needs to be adjusted, since it will now read a smaller number of input partitions.

Alternatively we can generalize the existing data structures to handle this case.

Describe alternatives you've considered

We could leave things as is.


thinkharderdev commented 1 year ago

Started working on this, and in some ways it's more straightforward than I had thought (you can see a working, but hacky, version that avoids refactoring as much as possible on our fork at https://github.com/coralogix/arrow-ballista/pull/49). In that PR we sort of fake it by sending a task status update for all the partitions in the task group, with an empty set of shuffle locations for all but one task in the group.

But to do this properly we probably need to do a bit of refactoring. The big thing would be to decouple the concept of a Task from a single plan partition, so TaskDescription would change to something like:

#[derive(Clone)]
pub struct TaskPartitions {
    job_id: String,
    stage_id: usize,
    partitions: Vec<usize>
}

/// Represents the basic unit of work for the Ballista executor. Will execute
/// one or more partitions of one stage on one task slot.
#[derive(Clone)]
pub struct TaskDescription {
    pub session_id: String,
    pub partitions: TaskPartitions,
    pub stage_attempt_num: usize,
    pub task_id: usize,
    pub task_attempt: usize,
    pub plan: Arc<dyn ExecutionPlan>,
    pub output_partitioning: Option<Partitioning>,
}

And then we would use the task_id to connect everything together, relying on the (job_id, task_id) tuple as a unique identifier when referencing a running task (e.g. when cancelling running tasks). This should be adequate since task_id is guaranteed to be unique within a single job.
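For example, a running-task registry keyed on that tuple might look like this (a hypothetical sketch to show the identifier scheme, not the actual scheduler state):

```rust
use std::collections::HashMap;

/// Minimal stand-in for TaskDescription, for illustration only.
struct RunningTask {
    partitions: Vec<usize>,
}

/// Running tasks keyed by (job_id, task_id). Since task_id is unique within
/// a job, the pair uniquely identifies a task across all jobs.
struct RunningTasks {
    tasks: HashMap<(String, usize), RunningTask>,
}

impl RunningTasks {
    fn new() -> Self {
        Self { tasks: HashMap::new() }
    }

    fn launch(&mut self, job_id: &str, task_id: usize, partitions: Vec<usize>) {
        self.tasks
            .insert((job_id.to_string(), task_id), RunningTask { partitions });
    }

    /// Cancel a task; returns the partitions it covered, if it was running.
    fn cancel(&mut self, job_id: &str, task_id: usize) -> Option<Vec<usize>> {
        self.tasks
            .remove(&(job_id.to_string(), task_id))
            .map(|t| t.partitions)
    }
}

fn main() {
    let mut running = RunningTasks::new();
    running.launch("job-1", 0, vec![0, 1, 2]);
    assert_eq!(running.cancel("job-1", 0), Some(vec![0, 1, 2]));
    assert_eq!(running.cancel("job-1", 0), None); // already cancelled
}
```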

The only thing from the current implementation that I believe will not fit is tracking task-level failures. In this model a Task is a transient entity: which partitions are included in any given task is determined by how many executor slots are available at the moment it is scheduled, and there is no guarantee that if a task fails, the same set of partitions will be grouped together in the next task scheduled. We could fake it by incrementing the task failure count for a partition any time a task that included that partition fails, but I'm not sure that adds any incremental value over just considering failures/retries at the stage level. So I would suggest that we only track failures at the stage level (as we already do) and no longer try to track individual partition attempts.
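Stage-level retry accounting could then be as simple as the following (an illustrative sketch; MAX_STAGE_ATTEMPTS and the counter type are assumptions for the example, not existing Ballista config):

```rust
use std::collections::HashMap;

/// Assumed retry budget for the sketch; the real limit would be configurable.
const MAX_STAGE_ATTEMPTS: usize = 4;

/// Per-stage failure counts for one job. Task identity is transient under the
/// TaskGroup model, so failures are tracked only at stage granularity.
struct StageFailures {
    counts: HashMap<usize, usize>,
}

impl StageFailures {
    fn new() -> Self {
        Self { counts: HashMap::new() }
    }

    /// Record a failed task belonging to `stage_id`; returns true if the
    /// stage is still within its retry budget.
    fn record_failure(&mut self, stage_id: usize) -> bool {
        let count = self.counts.entry(stage_id).or_insert(0);
        *count += 1;
        *count < MAX_STAGE_ATTEMPTS
    }
}

fn main() {
    let mut failures = StageFailures::new();
    assert!(failures.record_failure(1)); // attempt 1: retry
    assert!(failures.record_failure(1)); // attempt 2: retry
    assert!(failures.record_failure(1)); // attempt 3: retry
    assert!(!failures.record_failure(1)); // budget exhausted: fail the job
}
```

Note this deliberately drops per-partition attempt tracking, matching the suggestion above to consider failures only at the stage level.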

@yahoNanJing @Dandandan @andygrove I plan on working on this next week so let me know if this approach seems unreasonable :)

Edit: This would also remove the need for the LaunchMultiTask RPC, since a normal task would essentially have the same structure.