Purpose

Currently, Spark sort compact sorts all of the filtered data globally and then writes it out with dynamic partition overwrite, which leads to the following problems:

- If no partition is specified, the data is sorted globally, which is unnecessary overhead.
- Because it is a full-range shuffle, the number of small files is uncontrollable, since every reducer may contain data for all partitions.

Therefore, this PR makes each single partition its own sort compact group and submits the groups to Spark as separate jobs. A new conf, ~~max_order_threads~~ max_concurrent_jobs, is introduced in the sort compact procedure to limit the maximum number of concurrent job submissions; the default is 15.
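The per-partition scheduling described above can be sketched as follows. This is a minimal, hypothetical Java sketch, not the PR's actual implementation: SortCompactScheduler, sortCompact, and the partition strings are stand-ins for the real Spark job submission, and maxConcurrentJobs mirrors the new max_concurrent_jobs conf by bounding the thread pool that submits jobs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SortCompactScheduler {

    // Submit one sort-compact job per partition, with at most
    // maxConcurrentJobs jobs in flight at once (the new conf's role).
    static List<String> runAll(List<String> partitions, int maxConcurrentJobs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentJobs);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String partition : partitions) {
                // Each partition is its own sort compact group; sortCompact(...)
                // is a placeholder for submitting one Spark job for it.
                futures.add(pool.submit(() -> sortCompact(partition)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // collect in submission order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Placeholder for: sort this partition's rows and rewrite its files.
    static String sortCompact(String partition) {
        return "compacted:" + partition;
    }

    public static void main(String[] args) throws Exception {
        List<String> partitions =
                List.of("dt=2024-01-01", "dt=2024-01-02", "dt=2024-01-03");
        // prints [compacted:dt=2024-01-01, compacted:dt=2024-01-02, compacted:dt=2024-01-03]
        System.out.println(runAll(partitions, 2));
    }
}
```

Because each job shuffles only one partition's data, the sort is no longer a full-range global shuffle, and the reducer count per job stays bounded.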
Tests
Tested on 1 TB TPC-DS: sort compacting web_sales on ws_bill_customer_sk went from 233 s before to 108 s after, and the target file size can be controlled by spark.sql.shuffle.partitions.
API and Format
Documentation