apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Projection pushdown doesn't work for User Defined plan Nodes #9146

Open · alamb opened 4 months ago

alamb commented 4 months ago

Describe the bug

Reported in Discord: https://discord.com/channels/885562378132000778/1166447479609376850/1204466794165706802

My node requests only one column (I defined it as an expression, as stated in UserDefinedLogicalNodeCore), but the Parquet reader scans all the columns.

The expected behavior is:

MyNode(col1)
 Parquet(col1)

However the actual behavior is

MyNode(col1)
 Parquet(col1,col2)

Projection pushdown works with predefined nodes like Filter, but not with my custom node.
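
For reference, a custom node along the lines described above might look roughly like the sketch below. Names such as MyNode and col1, the stored output schema, and the field layout are assumptions, and the exact trait signatures differ across DataFusion versions (e.g. from_template vs. with_exprs_and_inputs), so treat this as an illustration rather than the reporter's actual code:

```rust
use std::fmt;

use datafusion_common::DFSchemaRef;
use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNodeCore};

/// Hypothetical user-defined node that only needs `col1` from its input.
#[derive(Debug, Hash, PartialEq, Eq)]
struct MyNode {
    input: LogicalPlan,
    /// Output schema containing only `col1`, assumed to be built by the caller.
    schema: DFSchemaRef,
}

impl UserDefinedLogicalNodeCore for MyNode {
    fn name(&self) -> &str {
        "MyNode"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    // Only `col1` is reported as used, yet the Parquet scan below still reads
    // every column because projection pruning stops at this node.
    fn expressions(&self) -> Vec<Expr> {
        vec![col("col1")]
    }

    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "MyNode(col1)")
    }

    fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
        Self {
            input: inputs[0].clone(),
            schema: self.schema.clone(),
        }
    }
}
```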

To Reproduce

No response

Expected behavior

No response

Additional context

This came from the Discord forum: https://discord.com/channels/885562378132000778/1166447479609376850/1204466794165706802

TableScan: ?table? projection=[project_id, user_id, created_at, event_id, event, str_0, str_1, str_2, str_3, str_4, str_5, str_6, str_7, str_8, str_9, str_10, str_11, str_12, str_13, str_14, str_15, str_16, str_17, str_18, str_19, str_20, str_21, str_22, str_23, str_24, ts_0, i_8, i_16, i_32, i_64, ts, bool, bool_nullable, string, decimal, group, v, string_dict]
 Sort: date_trunc(Utf8("day"), created_at) AS created_at ASC NULLS LAST
   PartitionedAggregatePartial: , agg: Count { filter: None, groups: Some([(Alias(Alias { expr: ScalarFunction(ScalarFunction { func_def: BuiltIn(DateTrunc), args: [Literal(Utf8("day")), Column(Column { relation: None, name: "created_at" })] }), relation: None, name: "created_at" }), SortField { data_type: Timestamp(Nanosecond, None) })]), predicate: Column { relation: None, name: "event" }, partition_col: Column { relation: None, name: "user_id" }, distinct: false } as "0_0"
     Filter: project_id = Int64(1) AND created_at >= TimestampNanosecond(1706966073340870000, None) AND created_at <= TimestampNanosecond(1707225273340870000, None) AND event = UInt16(6)
       Projection: project_id, user_id, created_at, event
         TableScan: ?table? projection=[project_id, user_id, created_at, event_id, event]

Physical Plan


SortPreservingMergeExec: [date_trunc(day, created_at@1) ASC NULLS LAST], metrics=[]
  SortExec: expr=[date_trunc(day, created_at@1) ASC NULLS LAST], metrics=[]
    SegmentedAggregatePartialExec, metrics=[]
      SortExec: expr=[project_id@0 ASC NULLS LAST,user_id@1 ASC NULLS LAST], metrics=[]
        CoalesceBatchesExec: target_batch_size=8192, metrics=[]
          RepartitionExec: partitioning=Hash([project_id@0, user_id@1], 12), input_partitions=12, metrics=[]
            ProjectionExec: expr=[project_id@0 as project_id, user_id@1 as user_id, created_at@2 as created_at, event@4 as event], metrics=[]
              CoalesceBatchesExec: target_batch_size=8192, metrics=[]
                FilterExec: project_id@0 = 1 AND created_at@2 >= 1706966073340870000 AND created_at@2 <= 1707225273340870000 AND event@4 = 6, metrics=[]
                  RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[]
                    ParquetExec: file_groups={1 group: [[Users/maximbogdanov/user_files/store/tables/events/0/0.parquet]]}, projection=[project_id, user_id, created_at, event_id, event], output_orderings=[[project_id@0 DESC NULLS LAST], [user_id@1 DESC NULLS LAST]], predicate=project_id@0 = 1 AND created_at@2 >= 1706966073340870000 AND created_at@2 <= 1707225273340870000 AND event@4 = 6, pruning_predicate=project_id_min@0 <= 1 AND 1 <= project_id_max@1 AND created_at_max@2 >= 1706966073340870000 AND created_at_min@3 <= 1707225273340870000 AND event_min@4 <= 6 AND 6 <= event_max@5, metrics=[num_predicate_creation_errors=0]
alamb commented 4 months ago

@mustafasrepo noted that

Currently the optimize_projections rule doesn't have support for user-defined nodes: when the plan contains a user-defined node, we do not prune any fields below it. However, I think we should have this support, as demonstrated by your use case.

I believe this is one use case that @berkaysynnada is planning to address via https://github.com/apache/arrow-datafusion/issues/9111
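
Until that lands, one way to avoid reading every column is to prune the scan yourself when the plan is assembled programmatically, by passing an explicit projection when constructing the scan before wrapping it in the extension node. A rough sketch, using a hypothetical two-column table and the MyNode type from the sketch above (this does not help for plans produced from SQL, where the optimizer would have to do the pruning):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::logical_plan::Extension;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};

fn build_plan() -> Result<LogicalPlan> {
    // Hypothetical two-column table; the custom node only needs `col1`.
    let schema = Schema::new(vec![
        Field::new("col1", DataType::Int64, false),
        Field::new("col2", DataType::Utf8, true),
    ]);
    let source = Arc::new(LogicalTableSource::new(Arc::new(schema)));

    // Build the scan with an explicit projection (index 0 == `col1`) so only
    // that column is read, independent of what optimize_projections does later.
    let input = LogicalPlanBuilder::scan("t", source, Some(vec![0]))?.build()?;

    // Wrap the already-pruned input in the user-defined node
    // (`MyNode::new` is a hypothetical constructor).
    Ok(LogicalPlan::Extension(Extension {
        node: Arc::new(MyNode::new(input)),
    }))
}
```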

berkaysynnada commented 4 months ago

For sure. Our ultimate goal is to ensure that custom plans benefit from this optimization by implementing the necessary methods.
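
To make that concrete, the hook being discussed could look something like the following: the extension node tells the optimizer which columns of each child it actually needs, so pruning can continue below it. The method name and signature here are hypothetical, purely to illustrate the idea, and are not the DataFusion API at the time of this issue:

```rust
// Hypothetical hook on the user-defined node, for illustration only.
// Given the output columns the parent requires from this node, return the
// column indices this node needs from each of its children; `None` would
// keep today's conservative behavior of retaining every column.
fn necessary_children_exprs(
    &self,
    output_columns: &[usize],
) -> Option<Vec<Vec<usize>>> {
    // MyNode only ever reads `col1` (index 0 of its single child),
    // regardless of which of its output columns the parent asks for.
    let _ = output_columns;
    Some(vec![vec![0]])
}
```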