Closed mbasmanova closed 1 year ago
A common use is to compute the product of all values:
reduce_agg(x, 1, (a, b) -> (a * b), (a, b) -> (a * b))
Some other examples of usage:
reduce_agg(x, array[], (a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000), (a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000))
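To make the second example easier to read, here is a minimal Python sketch of what that lambda computes: merge two arrays, drop duplicates, sort descending, and keep the first 1000 entries. The names `merge_top_n` and `reduce_agg_top_n` are hypothetical and exist only for illustration; this models the semantics, not the Presto or Velox implementation.

```python
from functools import reduce


def merge_top_n(a, b, n=1000):
    # concat -> array_distinct -> array_sort -> reverse -> slice(..., 1, n)
    return sorted(set(a + b), reverse=True)[:n]


def reduce_agg_top_n(arrays, n=1000):
    # Start from the empty array (the initial value) and fold in each input array.
    # The same merge serves as inputFunction and combineFunction in the example.
    return reduce(lambda state, x: merge_top_n(state, x, n), arrays, [])


top3 = reduce_agg_top_n([[3, 1], [2, 3], [5]], n=3)
```

Because the state is truncated after every step, it never grows beyond `n` elements.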
A clever way to compute a product of positive integers: https://stackoverflow.com/questions/49763553/multiplication-aggregate-operator-in-presto-and-aws-athena
PR #6482 adds the reduce_agg function to Velox. Here are some comments about the implementation.
The reduce_agg aggregate function doesn't lend itself well to vectorized execution because it is effectively a data-dependent loop: each step needs the state produced by the previous step. It is hard to avoid evaluating lambda expressions on a small number of rows at a time. This particular implementation doesn't try very hard and simply evaluates lambda expressions one row at a time. A relatively straightforward improvement could be similar to the algorithm implemented in the 'reduce' scalar lambda function.
Consider dataset {1, 10, 2, 20, 3} where values 1, 2, 3 belong to group 1 and values 10, 20 belong to group 2.
This implementation goes like so:
s0 - initialValue, s1 - state for group 1, s2 - state for group 2.
s1 = s0
s2 = s0
s1 = f(s1, 1)
s2 = f(s2, 10)
s1 = f(s1, 2)
s2 = f(s2, 20)
s1 = f(s1, 3)
The inputFunction lambda expression is evaluated 5 times, once per row.
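The trace above can be sketched in Python as follows. This is only a model of the row-at-a-time strategy, not the Velox code: `reduce_agg_row_at_a_time` is a hypothetical name, and multiplication stands in for an arbitrary inputFunction.

```python
def reduce_agg_row_at_a_time(rows, initial_value, input_function):
    """rows is a list of (group, value) pairs in arrival order."""
    states = {}
    evaluations = 0
    for group, value in rows:
        # A group seen for the first time starts from the initial value (s0).
        state = states.get(group, initial_value)
        states[group] = input_function(state, value)
        evaluations += 1  # one lambda evaluation per input row
    return states, evaluations


rows = [(1, 1), (2, 10), (1, 2), (2, 20), (1, 3)]
row_states, row_evaluations = reduce_agg_row_at_a_time(rows, 1, lambda s, x: s * x)
```

For the example dataset, `row_evaluations` is 5, matching the count above.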
A more efficient approach would be to evaluate the lambda on batches of rows, where each batch contains at most one row per group:
s1 = s0
s2 = s0
[s1, s2] = f([s1, s2], [1, 10])
[s1, s2] = f([s1, s2], [2, 20])
s1 = f(s1, 3)
Here, the inputFunction lambda expression is evaluated only 3 times (compared to 5 times above).
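A rough Python sketch of this batched strategy, under the assumption that the lambda can be evaluated on whole vectors at once. The names `reduce_agg_batched` and `multiply` are hypothetical, and a list-in/list-out function stands in for a vectorized expression evaluation.

```python
from collections import deque


def reduce_agg_batched(rows, initial_value, vector_function):
    """rows is a list of (group, value) pairs; vector_function maps two lists to a list."""
    # Queue up each group's values in arrival order.
    pending = {}
    for group, value in rows:
        pending.setdefault(group, deque()).append(value)

    states = {group: initial_value for group in pending}
    evaluations = 0
    while True:
        # Each batch takes at most one pending row from every group.
        active = [group for group in pending if pending[group]]
        if not active:
            break
        batch_states = [states[group] for group in active]
        batch_inputs = [pending[group].popleft() for group in active]
        new_states = vector_function(batch_states, batch_inputs)
        evaluations += 1  # one vectorized evaluation per batch
        for group, state in zip(active, new_states):
            states[group] = state
    return states, evaluations


def multiply(states, inputs):
    # Element-wise stand-in for evaluating the inputFunction on a batch.
    return [s * x for s, x in zip(states, inputs)]


rows = [(1, 1), (2, 10), (1, 2), (2, 20), (1, 3)]
batched_states, batched_evaluations = reduce_agg_batched(rows, 1, multiply)
```

The number of evaluations equals the size of the largest group, so the benefit shrinks when one group holds most of the rows.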
Global aggregation would go slightly differently.
Consider dataset {1, 2, 3, 4, 5, 6,..100}.
This implementation goes like so:
s0 - initialValue, s - state of the only group.
s = s0
s = f(s, 1)
s = f(s, 2)
s = f(s, 3)
s = f(s, 4)
s = f(s, 5)
...
The inputFunction lambda expression is evaluated 100 times, once per row.
A more efficient approach would be:
f - inputFunction, g - combineFunction
Convert all inputs into states:
[s1, s2, s3, s4, s5,..s100] = f([s0, s0, s0, s0, s0,..], [1, 2, 3, 4, 5,..100])
Combine 100 states into 50:
[s1, s2,...s50] = g([s1, s3,...s99], [s2, s4,...s100])
Combine these 50 states into 25 states:
[s1, s2,...s25] = g([s1, s3,...s49], [s2, s4,...s50])
Continue in this manner until all states are combined. This requires only about log2(100), ~7, evaluations of combineFunction, plus the single evaluation of inputFunction.
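The pairwise scheme can be sketched in Python as shown here. It is a simplified model, not the Velox implementation: `reduce_agg_global_tree` and `multiply` are hypothetical names, list-in/list-out functions stand in for vectorized lambda evaluation, and the sketch assumes the initial value is an identity for the operation, since s0 is fed into every input row.

```python
import math


def reduce_agg_global_tree(values, initial_value, input_function, combine_function):
    if not values:
        return initial_value, 0
    # Convert all inputs into states with one vectorized inputFunction evaluation.
    # Assumption: initial_value is an identity, because it is applied once per row.
    states = input_function([initial_value] * len(values), values)
    evaluations = 1
    while len(states) > 1:
        # With an odd number of states, carry the last one over unchanged.
        carry = states[-1:] if len(states) % 2 else []
        paired = states[: len(states) - len(carry)]
        left, right = paired[0::2], paired[1::2]
        states = combine_function(left, right) + carry
        evaluations += 1  # one vectorized combineFunction evaluation per round
    return states[0], evaluations


def multiply(left, right):
    # Element-wise stand-in for both inputFunction and combineFunction.
    return [a * b for a, b in zip(left, right)]


tree_result, tree_evaluations = reduce_agg_global_tree(
    list(range(1, 101)), 1, multiply, multiply
)
```

For 100 inputs the state count goes 100, 50, 25, 13, 7, 4, 2, 1, which is 7 combine rounds after the single inputFunction evaluation.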
Note that the more efficient algorithm for global aggregation doesn't support applying reduce_agg to sorted inputs.
Also, note that the more efficient algorithm for global aggregation can be used in a group-by as well in case there are few groups with lots of values in each.
A common use case for reduce_agg is to compute a product of input values.
reduce_agg(x, 1, (a, b) -> (a * b), (a, b) -> (a * b))
In this case, the best option is to identify this pattern and invoke a specialized 'product(x)' aggregate function instead.
PR #6536 optimized the implementation of reduce_agg.
Blog post: https://velox-lib.io/blog/reduce-agg
Description
https://prestodb.io/docs/current/functions/aggregate.html#reduce_agg
The last item is weird. It seems to be an oversight in the Java implementation. In Velox, we are going to throw if inputFunction or combineFunction returns NULL.
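A minimal Python sketch of the intended behavior, with `None` standing in for NULL. The wrapper name `checked`, the use of `ValueError`, and the error message are all hypothetical; the actual Velox error type and wording are not specified here.

```python
def checked(function, name):
    """Wrap a lambda so that a NULL (None) result raises instead of propagating."""
    def wrapper(*args):
        result = function(*args)
        if result is None:
            # Hypothetical message; stands in for whatever error Velox raises.
            raise ValueError(f"{name} must not return NULL")
        return result
    return wrapper


safe_input = checked(lambda s, x: s * x, "inputFunction")
product_state = safe_input(2, 3)
```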
reduce_agg in Presto doesn't support de-duplicating inputs, but there is no reason to have this limitation in Velox.
reduce_agg in Presto is defined as NOT order sensitive, i.e. it cannot be applied to sorted inputs. The ORDER BY clause in reduce_agg(... ORDER BY ...) is ignored.
CC: @amitkdutta