apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Exact filter pushdown can be duplicated in some instances #1116

Status: Open. nevi-me opened this issue 3 years ago.

nevi-me commented 3 years ago

Describe the bug

When using a data source that supports exact filters, filters can be duplicated if not all of them can be pushed down to the source. This happens when there are multiple filters on the same column and one or more of those filters cannot be pushed to the source.

To Reproduce

Using https://github.com/TheDataEngine/datafusion-mongo-connector, and the below SQL query on the NYC dataset:

select
    count(*) as total_records,
    VendorID as vid,
    sum(cast(trip_distance as float)) as total_distance
from mongo_nyc
where
    passenger_count > 3 and
    cast(trip_distance as float) < 5.00 and
    fare_amount / (total_amount + 0.001) > 0.70 and
    total_amount < 20.0 and
    passenger_count is not null and
    -passenger_count < -2 and
    VendorID in ('2', '4')
group by VendorID
order by vid
limit 100

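I am able to push down the following filters in the WHERE clause:

    passenger_count > 3
    cast(trip_distance as float) < 5.00
    fare_amount / (total_amount + 0.001) > 0.70
    total_amount < 20.0
    passenger_count is not null
    VendorID in ('2', '4')

I don't yet support pushing down this one:

    -passenger_count < -2

If the query includes the unsupported filter, the exact filters on passenger_count are duplicated: they are pushed to the source and also evaluated again in a Filter node.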

Limit: 100
  Sort: #vid ASC NULLS FIRST
    Projection: #COUNT(UInt8(1)) AS total_records, #mongo_nyc.VendorID AS vid, #SUM(CAST(mongo_nyc.trip_distance AS Float64)) AS total_distance
      Aggregate: groupBy=[[#mongo_nyc.VendorID]], aggr=[[COUNT(UInt8(1)), SUM(CAST(#mongo_nyc.trip_distance AS Float64))]]
        Filter: #mongo_nyc.passenger_count > Int64(3) AND #mongo_nyc.passenger_count IS NOT NULL AND (- #mongo_nyc.passenger_count) < Int64(-2)
          TableScan: mongo_nyc projection=Some([0, 3, 4, 10, 16]), filters=[#mongo_nyc.passenger_count > Int64(3), CAST(#mongo_nyc.trip_distance AS Float64) < Float64(5), #mongo_nyc.fare_amount / #mongo_nyc.total_amount + Float64(0.001) > Float64(0.7), #mongo_nyc.total_amount < Float64(20), #mongo_nyc.passenger_count IS NOT NULL, #mongo_nyc.VendorID IN ([Utf8("2"), Utf8("4")])]

If the query excludes the above negative filter, all filters are pushed down to the source.

Limit: 100
  Sort: #vid ASC NULLS FIRST
    Projection: #COUNT(UInt8(1)) AS total_records, #mongo_nyc.VendorID AS vid, #SUM(CAST(mongo_nyc.trip_distance AS Float64)) AS total_distance
      Aggregate: groupBy=[[#mongo_nyc.VendorID]], aggr=[[COUNT(UInt8(1)), SUM(CAST(#mongo_nyc.trip_distance AS Float64))]]
        TableScan: mongo_nyc projection=Some([0, 3, 4, 10, 16]), filters=[#mongo_nyc.passenger_count > Int64(3), CAST(#mongo_nyc.trip_distance AS Float64) < Float64(5), #mongo_nyc.fare_amount / #mongo_nyc.total_amount + Float64(0.001) > Float64(0.7), #mongo_nyc.total_amount < Float64(20), #mongo_nyc.passenger_count IS NOT NULL, #mongo_nyc.VendorID IN ([Utf8("2"), Utf8("4")])]

The passenger_count filters that are pushed to the source are also evaluated by DataFusion.

Expected behavior

Given that the filters are combined with AND, I would expect DataFusion to evaluate only the negated condition, as the other conditions (IS NOT NULL, > 3) are already applied exactly by the source and would be redundant.
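A minimal Rust sketch of the expected behavior, assuming each WHERE conjunct is tagged with the support level the source reports. The PushDown enum and residual_filters function are hypothetical stand-ins that mirror the Exact / Inexact / Unsupported levels of DataFusion's TableProviderFilterPushDown; this is not DataFusion code. Only conjuncts the source does not apply exactly are kept for DataFusion to evaluate.

```rust
// Hypothetical per-filter support level reported by a source.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PushDown {
    Exact,       // the source applies the filter exactly; no need to re-check
    Inexact,     // the source may return extra rows; must re-check
    Unsupported, // the source ignores the filter; must evaluate
}

/// Returns the conjuncts that still need to be evaluated after the scan.
/// Only filters the source applies exactly are dropped.
fn residual_filters<'a>(conjuncts: &[(&'a str, PushDown)]) -> Vec<&'a str> {
    conjuncts
        .iter()
        .filter(|(_, support)| *support != PushDown::Exact)
        .map(|(expr, _)| *expr)
        .collect()
}

fn main() {
    // Conjuncts of the WHERE clause from this issue; all are exact
    // except the negated comparison, which the connector cannot push.
    let conjuncts = [
        ("passenger_count > 3", PushDown::Exact),
        ("CAST(trip_distance AS Float64) < 5", PushDown::Exact),
        ("fare_amount / (total_amount + 0.001) > 0.7", PushDown::Exact),
        ("total_amount < 20", PushDown::Exact),
        ("passenger_count IS NOT NULL", PushDown::Exact),
        ("-passenger_count < -2", PushDown::Unsupported),
        ("VendorID IN ('2', '4')", PushDown::Exact),
    ];
    println!("{:?}", residual_filters(&conjuncts)); // ["-passenger_count < -2"]
}
```

With the filters from this issue, only the negated comparison remains. Inexact filters are kept as well, because the source may return rows that do not satisfy them.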

Additional context

I'm aware that constant folding will eventually simplify passenger_count > 3 and -passenger_count < -2, but until then we perform a few redundant calculations because of the duplicated filters.
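The redundancy can be checked by hand. Writing p for passenger_count, the three passenger_count conjuncts reduce to a single comparison. This is only the algebra; whether DataFusion's expression simplifier performs every one of these steps is not confirmed here.

```latex
\begin{aligned}
-p < -2 \;&\iff\; p > 2 \quad\text{(multiply both sides by } -1\text{, flipping the inequality)}\\
(p > 3) \land (p > 2) \;&\iff\; p > 3 \quad\text{(since } p > 3 \implies p > 2\text{)}\\
(p \text{ IS NOT NULL}) \land (p > 3) \;&\iff\; p > 3 \quad\text{(in a WHERE clause, a NULL } p \text{ never satisfies } p > 3\text{)}
\end{aligned}
```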

nevi-me commented 3 years ago

Another possible optimisation: CAST(mongo_nyc.trip_distance AS Float64) could be pushed to the source altogether, as it is currently evaluated both by the source (in the pushed-down filter) and by DataFusion (in the aggregate).

This would work very well with SQL sources.

houqp commented 2 years ago

This is most likely because our current filter pushdown implementation decides which filters to preserve based on the columns they access, not on whether the exact filter has been pushed down: https://github.com/apache/arrow-datafusion/blob/f24e45fc8ec035e9ec0f6b6a18bb97e5bc0f9a1c/datafusion/src/optimizer/filter_push_down.rs#L474
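The column-based behavior described in this comment can be modeled in a few lines of Rust. This is a simplified, hypothetical model (the Conjunct struct and retained_by_columns function are illustrative, not the code in filter_push_down.rs): it gathers the columns of every conjunct the source cannot apply exactly, then keeps every conjunct that touches one of those columns, whether or not that conjunct was already pushed down exactly.

```rust
use std::collections::HashSet;

/// Hypothetical model of one WHERE-clause conjunct: its text, the columns
/// it reads, and whether the source applies it exactly.
struct Conjunct {
    expr: &'static str,
    columns: &'static [&'static str],
    exact: bool,
}

/// Column-based retention: collect the columns of every non-exact conjunct,
/// then keep *every* conjunct that reads any of those columns.
fn retained_by_columns(conjuncts: &[Conjunct]) -> Vec<&'static str> {
    let used: HashSet<&str> = conjuncts
        .iter()
        .filter(|c| !c.exact)
        .flat_map(|c| c.columns.iter().copied())
        .collect();
    conjuncts
        .iter()
        .filter(|c| c.columns.iter().any(|col| used.contains(col)))
        .map(|c| c.expr)
        .collect()
}

fn main() {
    // Conjuncts from the issue's query; only the negated comparison is unsupported.
    let conjuncts = [
        Conjunct { expr: "passenger_count > 3", columns: &["passenger_count"], exact: true },
        Conjunct { expr: "CAST(trip_distance AS Float64) < 5", columns: &["trip_distance"], exact: true },
        Conjunct { expr: "fare_amount / (total_amount + 0.001) > 0.7", columns: &["fare_amount", "total_amount"], exact: true },
        Conjunct { expr: "total_amount < 20", columns: &["total_amount"], exact: true },
        Conjunct { expr: "passenger_count IS NOT NULL", columns: &["passenger_count"], exact: true },
        Conjunct { expr: "-passenger_count < -2", columns: &["passenger_count"], exact: false },
        Conjunct { expr: "VendorID IN ('2', '4')", columns: &["VendorID"], exact: true },
    ];
    // Keeps all three passenger_count predicates, not just the unsupported one.
    println!("{:?}", retained_by_columns(&conjuncts));
}
```

Applied to the filters from the issue, this model keeps passenger_count > 3, passenger_count IS NOT NULL and -passenger_count < -2, the same three predicates as the Filter node in the issue's first plan, whereas deciding per predicate would keep only -passenger_count < -2.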

frett27 commented 1 year ago

I found that the pushdown needs some improvements. Here is the use case I faced: I have a table with some binary data and a column giving the type.

A first view filters the records of type "int". A second view, built on the first one, casts the binary data and applies some WHERE clause filters to the records.

In the explain plan, the WHERE clause of the second view is pushed down below the type filter of the first view, so the cast is attempted on every record and the query fails because not all records can be cast to "int".
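A minimal Rust sketch of why this ordering matters, using a hypothetical table of (type, payload) rows and a hypothetical cast_int that decodes a 4-byte little-endian integer and fails on any other length. It is not DataFusion code; it only contrasts evaluating the type filter before the cast with evaluating a pushed-down cast first.

```rust
/// Hypothetical "cast": decode a little-endian i32 from exactly 4 bytes,
/// failing for any other length (e.g. a row whose payload is not an int).
fn cast_int(bytes: &[u8]) -> Result<i32, String> {
    if bytes.len() != 4 {
        return Err(format!("cannot cast {} bytes to int", bytes.len()));
    }
    Ok(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}

/// Original view order: the type filter runs first, so the cast only
/// ever sees "int" payloads.
fn filter_then_cast(rows: &[(&str, Vec<u8>)], min: i32) -> Result<Vec<i32>, String> {
    let mut out = Vec::new();
    for (kind, payload) in rows {
        if *kind != "int" {
            continue; // view 1: drop non-int rows before any cast
        }
        let v = cast_int(payload)?; // view 2: cast
        if v > min {
            out.push(v); // view 2: WHERE clause on the cast value
        }
    }
    Ok(out)
}

/// Same predicates after the cast filter is pushed below the type filter:
/// the cast now runs on every row, including non-int payloads.
fn cast_then_filter(rows: &[(&str, Vec<u8>)], min: i32) -> Result<Vec<i32>, String> {
    let mut out = Vec::new();
    for (kind, payload) in rows {
        let v = cast_int(payload)?; // evaluated first, so it can fail
        if v > min && *kind == "int" {
            out.push(v);
        }
    }
    Ok(out)
}

fn main() {
    // Hypothetical table: a type column and a binary payload column.
    let rows = vec![
        ("int", 7i32.to_le_bytes().to_vec()),
        ("text", b"abc".to_vec()),
        ("int", 42i32.to_le_bytes().to_vec()),
    ];
    println!("{:?}", filter_then_cast(&rows, 10)); // Ok([42])
    println!("{:?}", cast_then_filter(&rows, 10)); // Err("cannot cast 3 bytes to int")
}
```

The sketch suggests the general constraint: a predicate that can raise an error is only safe to move below another filter if it cannot fail on the rows that filter would have removed.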