apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Create empty shuffle read task #412

Open Ted-Jiang opened 1 year ago

Ted-Jiang commented 1 year ago

Describe the bug: While implementing https://github.com/apache/arrow-ballista/pull/399, I ran TPC-H query 1 at scale factor 1 (1 GB) with 1 scheduler + 1 executor and got this log:

2022-10-20T11:03:47.533160Z  INFO tokio-runtime-worker ThreadId(12) ballista_executor::metrics: === [bMy1dcS/1/0] Physical plan with metrics ===
ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16)), metrics=[output_rows=4, input_rows=4, write_time=2.060264ms, repart_time=319.891µs]
  AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))], metrics=[output_rows=4, elapsed_compute=27.081807417s, spill_count=0, spilled_bytes=0, mem_used=0]
    ProjectionExec: expr=[l_extendedprice@1 * 1 - l_discount@2 as lineitem.l_extendedprice * Float64(1) - lineitem.l_discountFloat64(1) - lineitem.l_discountlineitem.l_discountFloat64(1)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows=5916590, elapsed_compute=962.699969ms, spill_count=0, spilled_bytes=0, mem_used=0]
      CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5916590, elapsed_compute=13.815912316s, spill_count=0, spilled_bytes=0, mem_used=0]
        FilterExec: l_shipdate@6 <= 10471, metrics=[output_rows=5916590, elapsed_compute=1.208413079s, spill_count=0, spilled_bytes=0, mem_used=0]
          ParquetExec: limit=None, partitions=[Users/yangjiang/test-data/1g_tpch_pageIndex/lineitem/part-00000-7d2abab2-a301-4452-9f1d-c641e7f15af4-c000.snappy.parquet], predicate=l_shipdate_min@0 <= 10471, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], metrics=[output_rows=6001214, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned{filename=Users/yangjiang/test-data/1g_tpch_pageIndex/lineitem/part-00000-7d2abab2-a301-4452-9f1d-c641e7f15af4-c000.snappy.parquet}=0, num_predicate_creation_errors=0, predicate_evaluation_errors{filename=Users/yangjiang/test-data/1g_tpch_pageIndex/lineitem/part-00000-7d2abab2-a301-4452-9f1d-c641e7f15af4-c000.snappy.parquet}=0, bytes_scanned{filename=Users/yangjiang/test-data/1g_tpch_pageIndex/lineitem/part-00000-7d2abab2-a301-4452-9f1d-c641e7f15af4-c000.snappy.parquet}=56624557, time_elapsed_opening=3.161956ms, time_elapsed_scanning=53.84992ms, time_elapsed_processing=12.594492894s]

2022-10-20T11:03:47.533930Z  INFO tokio-runtime-worker ThreadId(12) ballista_executor::execution_loop: Done with task TID 0 bMy1dcS/1.0/0.0
2022-10-20T11:03:47.534107Z  INFO tokio-runtime-worker ThreadId(12) ballista_executor: Task 0 finished with operator_metrics array size 6
2022-10-20T11:03:47.691994Z  INFO tokio-runtime-worker ThreadId(03) ballista_executor::execution_loop: Received task TID 1 bMy1dcS/2.0/0.0
2022-10-20T11:03:47.693265Z  INFO tokio-runtime-worker ThreadId(12) ballista_core::execution_plans::shuffle_reader: ShuffleReaderExec::execute(TID 1 bMy1dcS/2.0/0.0)
2022-10-20T11:03:47.693403Z  INFO tokio-runtime-worker ThreadId(12) ballista_core::execution_plans::shuffle_reader: local shuffle file counts:0, remote shuffle file count:0.
2022-10-20T11:03:47.695086Z  INFO tokio-runtime-worker ThreadId(12) ballista_core::execution_plans::shuffle_writer: Executed partition 0 in 0 seconds. Statistics: numBatches=Some(1), numRows=Some(0), numBytes=Some(2032)

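Note this line: the shuffle read task for stage 2 (TID 1) was launched even though it found no shuffle files to read, neither local nor remote: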

2022-10-20T11:03:47.693403Z  INFO tokio-runtime-worker ThreadId(12) ballista_core::execution_plans::shuffle_reader: local shuffle file counts:0, remote shuffle file count:0.


Dandandan commented 1 year ago

Is the issue that we could skip tasks/partitions/stages that are empty, or is it a bug?
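One way to read the first option is to create shuffle read tasks only for reduce partitions that actually received rows. The sketch below is a minimal illustration under stated assumptions: `ShuffleFile` and `partitions_to_schedule` are hypothetical names, not Ballista's actual scheduler types or API, and each map-side output is assumed to report its target reduce partition and row count.

```rust
/// Hypothetical summary of one map-side shuffle output (not Ballista's real type).
#[derive(Debug)]
struct ShuffleFile {
    reduce_partition: usize, // assumed to be < num_reduce_partitions
    num_rows: u64,
}

/// Returns the reduce partitions that have data to read.
/// Partitions with no shuffle files, or only zero-row files, are skipped.
fn partitions_to_schedule(files: &[ShuffleFile], num_reduce_partitions: usize) -> Vec<usize> {
    let mut rows = vec![0u64; num_reduce_partitions];
    for f in files {
        rows[f.reduce_partition] += f.num_rows;
    }
    (0..num_reduce_partitions).filter(|&p| rows[p] > 0).collect()
}

fn main() {
    // Example: 16 reduce partitions, but only partitions 3 and 9 received rows.
    let files = vec![
        ShuffleFile { reduce_partition: 3, num_rows: 2 },
        ShuffleFile { reduce_partition: 9, num_rows: 2 },
    ];
    let to_run = partitions_to_schedule(&files, 16);
    assert_eq!(to_run, vec![3, 9]);
    println!("launch read tasks for partitions {:?}", to_run);
}
```

A real scheduler taking this route would also have to make sure downstream stages do not wait on output from the skipped partitions.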

Ted-Jiang commented 1 year ago

@Dandandan I don't think this is a bug. It is like the situation where the data is skewed and all of it is written to shuffle read partition 1. IMO we should still launch R executors. 😂
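Both the run in the log and the skewed case leave most read partitions empty. In the log, `ShuffleWriterExec` hash-partitions on (`l_returnflag`, `l_linestatus`) into 16 partitions but only writes 4 rows, so at most 4 of the 16 read partitions can hold data. The sketch below illustrates this. It uses Rust's `DefaultHasher` as a stand-in, which is an assumption: DataFusion/Ballista use their own hash function, so the exact partition indices will differ, but the counts are bounded the same way.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Counts how many of `n` hash partitions receive at least one row.
/// DefaultHasher is an illustrative stand-in for the engine's real hash function.
fn non_empty_partitions<K: Hash>(keys: &[K], n: usize) -> usize {
    let mut hit = vec![false; n];
    for k in keys {
        let mut h = DefaultHasher::new();
        k.hash(&mut h);
        hit[(h.finish() % n as u64) as usize] = true;
    }
    hit.iter().filter(|&&b| b).count()
}

fn main() {
    // The four (l_returnflag, l_linestatus) groups from the partial aggregate in TPC-H Q1.
    let q1 = [("A", "F"), ("N", "F"), ("N", "O"), ("R", "F")];
    let used = non_empty_partitions(&q1, 16);
    // 4 rows can occupy at most 4 of 16 partitions, so at least 12 stay empty.
    assert!(used >= 1 && used <= 4);

    // Fully skewed input: every row shares one key, so exactly one partition gets data.
    let skewed = vec![("N", "O"); 1_000];
    assert_eq!(non_empty_partitions(&skewed, 16), 1);

    println!("Q1: {} of 16 partitions non-empty; skewed: 1 of 16", used);
}
```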