apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Convert `ArrayAgg` to UDAF #10999

Open jayzhan211 opened 2 weeks ago

jayzhan211 commented 2 weeks ago

Is your feature request related to a problem or challenge?

Similar to other issues in #8708

Remember to include a test in roundtrip_expr_api.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

eejbyfeldt commented 2 weeks ago

I will work on this one. I think it might also involve moving nth_value, since they share some code.

eejbyfeldt commented 1 week ago

@jayzhan211 I pushed a work-in-progress version here: https://github.com/apache/datafusion/pull/11029. It still fails some test cases in sqllogictests:

External error: query failed: DataFusion error: This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: NTH_VALUE(aggregate_test_100.c4,Int64(3)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
[SQL] SELECT
   NTH_VALUE(c4, 3) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as nth_value1,
   NTH_VALUE(c4, 2) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as nth_value2
   FROM aggregate_test_100
   ORDER BY c9
   LIMIT 5
at test_files/window.slt:1205

External error: query failed: DataFusion error: External error: Arrow error: Invalid argument error: column types must match schema types, expected List(Field { name: "item", data_type: Struct([Field { name: "sn@1", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) but found List(Field { name: "item", data_type: Struct([Field { name: "sn@0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) at column index 2
[SQL] SELECT ARRAY_AGG(e.rate ORDER BY e.sn)
FROM sales_global AS s
JOIN exchange_rates AS e
ON s.currency = e.currency_from AND
   e.currency_to = 'USD' AND
   s.ts >= e.ts
GROUP BY s.sn
ORDER BY s.sn;
at test_files/group_by.slt:3181

The first failure seems to occur because the new NTH_VALUE UDAF is picked over the built-in window function with the same name. Is this expected? What is the correct way to resolve it?
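For context on the first error message: with a frame like `ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING`, rows leave the frame as it slides, so an accumulator used as a sliding accumulator must be able to remove them, which is what `retract_batch` is for. Below is a minimal sketch of that idea for an NTH_VALUE-like aggregate. `SlidingAccumulatorSketch` and `NthValueSketch` are hypothetical names, and the trait is a simplified stand-in (plain `i64` slices, no Arrow arrays, no `Result`), not DataFusion's `Accumulator` API.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified accumulator interface for this sketch only.
/// It mirrors the idea of update/retract but is not DataFusion's trait.
trait SlidingAccumulatorSketch {
    /// Rows entering the window frame.
    fn update_batch(&mut self, values: &[i64]);
    /// Rows leaving the window frame, oldest first.
    fn retract_batch(&mut self, values: &[i64]);
    /// Current result for the frame.
    fn evaluate(&self) -> Option<i64>;
}

/// NTH_VALUE(x, n) over a sliding frame: keep the frame's rows in arrival
/// order so the n-th one (1-based) can be read after every slide.
struct NthValueSketch {
    n: usize,
    frame: VecDeque<i64>,
}

impl NthValueSketch {
    fn new(n: usize) -> Self {
        Self { n, frame: VecDeque::new() }
    }
}

impl SlidingAccumulatorSketch for NthValueSketch {
    fn update_batch(&mut self, values: &[i64]) {
        // New rows are appended at the back of the frame.
        self.frame.extend(values.iter().copied());
    }

    fn retract_batch(&mut self, values: &[i64]) {
        // Rows leave a ROWS frame from the front, in the order they entered,
        // so retracting k values drops the k oldest rows.
        for _ in values {
            self.frame.pop_front();
        }
    }

    fn evaluate(&self) -> Option<i64> {
        // n is 1-based; n == 0 or a frame shorter than n yields None.
        self.n.checked_sub(1).and_then(|i| self.frame.get(i).copied())
    }
}
```

Without a retract step, the only options are recomputing the frame from scratch or refusing sliding frames, which matches the "not implemented" error above.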

The second one looks a bit weird to me; I'm not sure whether I messed something up or am hitting some other issue.

@jayzhan211 If you have time to provide some pointers, that would be highly appreciated :)

jayzhan211 commented 1 week ago

I suggest we convert 1. ArrayAgg, 2. DistinctArrayAgg, 3. OrderSensitiveArrayAgg and 4. NthValue separately.

OrderSensitiveArrayAgg and NthValue are quite complex.

jayzhan211 commented 1 week ago

ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so that we can pass it to StateFieldsArgs for state_fields and set nullable on the field.

We can get nullable with

https://github.com/apache/datafusion/blob/58d23c5c050f43aa7b867d4f0be7298d8d6cad83/datafusion/physical-expr/src/aggregate/build_in.rs#L71

https://github.com/apache/datafusion/blob/58d23c5c050f43aa7b867d4f0be7298d8d6cad83/datafusion/physical-expr-common/src/aggregate/mod.rs#L275C1-L289C6
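A minimal sketch of the proposal, assuming a simplified model: `DataTypeSketch`, `FieldSketch`, `StateFieldsArgsSketch` and `array_agg_state_fields` are hypothetical stand-ins for Arrow's `DataType`/`Field` and DataFusion's `StateFieldsArgs`, not their real definitions. It shows the point being made: once the input's nullability is passed in, the ARRAY_AGG state (one list per group) can give its `item` field that nullability rather than hard-coding it.

```rust
/// Hypothetical, simplified stand-in for an Arrow data type.
#[derive(Debug, Clone, PartialEq)]
enum DataTypeSketch {
    Int32,
    List(Box<FieldSketch>),
}

/// Hypothetical, simplified stand-in for an Arrow field.
#[derive(Debug, Clone, PartialEq)]
struct FieldSketch {
    name: String,
    data_type: DataTypeSketch,
    nullable: bool,
}

/// Hypothetical arguments for building state fields, extended with the
/// proposed `input_nullable` flag (assumes a single input).
struct StateFieldsArgsSketch {
    name: String,
    input_type: DataTypeSketch,
    input_nullable: bool,
}

/// ARRAY_AGG keeps one list per group as its state; in this sketch the
/// list's `item` field inherits the nullability of the aggregated input.
fn array_agg_state_fields(args: &StateFieldsArgsSketch) -> Vec<FieldSketch> {
    let item = FieldSketch {
        name: "item".to_string(),
        data_type: args.input_type.clone(),
        nullable: args.input_nullable,
    };
    vec![FieldSketch {
        // Assumed "<name>[<state>]" naming pattern for state fields.
        name: format!("{}[array_agg]", args.name),
        data_type: DataTypeSketch::List(Box::new(item)),
        // The list itself is marked nullable in this sketch.
        nullable: true,
    }]
}
```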

eejbyfeldt commented 1 week ago

> I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately.
>
> At least leave OrderSensitiveArrayAgg and NthValue for another PR, since they involve ordering and window functions, which makes them quite complex.

Sounds good. If we only convert ArrayAgg, how should we handle multiple expressions that share the same name? Should the UDAF not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private, so it cannot easily be reused; is it acceptable to make it public in this intermediate state?

eejbyfeldt commented 1 week ago

> ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so that we can pass it to StateFieldsArgs for state_fields and set nullable on the field.

Makes sense. Why do we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245)? Can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool, or should it be inputs_nullable: Vec<bool>?

jayzhan211 commented 1 week ago

> > ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so that we can pass it to StateFieldsArgs for state_fields and set nullable on the field.
>
> Makes sense. Why do we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245)? Can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool, or should it be inputs_nullable: Vec<bool>?

We have a single input because we have not yet encountered a function that needs multiple inputs. If a function does expect multiple inputs, we can extend it to a Vec.

jayzhan211 commented 1 week ago

> > I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately. At least leave OrderSensitiveArrayAgg and NthValue for another PR, since they involve ordering and window functions, which makes them quite complex.
>
> Sounds good. If we only convert ArrayAgg, how should we handle multiple expressions that share the same name? Should the UDAF not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private, so it cannot easily be reused; is it acceptable to make it public in this intermediate state?

We could check DISTINCT and the ordering to decide whether to use the built-in aggregate or the UDAF here:

https://github.com/apache/datafusion/blob/18042fd69138e19613844580408a71a200ea6caa/datafusion/core/src/physical_planner.rs#L1825-L1909
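The check suggested above can be sketched as a small dispatch function. `ArrayAggImpl` and `choose_array_agg_impl` are hypothetical names, not code from physical_planner.rs, and the sketch assumes that combining DISTINCT with ORDER BY is rejected rather than handled.

```rust
/// Which implementation a hypothetical planner step picks for ARRAY_AGG
/// while only the plain form has been converted to a UDAF.
#[derive(Debug, PartialEq)]
enum ArrayAggImpl {
    /// ARRAY_AGG(x): the new UDAF.
    Udaf,
    /// ARRAY_AGG(DISTINCT x): still the built-in DistinctArrayAgg.
    BuiltinDistinct,
    /// ARRAY_AGG(x ORDER BY y): still the built-in OrderSensitiveArrayAgg.
    BuiltinOrderSensitive,
    /// ARRAY_AGG(DISTINCT x ORDER BY y): assumed unsupported in this sketch.
    Unsupported,
}

/// Routes on the two properties mentioned in the thread: DISTINCT and ordering.
fn choose_array_agg_impl(distinct: bool, has_ordering: bool) -> ArrayAggImpl {
    match (distinct, has_ordering) {
        (false, false) => ArrayAggImpl::Udaf,
        (true, false) => ArrayAggImpl::BuiltinDistinct,
        (false, true) => ArrayAggImpl::BuiltinOrderSensitive,
        (true, true) => ArrayAggImpl::Unsupported,
    }
}
```

Because the name ARRAY_AGG then resolves to exactly one implementation per call site, the UDAF and the built-in variants can coexist until the remaining variants are converted.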

> The accumulator is private, so it cannot easily be reused; is it acceptable to make it public in this intermediate state?

We can move it to physical_expr_common first, and once all the related functions are converted, move it back.

eejbyfeldt commented 1 week ago

> We have a single input because we have not yet encountered a function that needs multiple inputs. If a function does expect multiple inputs, we can extend it to a Vec.

What about covariance (https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate/src/covariance.rs#L43), which takes 2 arguments?
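A two-argument aggregate such as covariance is the case a single input_nullable: bool cannot describe. Below is a minimal sketch of the inputs_nullable: Vec<bool> alternative, with one entry per argument; `AggregateInputsSketch` and its methods are hypothetical, not DataFusion types.

```rust
/// Hypothetical description of an aggregate's arguments: one type name and
/// one nullability flag per argument, kept in argument order.
struct AggregateInputsSketch {
    input_types: Vec<String>,
    inputs_nullable: Vec<bool>,
}

impl AggregateInputsSketch {
    /// Builds the description from (type, nullable) pairs, one per argument.
    fn new(inputs: &[(&str, bool)]) -> Self {
        let input_types: Vec<String> = inputs.iter().map(|(t, _)| t.to_string()).collect();
        let inputs_nullable: Vec<bool> = inputs.iter().map(|(_, n)| *n).collect();
        Self { input_types, inputs_nullable }
    }

    /// Number of arguments the aggregate takes.
    fn arity(&self) -> usize {
        self.input_types.len()
    }

    /// Nullability of argument `i`, or None if there is no such argument.
    fn nullable(&self, i: usize) -> Option<bool> {
        self.inputs_nullable.get(i).copied()
    }

    /// True if any argument may be null.
    fn any_nullable(&self) -> bool {
        self.inputs_nullable.iter().any(|&n| n)
    }
}
```

A single-argument aggregate like ARRAY_AGG is then just the one-element case, so the bool form is a special case of the Vec<bool> form.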

eejbyfeldt commented 1 week ago

@jayzhan211 I created a PR that only converts ArrayAgg: https://github.com/apache/datafusion/pull/11045. I will look into adding nullable next.