apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Avoid pushdown of volatile functions to tablescan #13475

Open theirix opened 3 days ago

theirix commented 3 days ago

Which issue does this PR close?

With filter pushdown enabled, the planner pushes the volatile random() filter down to the table source, so the filter is evaluated both in the scan (for example, in the Parquet reader) and again in the query engine, which leads to inconsistent results.

Closes #13268.
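
To illustrate the effect, here is a minimal standalone simulation. It does not use DataFusion; the generator, the function name `surviving_rows`, and the row count are assumptions made for this sketch. It also assumes that the scan and the engine each draw an independent random value per row. Under that assumption, a row survives `random() < 0.1` only if both draws pass, so roughly 1% of rows remain instead of 10%.

```rust
/// Tiny xorshift64* generator so the sketch needs no external crates.
/// The seed must be non-zero.
struct XorShift64(u64);

impl XorShift64 {
    /// Returns a pseudo-random f64 in [0, 1).
    fn next_f64(&mut self) -> f64 {
        let mut x = self.0;
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.0 = x;
        // Scramble the state and keep the top 53 bits for the mantissa.
        let bits = x.wrapping_mul(0x2545F4914F6CDD1D) >> 11;
        bits as f64 / (1u64 << 53) as f64
    }
}

/// Counts rows that pass `random() < threshold` when the predicate is
/// applied by `layers` layers in sequence, each with its own fresh draw.
/// A later layer only sees rows that earlier layers accepted.
fn surviving_rows(rows: u64, threshold: f64, layers: u32, seed: u64) -> u64 {
    let mut rng = XorShift64(seed);
    let mut kept = 0;
    for _ in 0..rows {
        if (0..layers).all(|_| rng.next_f64() < threshold) {
            kept += 1;
        }
    }
    kept
}

fn main() {
    let rows = 1_000_000;
    let once = surviving_rows(rows, 0.1, 1, 42);
    let twice = surviving_rows(rows, 0.1, 2, 42);
    println!("evaluated once:  {once} of {rows}");
    println!("evaluated twice: {twice} of {rows}");
}
```

The single-layer count should land near 10% of the rows and the two-layer count near 1%, which is the kind of discrepancy a user would see as a wrong `COUNT(*)`.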

Rationale for this change

A volatile filter cannot be evaluated consistently in more than one layer, because each evaluation may return a different value for the same row.
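
The idea can be sketched with a toy expression tree. This is not DataFusion's `Expr` type or its optimizer code; the enum, `is_volatile`, `split_for_pushdown`, and the `id` column are all hypothetical names used only for illustration. The sketch splits a list of predicates into those that are safe to push into the scan and those that must stay in the engine's filter because they contain a volatile call.

```rust
/// Hypothetical, simplified expression tree (not DataFusion's `Expr`).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(f64),
    /// A function call; `volatile` marks functions such as `random()`.
    Call { name: String, volatile: bool, args: Vec<Expr> },
    Less(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// True if this expression or any of its sub-expressions is volatile.
    fn is_volatile(&self) -> bool {
        match self {
            Expr::Column(_) | Expr::Literal(_) => false,
            Expr::Call { volatile, args, .. } => {
                *volatile || args.iter().any(Expr::is_volatile)
            }
            Expr::Less(l, r) => l.is_volatile() || r.is_volatile(),
        }
    }
}

/// Splits predicates into (safe to push into the scan, kept in the filter).
fn split_for_pushdown(predicates: Vec<Expr>) -> (Vec<Expr>, Vec<Expr>) {
    predicates.into_iter().partition(|p| !p.is_volatile())
}

fn main() {
    // random() < 0.1 is volatile and must not be pushed down.
    let random_lt = Expr::Less(
        Box::new(Expr::Call { name: "random".into(), volatile: true, args: vec![] }),
        Box::new(Expr::Literal(0.1)),
    );
    // id < 100 is deterministic and may be pushed down.
    let id_lt = Expr::Less(
        Box::new(Expr::Column("id".into())),
        Box::new(Expr::Literal(100.0)),
    );

    let (pushed, kept) = split_for_pushdown(vec![random_lt, id_lt]);
    println!("pushed to scan: {pushed:?}");
    println!("kept in filter: {kept:?}");
}
```

Checking volatility recursively matters: a deterministic comparison wrapped around a volatile call is still volatile as a whole.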

What changes are included in this PR?

Are these changes tested?

As proposed in the original issue, I tested with the alltypes_tiny_pages_plain.parquet sample file, which contains 7300 rows:

set datafusion.execution.parquet.pushdown_filters=true;
create external table data stored as parquet location 'alltypes_tiny_pages_plain.parquet';

Running the query

select COUNT(*) from data WHERE RANDOM() < 0.1;

with datafusion-cli gives an answer of 726, which is pretty close to the expected 730.
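
As a rough sanity check on "pretty close", assume each of the 7300 rows passes the filter independently with probability 0.1, so the count N is binomially distributed. The observed 726 is then well within one standard deviation of the mean:

```latex
\mathbb{E}[N] = 7300 \times 0.1 = 730, \qquad
\sigma = \sqrt{7300 \times 0.1 \times 0.9} = \sqrt{657} \approx 25.6, \qquad
\frac{|726 - 730|}{\sigma} \approx 0.16
```

For comparison, if the predicate were applied twice with independent draws, the expected count would be 7300 × 0.1 × 0.1 = 73.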

New plan

+---------------+---------------------------------------------------------------------------------+
| plan_type     | plan                                                                            |
+---------------+---------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]                   |
|               |   Filter: random() < Float64(0.1)                                               |
|               |     TableScan: data projection=[]                                               |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(*)]                              |
|               |   CoalescePartitionsExec                                                        |
|               |     AggregateExec: mode=Partial, gby=[], aggr=[count(*)]                        |
|               |       CoalesceBatchesExec: target_batch_size=8192                               |
|               |         FilterExec: random() < 0.1                                              |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1  |
|               |             ParquetExec: file_groups={1 group: [[sample.parquet]]}              |
|               |                                                                                 |
+---------------+---------------------------------------------------------------------------------+

Before the change, the scan node in the plan was

| ParquetExec: file_groups={1 group: [[alltypes_tiny_pages_plain.parquet]]}, predicate=random() < 0.1 |

Are there any user-facing changes?

No breaking changes.