apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0
829 stars 164 forks source link

1TB TPCDS benchmark over Vanilla Spark, suffer performance slowdown #588

Closed DamonZhao-sfu closed 1 month ago

DamonZhao-sfu commented 5 months ago

What is the problem the feature request solves?

I'm running the 1TB TPCDS benchmark over Comet and Vanilla Spark. I'm running on a 48Core 186G RAM machine Here's my config:

/localhdd/hza214/spark-3.4/spark-3.4.2-bin-hadoop3/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.comet.batchSize=8192 \
    --conf spark.sql.autoBroadcastJoinThreshold=-1 \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.explainFallback.enabled=true\
    --conf spark.comet.parquet.io.enabled=false \
    --conf spark.comet.batchSize=8192 \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=50g \
    --conf spark.shuffle.file.buffer=128k\
    --conf spark.local.dir=/mnt/smartssd_0n/hza214/sparktmp \
    --num-executors 48 \
    --executor-cores 48 \
    --driver-memory 20g \
    --executor-memory 140g \

image

Has the open-source community run TPCDS benchmark before on Comet? I think the project is still in an relatively early stage, so many native operators are not supported yet. So does it meet the expectation?

Describe the potential solution

No response

Additional context

No response

viirya commented 5 months ago

Please follow the benchmark guide https://datafusion.apache.org/comet/contributor-guide/benchmarking.html to set up proper Comet configs. With your configs, most native operators wouldn't be enabled.

viirya commented 5 months ago

@andygrove created a repo https://github.com/apache/datafusion-benchmarks including scripts used to benchmark Comet. You can also follow the steps.

andygrove commented 5 months ago

Hi @DamonZhao-sfu we are working on some items in the 0.1.0 milestone that will likely help, particularly https://github.com/apache/datafusion-comet/pull/591 and https://github.com/apache/datafusion-comet/issues/387

I am not personally planning on spending much time on TPC-DS until we have some of these issues resolved.

DamonZhao-sfu commented 5 months ago

Hi @DamonZhao-sfu we are working on some items in the 0.1.0 milestone that will likely help, particularly #591 and #387

I am not personally planning on spending much time on TPC-DS until we have some of these issues resolved.

Thank you for your reply. I will try when these features are merged. When I follow https://datafusion.apache.org/comet/contributor-guide/benchmarking.html the benchmark guide and set the same configuration, I still discover many aggr/join operators are not supported natively, and they run much slower than vanilla spark. I'm currently writing a paper on benchmarking different spark native engine, it seems that the comet community is currently focusing on TPCH optimization? Will more native operator in TPCDS be supported in the future?

andygrove commented 5 months ago

Thank you for your reply. I will try when these features are merged. When I follow https://datafusion.apache.org/comet/contributor-guide/benchmarking.html the benchmark guide and set the same configuration, I still discover many aggr/join operators are not supported natively, and they run much slower than vanilla spark. I'm currently writing a paper on benchmarking different spark native engine, it seems that the comet community is currently focusing on TPCH optimization? Will more native operator in TPCDS be supported in the future?

Yes, we are aiming for full TPC-DS support. We are just starting with TPC-H because it is easier for contributors to get up and running with that benchmark and is good enough to highlight some current limitations.

andygrove commented 4 months ago

Hi @DamonZhao-sfu. For query 72, are you enabling CBO in Spark or using any form of join reordering or are you using the official version of the query that joins catalog_sales to inventory first? I am asking because your times for q72 (in both Spark and Comet) are faster than I am seeing locally.

andygrove commented 4 months ago

@DamonZhao-sfu could you also provide the configs you used for the Spark run? I am seeing most queries running faster with Comet (but at 100GB) and would like to try and reproduce your results.

tpch_queries_speedup

DamonZhao-sfu commented 4 months ago

Hi @DamonZhao-sfu. For query 72, are you enabling CBO in Spark or using any form of join reordering or are you using the official version of the query that joins catalog_sales to inventory first? I am asking because your times for q72 (in both Spark and Comet) are faster than I am seeing locally.

No, i did not enable CBO and join reorder. I'm using the official version. here's my sql:

select  i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join item on (i_item_sk = cs_item_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity 
  and d3.d_date > d1.d_date + 5
  and hd_buy_potential = '1001-5000'
  and d1.d_year = 2001
  and cd_marital_status = 'M'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
 LIMIT 100 ;
DamonZhao-sfu commented 4 months ago

@DamonZhao-sfu could you also provide the configs you used for the Spark run? I am seeing most queries running faster with Comet (but at 100GB) and would like to try and reproduce your results.

tpch_queries_speedup

here's my config:

export COMET_JAR=/localhdd/hza214/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
export SPARK_LOCAL_DIRS=/mnt/smartssd_0n/hza214/sparktmp
INFLUXDB_ENDPOINT=`hostname`
cat tpcds_parquet.scala | /localhdd/hza214/spark-3.4/spark-3.4.2-bin-hadoop3/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.comet.xxhash64.enabled=true\
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.comet.batchSize=8192 \
    --conf spark.sql.autoBroadcastJoinThreshold=-1\
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.parquet.io.enabled=false \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.explainFallback.enabled=true\
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.sql.adaptive.coalescePartitions.enabled=false\
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager\
    --conf spark.comet.exec.shuffle.enabled=true\
    --conf spark.comet.exec.shuffle.mode=native\
    --conf spark.memory.offHeap.size=50g \
    --conf spark.shuffle.file.buffer=128k\
    --conf spark.local.dir=/mnt/smartssd_0n/hza214/sparktmp \
    --executor-cores 48 \
    --driver-memory 10g \
    --executor-memory 140g \

But later I was advised by others that I should set a lower executor core to let more executors running in parallel in one node. I'm using 4 node clusters each with 48 core, 196GB memory and ssd as localfile disk. I have not tested with 100GB sizes yet. Let me reproduce it. Also, would you like to share your spark and server configs? @andygrove

andygrove commented 4 months ago

Thanks @DamonZhao-sfu. We just updated our benchmarking guide with the currently recommended configs for the latest Comet code. Could you build with latest code and try with these settings?

Here are the results we are seeing for TPC-DS. They are not very impressive yet, but we do see a small speedup with Comet.

https://datafusion.apache.org/comet/contributor-guide/benchmarking.html#tpc-ds

DamonZhao-sfu commented 4 months ago

Hi @DamonZhao-sfu. For query 72, are you enabling CBO in Spark or using any form of join reordering or are you using the official version of the query that joins catalog_sales to inventory first? I am asking because your times for q72 (in both Spark and Comet) are faster than I am seeing locally.

No, i did not enable CBO and join reorder. I'm using the official version. here's my sql:

select  i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join item on (i_item_sk = cs_item_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity 
  and d3.d_date > d1.d_date + 5
  and hd_buy_potential = '1001-5000'
  and d1.d_year = 2001
  and cd_marital_status = 'M'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
 LIMIT 100 ;

@andygrove could you provide the optimized join order version of q72? Thanks!

andygrove commented 3 months ago

@DamonZhao-sfu We just released Comet 0.2.0 which provides some performance improvements for TPC-DS. We are now starting to test with 1TB data set as well.

https://datafusion.apache.org/blog/2024/08/28/datafusion-comet-0.2.0/

andygrove commented 1 month ago

This issue is from before the 0.1.0 release and there have been some improvements to TPC-DS since then. We have an epic for improving TPC-DS so I think we can close this issue now.

https://github.com/apache/datafusion-comet/issues/858