apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

[Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista #1805

Open mingmwang opened 2 years ago

mingmwang commented 2 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

A new feature enhancement

Describe the solution you'd like

Ballista’s current shuffle implementation is very similar to the one in early versions of Spark. It is a hash-based shuffle in which shuffle data is materialized to disk and each map task produces a separate file for each reduce task. A shuffle operation that involves M map tasks and N reduce tasks therefore generates M * N files. Too many tiny files cause performance, memory, and scalability issues. Later Spark versions introduced a sort-based shuffle, which became the default shuffle implementation. The sort-based shuffle does not generate M * N files. Instead, each map task sorts its records by partition id + key and generates a pair of files: all records are consolidated in one data file, and an index file records the data range of each partition. In Spark 2.0, the hash-based shuffle code was removed. Spark also introduced external shuffle services to serve materialized intermediate shuffle data in order to achieve better fault tolerance and performance isolation. The recent Spark 3.2 release introduced a push-based shuffle (SPARK-30602) to further improve shuffle stability and IO performance. With Spark’s push-based shuffle, shuffle is performed at the end of the map tasks, and shuffle blocks are pre-merged and pushed to selected reducer nodes or uploaded to Spark external shuffle servers.
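To make the file-count difference concrete, here is a minimal sketch in Rust. The function names (`hash_shuffle_files`, `sort_shuffle_files`, `build_index`) are hypothetical and are not part of Spark or Ballista. The index layout (cumulative byte offsets, one more entry than there are partitions) is an assumption chosen for illustration, not a description of Spark's actual on-disk format.

```rust
// Illustrative only: these helpers are not part of Spark or Ballista.

/// Files written by a hash-based shuffle:
/// one file per (map task, reduce task) pair.
fn hash_shuffle_files(map_tasks: u64, reduce_tasks: u64) -> u64 {
    map_tasks * reduce_tasks
}

/// Files written by a sort-based shuffle:
/// one data file plus one index file per map task.
fn sort_shuffle_files(map_tasks: u64) -> u64 {
    2 * map_tasks
}

/// Build the index for one map task's consolidated data file.
/// `partition_sizes[i]` is the number of bytes written for reduce partition `i`.
/// The result holds `n + 1` cumulative offsets, so partition `i` occupies
/// bytes `offsets[i]..offsets[i + 1]` of the data file (assumed layout).
fn build_index(partition_sizes: &[u64]) -> Vec<u64> {
    let mut offsets = Vec::with_capacity(partition_sizes.len() + 1);
    let mut pos = 0u64;
    offsets.push(pos);
    for size in partition_sizes {
        pos += size;
        offsets.push(pos);
    }
    offsets
}

fn main() {
    let (m, n) = (1_000, 200);
    println!("hash-based shuffle: {} files", hash_shuffle_files(m, n));
    println!("sort-based shuffle: {} files", sort_shuffle_files(m));
    println!("index offsets: {:?}", build_index(&[10, 0, 25]));
}
```

With 1,000 map tasks and 200 reduce tasks, the hash-based layout needs 200,000 files while the sort-based layout needs 2,000, and a reduce task can locate its slice of each data file with two index lookups.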

Other distributed compute engines such as Flink and Presto also support the shuffle operation, but they do not materialize the shuffle data to disk. Instead, shuffle data is streamed into an in-memory buffer, and the reduce tasks poll the map tasks’ in-memory buffers for shuffle data to minimize end-to-end latency.

Here, we propose a new streaming-style push-based shuffle for Ballista, in which shuffle is performed at the end of the map tasks. Instead of materializing the intermediate shuffle data to disk and generating M * N files, map tasks push shuffle data directly to the reduce tasks via Arrow Flight gRPC calls to achieve very low latency. This is important for low-latency queries. The corresponding stage scheduling will be enhanced to support all-at-once scheduling. With all-at-once scheduling, all the stages of a SQL query/job are scheduled at almost the same time. The distributed DAG of the query is fixed at the beginning, so the map tasks can stream shuffle data to the downstream reduce tasks.
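The idea can be sketched with plain threads and channels. This is only an illustration under stated assumptions: `std::sync::mpsc` channels stand in for the Arrow Flight gRPC streams, strings stand in for record batches, and the names `partition_for` and `run_push_shuffle` are hypothetical rather than Ballista APIs. The reduce tasks are started before any map output exists, which mirrors all-at-once scheduling.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Pick the reduce partition for a key by hashing it.
/// `num_reducers` must be greater than zero.
fn partition_for(key: &str, num_reducers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_reducers as u64) as usize
}

/// Run one map task per input and `num_reducers` reduce tasks concurrently.
/// Returns the number of records each reduce task received.
fn run_push_shuffle(inputs: Vec<Vec<String>>, num_reducers: usize) -> Vec<usize> {
    // All-at-once scheduling: every reduce task is running before
    // any map task has produced output.
    let mut senders: Vec<Sender<String>> = Vec::new();
    let mut reducers = Vec::new();
    for _ in 0..num_reducers {
        let (tx, rx) = channel::<String>();
        senders.push(tx);
        // A reduce task consumes records as they arrive; here it only counts them.
        reducers.push(thread::spawn(move || rx.iter().count()));
    }

    // Each map task pushes every record straight to the owning reduce task.
    // Nothing is written to disk, so no M * N files are created.
    let mappers: Vec<_> = inputs
        .into_iter()
        .map(|records| {
            let outputs = senders.clone();
            thread::spawn(move || {
                for record in records {
                    let p = partition_for(&record, outputs.len());
                    outputs[p].send(record).expect("reduce task hung up");
                }
            })
        })
        .collect();

    // Drop the original senders so each reducer's stream ends
    // once all map tasks have finished.
    drop(senders);
    for mapper in mappers {
        mapper.join().unwrap();
    }
    reducers.into_iter().map(|r| r.join().unwrap()).collect()
}

fn main() {
    let inputs = vec![
        vec!["a".to_string(), "b".to_string()],
        vec!["c".to_string(), "a".to_string()],
    ];
    let counts = run_push_shuffle(inputs, 3);
    println!("records per reduce task: {:?}", counts);
}
```

The sketch ignores back-pressure, retries, and failure recovery, which a real implementation would need to address because the pushed data is never persisted.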

I will draft a detailed design doc to cover the proposed API changes later.


mingmwang commented 2 years ago

I have added a design doc for further discussion.

https://docs.google.com/document/d/17J9H6gGBVktmRAFYNQu-v52QUUPlghRnVLIZC3mFYFY/edit?usp=sharing

xudong963 commented 2 years ago

Hi @mingmwang, nice work!

If you are willing to, you can open an RFC ticket so that we can discuss it on GitHub. Once we have the final version, we can merge it and keep the RFC in our codebase. FYI, https://github.com/apache/arrow-datafusion/tree/master/docs/source/specification/rfcs

mingmwang commented 2 years ago

> Hi @mingmwang, nice work!
>
> If you are willing to, you can open an RFC ticket so that we can discuss it on GitHub. Once we have the final version, we can merge it and keep the RFC in our codebase. FYI, https://github.com/apache/arrow-datafusion/tree/master/docs/source/specification/rfcs

Can we discuss in the Google Doc or in this thread directly? Everyone can comment on the Google Doc. I will open a PR to cover all the related code changes so that everyone who is interested can take a look and give me advice.

xudong963 commented 2 years ago

> Can we discuss in the Google Doc or in this thread directly? Everyone can comment on the Google Doc.

Both are ok, just a suggestion.

> I will open a PR to cover all the related code changes so that everyone who is interested can take a look and give me advice.

👍

thinkharderdev commented 2 years ago

This sounds great!

houqp commented 2 years ago

The design looks good to me. Thanks for writing it up, @mingmwang. I left a minor question in the doc.

EricJoy2048 commented 2 years ago

It's great!

heroWang commented 11 months ago

Any progress on this new feature?

mingmwang commented 11 months ago

@heroWang I will probably start working on this next month. I have been busy with other things recently and have not had any bandwidth for DataFusion and Ballista.