apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Document "how to read an explain plan" #12088

Closed alamb closed 2 months ago

alamb commented 2 months ago

Is your feature request related to a problem or challenge?

The EXPLAIN command is a key way to understand what DataFusion has done. However, the output format is not well documented, which makes it harder to understand what DataFusion is doing.

The syntax of EXPLAIN is documented here: https://datafusion.apache.org/user-guide/sql/explain.html

However, information on how to interpret the output is not available.

Describe the solution you'd like

I would like to document how to read an explain plan

Describe alternatives you've considered

I propose adding a new page in the user guide here:

[Screenshot 2024-08-20 at 3 22 43 PM]

InfluxData has internal documentation about how to read an explain plan that we would like to donate to the public docs (both to help others as well as to have help maintaining them)

Additional context

No response

alamb commented 2 months ago

To adapt this for the DataFusion documentation, I recommend using the clickbench dataset and updating the examples.

To get the clickbench dataset into datafusion/benchmarks/data/hits.parquet, run the following from the root of the datafusion repo:

cd benchmarks
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2/benchmarks$ ./bench.sh data clickbench_1
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: clickbench_1
DATA_DIR: /Users/andrewlamb/Software/datafusion2/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Checking hits.parquet...... found 14779976446 bytes ... Done

Then you can run datafusion-cli to get plans:

cd datafusion/benchmarks/data
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2/benchmarks/data$ datafusion-cli
DataFusion CLI v41.0.0
> select count(*) from 'hits.parquet';
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 0.062 seconds.

You can now run explain queries against this file, like this:

> explain select count(*) from 'hits.parquet';
+---------------+---------------------------------------------------------------+
| plan_type     | plan                                                          |
+---------------+---------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] |
|               |   TableScan: hits.parquet projection=[]                       |
| physical_plan | ProjectionExec: expr=[99997497 as count(*)]                   |
|               |   PlaceholderRowExec                                          |
|               |                                                               |
+---------------+---------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.053 seconds.

alamb commented 2 months ago

Here is a subset of our internal content (thanks @NGA-TRAN who originally wrote this)

Introduction

A SQL/InfluxQL query in InfluxDB 3.0 is executed based on a query plan. To see the plan without running the query, add the keyword EXPLAIN:

EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;

The output will look like this:

+---------------+--------------------------------------------------------------------------+
| plan_type     | plan                                                                     |
+---------------+--------------------------------------------------------------------------+
| logical_plan  | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST                 |
|               |   TableScan: h2o projection=[city, min_temp, time]                       |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC]             |
|               |   UnionExec                                                              |
|               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]                   |
|               |       ParquetExec: file_groups={...}, projection=[city, min_temp, time]  |
|               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]                   |
|               |       ParquetExec: file_groups={...}, projection=[city, min_temp, time]  |
|               |                                                                          |
+---------------+--------------------------------------------------------------------------+

Figure 1: A simplified output of an explain

There are two major plans: the logical plan and the physical plan.

Understanding a query plan can help explain why your query is slow. For example, when the plan shows that your query reads many files, it signals that you should either add more filters to the query to read less data or modify your cluster configuration/design to produce fewer but larger files. This document focuses on how to read a query plan. How to make a query run faster depends on the reason it is slow and is beyond the scope of this document.

How to read a query plan

Query plan is a tree

A query plan is an upside-down tree, and we always read it from the bottom up. The physical plan in Figure 1, in tree format, looks like this:

                   ┌─────────────────────────┐                  
                   │ SortPreservingMergeExec │                  
                   └─────────────────────────┘                  
                                ▲                               
                                │                               
                   ┌─────────────────────────┐                  
                   │        UnionExec        │                  
                   └─────────────────────────┘                  
                                ▲                               
             ┌──────────────────┴─────────────────┐             
             │                                    │             
┌─────────────────────────┐          ┌─────────────────────────┐
│        SortExec         │          │        SortExec         │
└─────────────────────────┘          └─────────────────────────┘
             ▲                                    ▲             
             │                                    │             
             │                                    │             
┌─────────────────────────┐          ┌─────────────────────────┐
│       ParquetExec       │          │       ParquetExec       │
└─────────────────────────┘          └─────────────────────────┘

Figure 2: The tree structure of the physical plan in Figure 1

The name of each node in the tree/plan ends with Exec. A node is sometimes also called an operator or ExecutionPlan; it is where data is processed, transformed and sent up.

First, data in the parquet files is read in parallel through the two ParquetExec operators, each of which outputs a stream of data to its corresponding SortExec. Each SortExec is responsible for sorting the data by city ascending and time descending. The sorted outputs from the two SortExec operators are then unioned by the UnionExec, whose output is then (sort) merged by the SortPreservingMergeExec to return the sorted data.
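The tree in Figure 2 can be recovered mechanically from the indentation of the text plan in Figure 1. Below is a minimal Python sketch, assuming two spaces of indentation per level as in the output above. The `PlanNode` class and `parse_plan` function are hypothetical helpers written for this illustration, not part of DataFusion.

```python
from dataclasses import dataclass, field


@dataclass
class PlanNode:
    name: str                      # operator name, e.g. "SortExec"
    detail: str                    # text after the first ":" (may be empty)
    children: list = field(default_factory=list)


def parse_plan(text: str) -> PlanNode:
    """Build a tree from an indented plan (assumes two spaces per level)."""
    root = None
    stack = []  # stack[d] is the most recent node seen at depth d
    for line in text.splitlines():
        if not line.strip():
            continue
        depth = (len(line) - len(line.lstrip(" "))) // 2
        name, _, detail = line.strip().partition(":")
        node = PlanNode(name.strip(), detail.strip())
        del stack[depth:]          # keep only the ancestors of this line
        if stack:
            stack[-1].children.append(node)
        else:
            root = node
        stack.append(node)
    return root


# Physical plan from Figure 1, with the table borders removed.
PLAN = """\
SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC]
  UnionExec
    SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
    SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
"""

tree = parse_plan(PLAN)
union = tree.children[0]
print(tree.name, "<-", union.name, "<-", [c.name for c in union.children])
```

The more a line is indented, the closer it is to the data source: the two ParquetExec lines are the leaves, and the unindented first line is the root that returns the final result.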

How to understand a large query plan

A large query plan may look intimidating but if you follow these steps, you can quickly understand what the plan does.

  1. As always, read from the bottom up, one operator at a time.
  2. Understand the job of each operator, which can mostly be found in the DataFusion Physical Plan documentation.
  3. Determine what the input data of the operator looks like and how large or small it may be.
  4. Determine how much data the operator may send out and what it would look like.

If you can answer those questions, you will be able to estimate how much work that plan has to do. However, the explain just shows you the plan without executing it. If you want to know exactly how long a plan and each of its operators take, you need other tools.
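As a rough illustration of the "read from bottom up, one operator at a time" step, the following Python sketch lists the operators of a text plan in an order where every input appears before the operator that consumes it. The `bottom_up` function is a hypothetical helper for this example, and it assumes two spaces of indentation per level.

```python
def bottom_up(plan_text: str) -> list:
    """Return operator names so that each operator's inputs come before it."""
    rows = []
    for line in plan_text.splitlines():
        if line.strip():
            depth = (len(line) - len(line.lstrip(" "))) // 2
            rows.append((depth, line.strip().split(":")[0]))

    order = []
    pending = []  # operators whose inputs are still being listed
    for depth, name in rows:
        # A line at depth d closes every pending operator at depth >= d.
        while pending and pending[-1][0] >= depth:
            order.append(pending.pop()[1])
        pending.append((depth, name))
    while pending:
        order.append(pending.pop()[1])
    return order


# Physical plan from Figure 1, with the table borders removed.
PLAN = """\
SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC]
  UnionExec
    SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
    SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
"""

for step, name in enumerate(bottom_up(PLAN), start=1):
    print(step, name)
```

For each name in that order, steps 2 to 4 above still have to be answered by a person; the sketch only fixes the reading order.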

Tools that show the exact runtime for each operator

(INTERNAL STUFF FROM IOX)

  1. Run explain analyze to get the explain with runtime added

More information for debugging

If the plan has to read too many files, not all of them will be shown in the explain. To see them, use explain verbose. Like explain, explain verbose does not run the query and thus you won't get runtime. What you get is all the information that is cut off from the explain and all intermediate physical plans that the IOx querier and DataFusion generate before returning the final physical plan. This is very helpful for debugging, to see when an operator is added to or removed from a plan.

Example of typical plan of leading edge data

Let us delve into an example that covers typical operators as well as IOx-specific ones on leading edge data.

Data Organization

(EXPLAIN clickbench file here)

Query and query plan

TODO change this example

EXPLAIN SELECT city, count(1) FROM   h2o WHERE  time >= to_timestamp(200) AND time < to_timestamp(700) AND state = 'MA' GROUP BY city ORDER BY city ASC;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: h2o.city ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |   Aggregate: groupBy=[[h2o.city]], aggr=[[COUNT(Int64(1))]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |     TableScan: h2o projection=[city], full_filters=[h2o.time >= TimestampNanosecond(200, None), h2o.time < TimestampNanosecond(700, None), h2o.state = Dictionary(Int32, Utf8("MA"))]                                                                                                                                                                                                                                                                                                                                                                                                                   |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   SortExec: expr=[city@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |     AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |         RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |           AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |             RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |               UnionExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |                 ProjectionExec: expr=[city@0 as city]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |                     FilterExec: time@2 >= 200 AND time@2 < 700 AND state@1 = MA                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |                       ParquetExec: file_groups={2 groups: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/243db601-f3f1-401b-afda-82160d8cc1a8.parquet], [1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/f5fb7c7d-16ac-49ba-a811-69578d05843f.parquet]]}, projection=[city, state, time], output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3                                           |
|               |                 ProjectionExec: expr=[city@1 as city]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                   DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |                     SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |                       UnionExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |                         SortExec: expr=[state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                             FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |                               RecordBatchesExec: chunks=1, projection=[__chunk_order, city, state, time]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |                           FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                             ParquetExec: file_groups={2 groups: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet], [1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/9255eb7f-2b51-427b-9c9b-926199c85bdf.parquet]]}, projection=[__chunk_order, city, state, time], output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3 |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Figure 3: A typical query plan of leading edge (most recent) data

Let us begin reading from the bottom up. The bottom or leaf nodes are always either ParquetExec or RecordBatchesExec. There are three of them in this plan; let us go over them one by one.

Three bottom leaf operators: two ParquetExec & one RecordBatchesExec

First ParquetExec

|               |                             ParquetExec: file_groups={2 groups: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet], [1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/9255eb7f-2b51-427b-9c9b-926199c85bdf.parquet]]}, projection=[__chunk_order, city, state, time], output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3 |

Figure 4: First ParquetExec
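The `pruning_predicate` in Figure 4 is the `predicate` rewritten in terms of min/max statistics, so a file or row group whose statistics make it false can be skipped without reading its rows. The Python sketch below mirrors that expression. The `FileStats` class and the statistics values are hypothetical and only illustrate the idea; this is not DataFusion's actual implementation.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    # Hypothetical min/max statistics for one file or row group.
    time_min: int
    time_max: int
    state_min: str
    state_max: str


def may_contain_matches(s: FileStats) -> bool:
    """Mirror of the pruning_predicate shown in the plan:
    time_max >= 200 AND time_min < 700 AND state_min <= MA AND MA <= state_max.
    False means no row can satisfy the filter, so the data can be skipped."""
    return (
        s.time_max >= 200
        and s.time_min < 700
        and s.state_min <= "MA"
        and "MA" <= s.state_max
    )


# Made-up statistics for three containers.
too_old = FileStats(time_min=100, time_max=150, state_min="CA", state_max="NY")
in_range = FileStats(time_min=300, time_max=900, state_min="CA", state_max="NY")
wrong_state = FileStats(time_min=300, time_max=400, state_min="NY", state_max="TX")

for stats in (too_old, in_range, wrong_state):
    print(stats, "->", "read" if may_contain_matches(stats) else "skip")
```

A result of True does not guarantee matching rows; it only means the statistics cannot rule them out, which is why the FilterExec above the scan still applies the original predicate.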

RecordBatchesExec

|               |                               RecordBatchesExec: chunks=1, projection=[__chunk_order, city, state, time]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |

Figure 5: RecordBatchesExec

Data from the ingester can be in many chunks; in this example there is only one. Like ParquetExec, only the data of four columns is sent to the output. The action of filtering columns is called projection pushdown and, thus, it is named projection here.
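To check which columns a leaf operator actually outputs, the `projection=[...]` list can be pulled out of the operator line. A small Python sketch follows; `projection_columns` is a hypothetical helper, and the regular expression assumes the text format shown in the figures above.

```python
import re


def projection_columns(operator_line: str) -> list:
    """Return the column names listed in projection=[...], or [] if absent."""
    match = re.search(r"projection=\[([^\]]*)\]", operator_line)
    if match is None:
        return []
    return [col.strip() for col in match.group(1).split(",") if col.strip()]


# Operator line from Figure 5.
line = "RecordBatchesExec: chunks=1, projection=[__chunk_order, city, state, time]"
columns = projection_columns(line)
print(len(columns), columns)
```

An empty list, as in `TableScan: hits.parquet projection=[]` from the count(*) example earlier in this thread, means no column values are needed from the scan.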

Second ParquetExec

|               |                       ParquetExec: file_groups={2 groups: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/243db601-f3f1-401b-afda-82160d8cc1a8.parquet], [1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/f5fb7c7d-16ac-49ba-a811-69578d05843f.parquet]]}, projection=[city, state, time], output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3                                           |

Figure 6: Second ParquetExec

The readings are similar to the one above. Note that these files and the ones in the first ParquetExec belong to the same (InfluxDB) partition (e.g. the same day).
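Because the number of files a plan reads is one of the signals mentioned in the introduction, it can help to count the files in each `file_groups` entry. The Python sketch below does that for a ParquetExec line. `files_per_group` is a hypothetical helper, and the shortened file paths in the sample line are placeholders, not the real paths from the plan.

```python
import re


def files_per_group(operator_line: str) -> list:
    """Return one list of file paths per file group in file_groups={...}."""
    match = re.search(r"file_groups=\{(.*?)\}", operator_line)
    if match is None:
        return []
    # Each innermost [...] is one group; files inside it are comma separated.
    groups = re.findall(r"\[([^\[\]]*)\]", match.group(1))
    return [[f.strip() for f in g.split(",") if f.strip()] for g in groups]


# Same shape as the ParquetExec lines above, with placeholder paths.
line = (
    "ParquetExec: file_groups={2 groups: [[1/1/x/a.parquet], [1/1/x/b.parquet]]}, "
    "projection=[city, state, time]"
)
groups = files_per_group(line)
print("groups:", len(groups), "files:", sum(len(g) for g in groups))
```

Both ParquetExec operators in this plan have two groups of one file each, so each group can be scanned in parallel with the other.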

Data-scanning structures

So the question is: why do we split parquet files into different ParquetExec operators when they are in the same partition? There are many reasons, but the two major ones are:

  1. Split the non-overlaps from the overlaps so we only need to apply the expensive deduplication operation on the overlaps. This is the case in this example.
  2. Split the non-overlaps to increase parallel execution.
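The first reason can be sketched in Python as follows. This assumes that "overlap" means the time ranges of two chunks intersect, which the text above does not spell out. The file names, time ranges, and the `split_by_overlap` function are all hypothetical, and the real IOx logic may differ.

```python
def split_by_overlap(files: dict) -> tuple:
    """files maps a name to an inclusive (min_time, max_time) range.
    Returns (overlapping, non_overlapping) lists of names."""
    names = sorted(files)
    overlapping = set()
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            a_min, a_max = files[a]
            b_min, b_max = files[b]
            # Two inclusive ranges intersect when each starts before the other ends.
            if a_min <= b_max and b_min <= a_max:
                overlapping.update((a, b))
    non_overlapping = [n for n in names if n not in overlapping]
    return sorted(overlapping), non_overlapping


# Made-up time ranges: f2, f3 and the ingester chunk overlap; f1 does not.
chunks = {
    "f1.parquet": (0, 100),
    "f2.parquet": (101, 200),
    "f3.parquet": (150, 300),
    "ingester_chunk": (250, 400),
}
needs_dedup, scan_directly = split_by_overlap(chunks)
print("deduplicate:", needs_dedup)
print("scan without deduplication:", scan_directly)
```

Only the first group would need to pass through the expensive deduplication step; the second can be read by its own scan operator.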

When do we know there are overlaps?

|               |                   DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |                     SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |                       UnionExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |                         SortExec: expr=[state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                             FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |                               RecordBatchesExec: chunks=1, projection=[__chunk_order, city, state, time]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |                           FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                             ParquetExec: file_groups={2 groups: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet], [1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/9255eb7f-2b51-427b-9c9b-926199c85bdf.parquet]]}, projection=[__chunk_order, city, state, time], output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3 |

Figure 7: DeduplicateExec is a signal of overlapping data

This structure tells us that, because there is a DeduplicateExec, the data underneath it overlaps. More specifically, the data in the 2 files overlaps with each other and/or with the data from the Ingesters.

How do we know there are no overlaps?

|               |                 ProjectionExec: expr=[city@0 as city]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |                     FilterExec: time@2 >= 200 AND time@2 < 700 AND state@1 = MA                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |                       ParquetExec: file_groups={2 groups: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/243db601-f3f1-401b-afda-82160d8cc1a8.parquet], [1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/f5fb7c7d-16ac-49ba-a811-69578d05843f.parquet]]}, projection=[city, state, time], output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3                                           |

Figure 8: No DeduplicateExec means the files do not overlap

There is no DeduplicateExec in this structure or above it, which means the files here do not overlap.
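
The two rules of thumb (a DeduplicateExec above a scan means overlapping data, no DeduplicateExec means no overlap) can be checked mechanically. The following is a minimal sketch and not part of DataFusion or InfluxDB: the function name `scans_under_dedup` is hypothetical, the sample plan is abbreviated by hand, and it assumes the plan has already been reduced to its indented operator lines (the second column of the tables above).

```python
def scans_under_dedup(plan_lines):
    """For each scan operator, report whether a DeduplicateExec is an ancestor.

    Nesting is inferred purely from indentation, as in the EXPLAIN output.
    Returns a list of (operator_name, has_dedup_ancestor) in plan order.
    """
    stack = []   # (indent, operator_name) for the current root-to-node path
    result = []
    for line in plan_lines:
        text = line.rstrip()
        if not text.strip():
            continue
        indent = len(text) - len(text.lstrip())
        name = text.strip().split(":")[0].split()[0]
        # Drop siblings and deeper nodes so the stack holds only ancestors
        while stack and stack[-1][0] >= indent:
            stack.pop()
        if name in ("ParquetExec", "RecordBatchesExec"):
            has_dedup = any(op == "DeduplicateExec" for _, op in stack)
            result.append((name, has_dedup))
        stack.append((indent, name))
    return result


# Hand-abbreviated version of the plan shape shown in Figures 7 and 8
plan = [
    "UnionExec",
    "  DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC]",
    "    SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC]",
    "      UnionExec",
    "        SortExec: expr=[state@2 ASC,city@1 ASC,time@3 ASC]",
    "          RecordBatchesExec: chunks=1",
    "        ParquetExec: file_groups={2 groups}",
    "  ProjectionExec: expr=[city@0 as city]",
    "    ParquetExec: file_groups={2 groups}",
]
report = scans_under_dedup(plan)
for name, has_dedup in report:
    print(name, "overlapping" if has_dedup else "not overlapping")
```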

Other operators

Now let us look at the rest of the plan:

| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   SortExec: expr=[city@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |     AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |         RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |           AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |             RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |               UnionExec 

Figure 9: The rest of the plan structure

For your questions and references

If you see that the plan reads a lot of files and deduplicates all of them, you may want to ask: "do all of them overlap or not?" The answer is either yes or no, depending on the situation. There are also cases where we deduplicate non-overlapping files because of memory limitations, but those will be topics for future documentation.

devanbenz commented 2 months ago

take

devanbenz commented 2 months ago

Reading through it now @alamb will let you know if everything sounds good :)

2010YOUY01 commented 2 months ago

A logical plan is relatively easy to understand, but physical plans are definitely hard to understand because they include the execution details for exchange-based parallelism.

For example, a multi-stage aggregation's physical plan looks like:

|               |     AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |         RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |           AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |

It appears that every executor node has one input and one output. In reality, one executor can have multiple instances spawned according to the partition number, and different executor runtime instances can have one input stream and multiple output streams, one input and one output, etc.

The tricky thing is that the data flow in execution is an "expanded" physical plan tree, which has to be visualized with some imagination, rather than a 1-to-1 mapping of the physical plan.

I would recommend first explaining what exchange-based parallelism is with a concrete example (multi-stage parallel aggregation looks like a good one), then specifying the behavior of the different executors (how they split/merge data streams).
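
As a rough illustration of the multi-stage aggregation above, here is a toy model in plain Python. It is only a sketch of the data flow: the function names and sample rows are hypothetical, CRC32 stands in for whatever hash function the engine actually uses, and real operators stream record batches concurrently rather than materializing lists. Four input partitions and four output partitions mirror `Hash([city@0], 4), input_partitions=4` in the plan.

```python
import zlib
from collections import Counter


def partial_aggregate(rows):
    """Like AggregateExec mode=Partial: count rows per city in ONE input partition."""
    return Counter(rows)


def hash_repartition(partials, n):
    """Like RepartitionExec Hash([city], n): route each (city, count) by key hash.

    Every input partition can send rows to every output partition, which is
    why the runtime data flow is wider than the single line in the plan.
    """
    outputs = [[] for _ in range(n)]
    for partial in partials:
        for city, count in partial.items():
            outputs[zlib.crc32(city.encode()) % n].append((city, count))
    return outputs


def final_aggregate(pairs):
    """Like AggregateExec mode=FinalPartitioned: sum partial counts in ONE output partition."""
    totals = Counter()
    for city, count in pairs:
        totals[city] += count
    return dict(totals)


# Four input partitions, each holding the `city` column of some rows
input_partitions = [
    ["Boston", "Boston", "Andover"],
    ["Andover", "Boston"],
    ["Reading"],
    ["Boston", "Reading"],
]
partials = [partial_aggregate(p) for p in input_partitions]  # one instance per partition
exchanged = hash_repartition(partials, 4)                    # the "exchange" step
finals = [final_aggregate(p) for p in exchanged]             # one instance per partition

merged = {}
for f in finals:
    merged.update(f)  # safe: hashing puts each city in exactly one partition
print(merged)
```

Because all partial counts for a given city land in the same output partition, each final instance can finish its groups without talking to the others.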

jstirnaman commented 2 months ago

InfluxData has internal documentation about how to read an explain plan that we would like to donate to the public docs (both to help others as well as to have help maintaining them)

We have a public-facing version as well:

devanbenz commented 2 months ago

@alamb I'm currently working on porting the section regarding data organization over:

Example of typical plan of leading edge data

Let us delve into an example that covers typical operators on leading edge data.

Data Organization

(EXPLAIN clickbench file here)

Query and query plan

Can you elaborate on explaining the clickbench file? Should I just write a brief on how data is organized in the parquet file?

devanbenz commented 2 months ago

Also, I'm wondering why I'm not seeing a UnionExec on my hardware for the following plan:

EXPLAIN SELECT "hits.parquet"."OS" AS os, COUNT(1) FROM 'hits.parquet' WHERE to_timestamp("hits.parquet"."EventTime") >= to_timestamp(200) AND to_timestamp("hits.parquet"."EventTime") < to_timestamp(700) AND "hits.parquet"."RegionID" = 839 GROUP BY os ORDER BY os ASC;

I assume it's just dependent on the hardware since it's part of the physical planner. I'm seeing the following output for this:

+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: os ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |   Projection: hits.parquet.OS AS os, count(Int64(1))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |     Aggregate: groupBy=[[hits.parquet.OS]], aggr=[[count(Int64(1))]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |       Projection: hits.parquet.OS                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |         Filter: __common_expr_4 >= TimestampNanosecond(200000000000, None) AND __common_expr_4 < TimestampNanosecond(700000000000, None) AND hits.parquet.RegionID = Int32(839)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |           Projection: to_timestamp(hits.parquet.EventTime) AS __common_expr_4, hits.parquet.RegionID, hits.parquet.OS                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |             TableScan: hits.parquet projection=[EventTime, RegionID, OS], partial_filters=[to_timestamp(hits.parquet.EventTime) >= TimestampNanosecond(200000000000, None), to_timestamp(hits.parquet.EventTime) < TimestampNanosecond(700000000000, None), hits.parquet.RegionID = Int32(839)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| physical_plan | SortPreservingMergeExec: [os@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |   SortExec: expr=[os@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |     ProjectionExec: expr=[OS@0 as os, count(Int64(1))@1 as count(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |       AggregateExec: mode=FinalPartitioned, gby=[OS@0 as OS], aggr=[count(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |           RepartitionExec: partitioning=Hash([OS@0], 10), input_partitions=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |             AggregateExec: mode=Partial, gby=[OS@0 as OS], aggr=[count(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |               ProjectionExec: expr=[OS@2 as OS]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |                   FilterExec: __common_expr_4@0 >= 200000000000 AND __common_expr_4@0 < 700000000000 AND RegionID@1 = 839                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |                     ProjectionExec: expr=[to_timestamp(EventTime@0) as __common_expr_4, RegionID@1 as RegionID, OS@2 as OS]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |                       ParquetExec: file_groups={10 groups: [[Users/devan/Documents/OSS/datafusion/benchmarks/data/hits.parquet:0..1477997645], [Users/devan/Documents/OSS/datafusion/benchmarks/data/hits.parquet:1477997645..2955995290], [Users/devan/Documents/OSS/datafusion/benchmarks/data/hits.parquet:2955995290..4433992935], [Users/devan/Documents/OSS/datafusion/benchmarks/data/hits.parquet:4433992935..5911990580], [Users/devan/Documents/OSS/datafusion/benchmarks/data/hits.parquet:5911990580..7389988225], ...]}, projection=[EventTime, RegionID, OS], predicate=to_timestamp(EventTime@4) >= 200000000000 AND to_timestamp(EventTime@4) < 700000000000 AND RegionID@8 = 839, pruning_predicate=CASE WHEN RegionID_null_count@2 = RegionID_row_count@3 THEN false ELSE RegionID_min@0 <= 839 AND 839 <= RegionID_max@1 END, required_guarantees=[RegionID in (839)] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Looking at the previous examples, I see that when two clauses such as GROUP BY and ORDER BY are used together, the physical plan generally includes a UnionExec step. Why would it be different on my hardware?

alamb commented 2 months ago

Can you elaborate on explaining the clickbench file? Should I just write a brief on how data is organized in the parquet file?

I meant putting in https://github.com/apache/datafusion/issues/12088#issuecomment-2305139528

I assume it's just dependent on the hardware, since it's part of the physical planner. I'm seeing the following output for this:

It is dependent on your hardware, but the plans for the clickbench queries with just DataFusion are also going to be different from the ones for InfluxDB (as InfluxDB has its own specializations for the physical plan).

If you have a question about what a part of the plan means, feel free to leave a TODO in the text and perhaps I can help fill it out as part of the PR.

devanbenz commented 2 months ago

A logical plan is relatively easy to understand. Physical plans are definitely harder to understand, because they include the execution details for exchange-based parallelism.

For example, a multi-stage aggregation's physical plan looks like this:

|               |     AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |         RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |           AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |

It appears that every executor node has one input and one output. In reality, one executor can have multiple instances spawned according to the partition number, and different executor runtime instances can have one input stream and multiple output streams, one input and one output, etc.

The tricky thing is that the data flow in execution is an "expanded" physical plan tree, which has to be visualized with some imagination, rather than a 1-to-1 mapping of the physical plan.

I would recommend first explaining what exchange-based parallelism is with a concrete example (multi-staged parallel aggregation looks like a good one), then specifying the behavior of the different executors (how they split/merge data streams).
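To make the "expanded" data flow concrete, here is a minimal sketch in plain Python of the multi-stage aggregation plan shown above. It is not DataFusion code: the function names, the city values, and the use of Python's built-in `hash` are assumptions made purely for illustration, and CoalesceBatchesExec is left out because it only re-batches rows. Each list element stands for one partition, so one plan node maps to several instances running side by side.

```python
from collections import Counter

def partial_aggregate(rows):
    """Stand-in for AggregateExec(mode=Partial): count rows per city
    inside a single input partition."""
    return Counter(rows)

def hash_repartition(partials, num_partitions):
    """Stand-in for RepartitionExec(Hash([city], n)): every instance reads
    one input and may write to any of the n outputs, chosen by hash(city)."""
    outputs = [[] for _ in range(num_partitions)]
    for partial in partials:
        for city, count in partial.items():
            outputs[hash(city) % num_partitions].append((city, count))
    return outputs

def final_aggregate(pairs):
    """Stand-in for AggregateExec(mode=FinalPartitioned): merge the partial
    counts that were routed to this output partition."""
    totals = Counter()
    for city, count in pairs:
        totals[city] += count
    return totals

def run(input_partitions, num_partitions):
    # One Partial instance per input partition.
    partials = [partial_aggregate(p) for p in input_partitions]
    # The exchange step: 1 input -> many outputs per instance.
    shuffled = hash_repartition(partials, num_partitions)
    # One Final instance per output partition.
    return [final_aggregate(p) for p in shuffled]

if __name__ == "__main__":
    # Hypothetical data: 4 input partitions, matching input_partitions=4.
    data = [["Boston", "NYC", "Boston"], ["NYC", "Denver"],
            ["Boston"], ["Denver", "Denver"]]
    for i, result in enumerate(run(data, 4)):
        print(f"output partition {i}: {dict(result)}")
```

Because rows are routed by the hash of the group key, all partial counts for a given city land in the same output partition, so each final instance can produce complete results without talking to the others.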

@2010YOUY01 I'm personally not very familiar with exchange-based parallelism. Could you point me in the direction of a good paper/resource on the topic? I assume How Query Engines Work has a good section on it: https://howqueryengineswork.com/12-parallel-query.html

2010YOUY01 commented 2 months ago

@2010YOUY01 I'm personally not very familiar with exchange-based parallelism. Could you point me in the direction of a good paper/resource on the topic? I assume How Query Engines Work has a good section on it: https://howqueryengineswork.com/12-parallel-query.html

I think this doc has a really good explanation: https://github.com/apache/datafusion/blob/932adabcc04faf65b8c8670b2385dc13a0f849f4/datafusion/expr-common/src/accumulator.rs#L151-L197 (if you run a query with target_partitions = 2, the dataflow will look like the figure in that doc).