finos / morphir-elm

Tools to work with the Morphir IR in Elm.
https://package.elm-lang.org/packages/finos/morphir-elm/latest
Apache License 2.0

Support aggregation of multiple values in a single function #799

Open jonathanmaw opened 2 years ago

jonathanmaw commented 2 years ago

There are times when a query will aggregate multiple values at once, e.g.

source = [{cost=1, weight=2}, {cost=3, weight=4}]
testMultiple source =
    [{
        lightest =
            source |> List.map .weight |> List.minimum,
        dearest =
            source |> List.map .cost |> List.maximum
    }]

which we want to be translated into

source.select(
    org.apache.spark.sql.functions.min(org.apache.spark.sql.functions.col("weight")).alias("lightest"),
    org.apache.spark.sql.functions.max(org.apache.spark.sql.functions.col("cost")).alias("dearest")
)

This work should involve:

jonathanmaw commented 2 years ago

@AttilaMihaly I've created an issue to track our need to handle aggregation in more general cases. I'm a little unclear on exactly what's required, so I'd appreciate it if you could take a look and check that I've understood this issue correctly.

jonathanmaw commented 2 years ago

Attila has created a Pull Request for a generic implementation of Aggregations (https://github.com/finos/morphir-elm/pull/800).

It recommends supporting aggregations through a Morphir.SDK.Aggregate module, in which users write aggregations that Morphir can parse, instead of trying to interpret aggregations as they might be written in plain Elm.

jonathanmaw commented 2 years ago

Based on the examples and documentation at https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.Dataset, I think that while Morphir's aggregation supports a separate filter for each aggregation expression, Spark's does not. Going by the examples, I think filters must be applied in the function chain before the groupBy and aggregation.

I think it's fair for the Spark backend to not support per-aggregation-expression filters, with documentation making this clear and also making clear how filters can be done in the Spark backend.
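
A minimal sketch of the filter-before-groupBy pattern described above, assuming Spark is on the classpath and a DataFrame with columns key1 and value. The object and method names (FilterBeforeGroupBy, aggregateSmallValues) and the sample rows are illustrative only and are not part of Morphir or its Spark backend. Unlike a per-expression filter, the filter here restricts the rows seen by every aggregate in the agg call.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, lit, max, min, sum}

// Illustrative names only: not part of Morphir or its Spark backend.
object FilterBeforeGroupBy {

  // Filter first, then group and aggregate.
  // Every aggregate below sees only the rows where value < 7.
  def aggregateSmallValues(df: DataFrame): DataFrame =
    df.filter(col("value") < 7)
      .groupBy("key1")
      .agg(
        count(lit(1)).as("count"),
        sum("value").as("sum"),
        max("value").as("max"),
        min("value").as("min")
      )

  def main(args: Array[String]): Unit = {
    // Local mode keeps the sketch standalone.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("filter-before-groupBy")
      .getOrCreate()
    import spark.implicits._ // provides toDF on Seq

    // Hypothetical sample rows: (key1, value)
    val df = Seq(
      ("k1_1", 1), ("k1_1", 2), ("k1_1", 3), ("k1_1", 4),
      ("k1_2", 5), ("k1_2", 6), ("k1_2", 7), ("k1_2", 8)
    ).toDF("key1", "value")

    aggregateSmallValues(df).show()
    spark.stop()
  }
}
```

With these sample rows, the k1_2 group is reduced to the values 5 and 6 before any aggregate runs, so its sum covers only those two rows (11 rather than 26). That is the behavioural difference documentation for the Spark backend would need to call out.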

jonathanmaw commented 2 years ago

I have found another limitation of Spark's aggregations compared to the Morphir SDK's aggregations: the key cannot be given an arbitrary name. For example, with the Morphir SDK's aggregate we write:

testDataSet =
    [ TestInput1 "k1_1" "k2_1" 1
    , TestInput1 "k1_1" "k2_1" 2
    , TestInput1 "k1_1" "k2_2" 3
    , TestInput1 "k1_1" "k2_2" 4
    , TestInput1 "k1_2" "k2_1" 5
    , TestInput1 "k1_2" "k2_1" 6
    , TestInput1 "k1_2" "k2_2" 7
    , TestInput1 "k1_2" "k2_2" 8
    ]

testDataSet
    |> groupBy .key1
    |> aggregate
        (\key inputs ->
            { key = key
            , count = inputs (count |> withFilter (\a -> a.value < 7))
            , sum = inputs (sumOf .value)
            , max = inputs (maximumOf .value)
            , min = inputs (minimumOf .value)
            }
        )

and get the results

[ { key = "k1_1", count = 4, sum = 10, max = 4, min = 1 }
, { key = "k1_2", count = 2, sum = 26, max = 8, min = 5 }
]

Specifically, the key = key part of the aggregate expression creates a column named "key".

In Spark, a similar aggregation is:

// Assumes a SparkSession named spark is in scope (as in spark-shell).
import org.apache.spark.sql.functions.{avg, max, sum}
import spark.implicits._  // provides toDF on Seq

val simpleData = Seq(("James","Sales","NY",90000,34,10000),
  ("Michael","Sales","NY",86000,56,20000),
  ("Robert","Sales","CA",81000,30,23000),
  ("Maria","Finance","CA",90000,24,23000),
  ("Raman","Finance","CA",99000,40,24000),
  ("Scott","Finance","NY",83000,36,19000),
  ("Jen","Finance","NY",79000,53,15000),
  ("Jeff","Marketing","CA",80000,25,18000),
  ("Kumar","Marketing","NY",91000,50,21000)
)
val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
df.groupBy("department")
    .agg(
      sum("salary").as("sum_salary"),
      avg("salary").as("avg_salary"),
      sum("bonus").as("sum_bonus"),
      max("bonus").as("max_bonus"))
    .show(false)  // print the result without truncating column values

which generates

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+

That is, the grouping column, "department", always keeps that name in the aggregation result.

It's possible to rename the columns by following this up with a Select ObjectExpression, but it's not possible to omit that column (which is possible in Morphir SDK's aggregate), and I don't know if renaming that column is specifically required.
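
A minimal sketch of the rename-by-select follow-up mentioned above, assuming Spark is on the classpath and a DataFrame with department, salary and bonus columns. The object and method names (RenameGroupingColumn, aggregateWithKey), the target name "key" and the sample rows are illustrative assumptions, not something the Spark backend is known to generate.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, sum}

// Illustrative names only: not part of Morphir or its Spark backend.
object RenameGroupingColumn {

  // Group by "department", aggregate, then use a follow-up select
  // to expose the grouping column under the name "key".
  def aggregateWithKey(df: DataFrame): DataFrame =
    df.groupBy("department")
      .agg(
        sum("salary").as("sum_salary"),
        max("bonus").as("max_bonus")
      )
      .select(
        col("department").as("key"),
        col("sum_salary"),
        col("max_bonus")
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rename-grouping-column")
      .getOrCreate()
    import spark.implicits._ // provides toDF on Seq

    // Hypothetical sample rows: (department, salary, bonus)
    val df = Seq(
      ("Sales", 90000, 10000),
      ("Sales", 86000, 20000),
      ("Finance", 90000, 23000)
    ).toDF("department", "salary", "bonus")

    aggregateWithKey(df).show(false)
    spark.stop()
  }
}
```

The rename happens in a separate select step after agg, which is why a backend that maps key = key straight onto Spark's default column name can skip this step.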

For now, I will be proceeding with the assumption that "key" is always used to create a column with the exact same name as the one grouped by, and the Spark backend will deliberately discard any fields that use "key", leaving that to Spark's default behaviour.

jonathanmaw commented 2 years ago

Following discussions, it was decided that:

AttilaMihaly commented 2 years ago

One correction to the first bullet-point: We want the Spark AST to represent Spark group-by and aggregation operations, not the Aggregation API in the Morphir SDK. The rest looks correct.

jonathanmaw commented 2 years ago

Pull Request #848 has been made for this issue.