apache / kyuubi

Apache Kyuubi is a distributed and multi-tenant gateway to provide serverless SQL on data warehouses and lakehouses.
https://kyuubi.apache.org/
Apache License 2.0

[Bug] KyuubiSparkSQLExtension will cause shuffle write data size expand too much #5236

Open lordk911 opened 1 year ago

lordk911 commented 1 year ago

Describe the bug

1. Kyuubi 1.7.1, kyuubi-extension-spark-3-3_2.12-1.7.1.jar (downloaded from Maven), Spark 3.3.3.
2. An insert into ... select ... statement.
3. If I use AQE and KyuubiSparkSQLExtension, the Exchange node is RoundRobinPartitioning, and the shuffle write data size expands so much that I finally canceled the query (see the EXPLAIN sketch after this list):

[image: query plan with RoundRobinPartitioning Exchange]

4. If I do not use KyuubiSparkSQLExtension:

[image: query plan without KyuubiSparkSQLExtension]

5. Am I using KyuubiSparkSQLExtension the wrong way?
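A quick way to see where the extra shuffle comes from is to print the physical plan without executing the statement. A minimal sketch, assuming hypothetical placeholder tables (not from this report); with the extension's repartition-before-write rule active, an Exchange with RoundRobinPartitioning should appear directly above InsertIntoHadoopFsRelationCommand:

-- placeholder tables for illustration only
CREATE TABLE IF NOT EXISTS demo_src (id BIGINT) USING parquet;
CREATE TABLE IF NOT EXISTS demo_dst (id BIGINT) USING parquet;

-- print the physical plan; look for Exchange RoundRobinPartitioning
-- directly above the write command
EXPLAIN FORMATTED
INSERT INTO demo_dst SELECT id FROM demo_src;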

Affects Version(s)

1.7.1

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

## AQE (Adaptive Query Execution)
spark.sql.adaptive.enabled true
spark.sql.adaptive.forceApply false
spark.sql.adaptive.logLevel  info
spark.sql.adaptive.advisoryPartitionSizeInBytes 256m
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.minPartitionNum 1
spark.sql.adaptive.coalescePartitions.initialPartitionNum 2000
spark.sql.adaptive.fetchShuffleBlocksInBatch true
spark.sql.adaptive.localShuffleReader.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 400MB
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.2
spark.sql.adaptive.autoBroadcastJoinThreshold 100MB
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 400MB
spark.sql.autoBroadcastJoinThreshold -1

#spark.sql.extensions            org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.kyuubi.sql.KyuubiSparkSQLExtension
spark.sql.extensions            org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Additional context

No response

lordk911 commented 1 year ago

The target table is not a partitioned table, and the data source involves a multi-table join and a subquery.

pan3793 commented 1 year ago

@ulysses-you

ulysses-you commented 1 year ago

set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false
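A minimal sketch of the session-scoped form of this, using a throwaway table as a stand-in for the real workload (putting the same key in spark-defaults.conf instead applies it to every engine):

-- disable the extension's repartition-before-write rule for this session only
SET spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;

-- subsequent inserts in this session no longer get the injected rebalance shuffle
CREATE TABLE IF NOT EXISTS demo_write_test (id BIGINT) USING parquet;
INSERT INTO demo_write_test SELECT id FROM RANGE(10);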

lordk911 commented 1 year ago

set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false

I think this is the same as not configuring KyuubiSparkSQLExtension in spark-defaults.conf at all. The core purpose of the rule is to transparently merge small files; however, KyuubiSparkSQLExtension may not be suitable for certain scenarios.
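If the goal is only to merge small files for selected statements, one possible middle ground (not discussed in this thread) is Spark's built-in REBALANCE hint, available since Spark 3.2: it requests the same kind of AQE-coalesced shuffle, but only for the queries that ask for it. A minimal sketch with a placeholder table:

-- opt in to rebalancing for this single write, instead of having the
-- extension inject a shuffle before every insert
CREATE TABLE IF NOT EXISTS demo_hint_test (id BIGINT) USING parquet;
INSERT INTO demo_hint_test SELECT /*+ REBALANCE */ id FROM RANGE(100);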

ulysses-you commented 1 year ago

@lordk911 how about adding set spark.sql.optimizer.inferRebalanceAndSortOrders.enabled=true;
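As I understand this config (an interpretation, not stated in the thread), it lets the extension infer rebalance columns from the query, for example join or aggregation keys, rather than falling back to a round-robin shuffle. The explicit-hint equivalent would look roughly like the following, with placeholder names:

-- column-based rebalance: rows with the same s_date land in the same output
-- partitions, which typically compresses much better than round-robin
CREATE TABLE IF NOT EXISTS demo_infer_test (s_date STRING, total_sales BIGINT) USING parquet;
INSERT INTO demo_infer_test
SELECT /*+ REBALANCE(s_date) */ s_date, total_sales
FROM (SELECT CAST(current_date() AS STRING) AS s_date, id AS total_sales FROM RANGE(100));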

ulysses-you commented 1 year ago

set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false

Have you reverted this config?

lordk911 commented 1 year ago

@ulysses-you

1. I've changed spark-defaults.conf to:
spark.sql.extensions org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.kyuubi.sql.KyuubiSparkSQLExtension
2. Then I connected to Kyuubi:
2.1) set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;
2.2) Executed the test SQL (insert into ... select ...).
2.3) Before the InsertIntoHadoopFsRelationCommand there is an Exchange node with RoundRobinPartitioning:
[image: query plan with RoundRobinPartitioning Exchange]
2.4) About 20 minutes later I canceled the query, because the shuffle write data size became larger than when I used Spark 3.2.3 with KyuubiSparkSQLExtension.
2.5) set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;
2.6) Executed the test SQL (insert into ... select ...) again.
2.7) The SQL finished with the same output data size and file number as direct use of Spark without KyuubiSparkSQLExtension.
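One small check worth adding between steps 2.1 and 2.2 (a suggestion, not from the thread): SET with a key and no value prints the current effective setting, which confirms the session-level change actually took hold before re-running the test.

-- prints the current value of the key in this session
SET spark.sql.optimizer.insertRepartitionBeforeWrite.enabled;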

ulysses-you commented 1 year ago

@lordk911 how about adding set spark.sql.optimizer.inferRebalanceAndSortOrders.enabled=true;

Have you tried this?

lordk911 commented 1 year ago

[image: result after enabling inferRebalanceAndSortOrders]

I've tried this before; it didn't help.

lordk911 commented 1 year ago

CREATE TABLE test.items
USING parquet
AS
SELECT id AS i_item_id,
CAST(rand() * 1000 AS INT) AS i_price
FROM RANGE(30000000);

CREATE TABLE test.sales
USING parquet
AS
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id,
CAST(rand() * 100 AS INT) AS s_quantity,
DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date
FROM RANGE(1000000000);

create table IF NOT EXISTS test.aqe_kyuubi_extendtion(
  s_date   string,
  total_sales long
) STORED AS parquet;

set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

truncate table test.aqe_kyuubi_extendtion;

insert into test.aqe_kyuubi_extendtion 
SELECT s_date, s_quantity * i_price AS total_sales
FROM test.sales
JOIN test.items ON s_item_id = i_item_id;

[image: result with insertRepartitionBeforeWrite=true]

set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;

truncate table test.aqe_kyuubi_extendtion;

insert into test.aqe_kyuubi_extendtion 
SELECT s_date, s_quantity * i_price AS total_sales
FROM test.sales
JOIN test.items ON s_item_id = i_item_id;

[image: result with insertRepartitionBeforeWrite=false]

Will this help? @ulysses-you I also tested with set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true; combined with set spark.sql.optimizer.inferRebalanceAndSortOrders.enabled=true; and got the same result as with just insertRepartitionBeforeWrite.enabled=true.
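For anyone reproducing the comparison, one way to measure the two runs (a sketch, not from the thread) is to look up the table location and inspect the files directly:

-- the "Location" row of the output gives the table's storage path
DESCRIBE FORMATTED test.aqe_kyuubi_extendtion;
-- then, from a shell, for example:
--   hdfs dfs -du -s -h <location>      (total size)
--   hdfs dfs -ls <location> | wc -l    (rough file count)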

ulysses-you commented 1 year ago

@lordk911 I'm not sure what your data distribution looks like. Maybe you can try Z-order.
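For reference, the Kyuubi Spark SQL extension exposes Z-order through an OPTIMIZE statement; a minimal sketch based on a reading of the Kyuubi docs, with illustrative column choices:

-- rewrites the table's data clustered by the given columns, so related
-- rows end up in the same files and compress and skip better
OPTIMIZE test.aqe_kyuubi_extendtion ZORDER BY s_date, total_sales;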

lordk911 commented 1 year ago

I think I've given the example data above. When using KyuubiSparkSQLExtension, the number of output files is 69, with a total size of 2.7 GB; without KyuubiSparkSQLExtension, the number of output files is 49, with a total size of 2.5 GB.