typelevel / frameless

Expressive types for Spark.
Apache License 2.0

Support for UDAF #88

Status: Open. imarios opened this issue 7 years ago

imarios commented 7 years ago

Currently we support only UDFs. UDAFs are really useful, and it would be nice if we could support them.

Relevant Scala docs and this. The second (Aggregator) seems to be the preferred way to do this with Datasets.

The Aggregator:

abstract class Aggregator[-IN, BUF, OUT] extends Serializable
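To make the roles of IN, BUF and OUT concrete, here is a minimal plain-Scala sketch of the contract that runs without Spark. `MiniAggregator`, `MiniAgg.runAggregator` and `Average` are hypothetical names, not Spark API, and the driver is only a simplified picture of the evaluation: reduce each partition from `zero`, then merge the partial buffers and call `finish`.

```scala
// Hypothetical plain-Scala model of the Aggregator contract (not Spark's class).
trait MiniAggregator[-IN, BUF, OUT] {
  def zero: BUF                    // initial buffer; should be an identity for merge
  def reduce(b: BUF, a: IN): BUF   // fold one input value into a buffer
  def merge(b1: BUF, b2: BUF): BUF // combine two partial buffers
  def finish(b: BUF): OUT          // turn the final buffer into the result
}

object MiniAgg {
  // Simplified evaluation: reduce each "partition" on its own,
  // then merge the partial buffers and finish.
  def runAggregator[IN, BUF, OUT](agg: MiniAggregator[IN, BUF, OUT],
                                  partitions: Seq[Seq[IN]]): OUT = {
    val partials = partitions.map(_.foldLeft(agg.zero)(agg.reduce))
    agg.finish(partials.foldLeft(agg.zero)(agg.merge))
  }

  // Average: IN = Double, BUF = (sum, count), OUT = Double.
  object Average extends MiniAggregator[Double, (Double, Long), Double] {
    def zero: (Double, Long) = (0.0, 0L)
    def reduce(b: (Double, Long), a: Double): (Double, Long) = (b._1 + a, b._2 + 1)
    def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
      (b1._1 + b2._1, b1._2 + b2._2)
    def finish(b: (Double, Long)): Double = if (b._2 == 0) 0.0 else b._1 / b._2
  }
}
```

`Average` uses a `(sum, count)` buffer that differs from both its input and output types, which is what the separate BUF parameter is for; splitting 1.0 to 4.0 across partitions in any way yields 2.5.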

Example from Stack Overflow:

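import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._ // assumes a SparkSession named `spark` (e.g. spark-shell)

// Returns true if at least one input row satisfies the predicate `f`.
class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  val zero = false                                      // no matching row seen yet
  def reduce(acc: Boolean, x: I) = acc | f(x)           // fold one row into the buffer
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2 // combine partial buffers
  def finish(acc: Boolean) = acc                        // the buffer is the result

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

// Per key: is any Int value below -40? (`df` is assumed to have a String and an Int column.)
val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)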
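To see what the grouped aggregation returns, the sketch below replays the same zero/reduce/merge/finish logic per key on plain Scala collections, without Spark. `BelowThresholdModel` is a hypothetical name, and the rows and their split into two partitions are made-up sample data.

```scala
// Plain-Scala model (no Spark) of groupByKey(_._1).agg(belowThreshold).
// The rows and their split into two "partitions" are made-up sample data.
object BelowThresholdModel {
  val f: ((String, Int)) => Boolean = _._2 < -40 // same predicate as the example

  val zero = false
  def reduce(acc: Boolean, x: (String, Int)): Boolean = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean): Boolean = acc1 | acc2
  def finish(acc: Boolean): Boolean = acc

  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 10), ("b", -50)),
    Seq(("a", -41), ("b", 0), ("c", -40))
  )

  // For each key: reduce that key's rows within every partition,
  // then merge the per-partition buffers and finish.
  val result: Map[String, Boolean] =
    partitions.flatten.map(_._1).distinct.map { key =>
      val partials = partitions.map(_.filter(_._1 == key).foldLeft(zero)(reduce))
      key -> finish(partials.foldLeft(zero)(merge))
    }.toMap
}
```

The model yields `a -> true`, `b -> true` and `c -> false`: the comparison is strict, so -40 itself does not count.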

Examples from Spark

Another example from Databricks:

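import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._ // assumes a SparkSession named `spark` (e.g. spark-shell)

val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
  def zero: Int = 0                     // The initial value.
  def reduce(b: Int, a: Int) = b + a    // Add an element to the running total.
  def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
  def finish(b: Int) = b                // Return the final result.
  // Spark 2.x and later also require encoders for the buffer and output types.
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}.toColumn

val ds = Seq(1, 2, 3, 4).toDS()
ds.select(simpleSum).collect // Array(10)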
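Spark's `Aggregator` documentation asks that `zero` satisfy `b + zero = b`. The sketch below, plain Scala with a hypothetical `ZeroLaw` object and made-up partitionings, reuses the `simpleSum` logic to show why: in this simplified model every partition's buffer and the final merge start from `zero`, so a non-identity value is added once per buffer.

```scala
// Plain-Scala model (no Spark) of why simpleSum's zero must be 0.
// In this simplified model every per-partition buffer and the final
// merge start from `zero`; the partitionings below are made up.
object ZeroLaw {
  def sum(zero: Int, partitions: Seq[Seq[Int]]): Int =
    partitions
      .map(_.foldLeft(zero)(_ + _)) // reduce: add each element to the running total
      .foldLeft(zero)(_ + _)        // merge: add the partial totals together

  val onePartition: Seq[Seq[Int]]  = Seq(Seq(1, 2, 3, 4))
  val twoPartitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4))
}
```

With `zero = 0` both splits give 10; with `zero = 1` the same four numbers give 12 or 13 depending on how they are split.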

TODO: Create a TypedAggregator that returns a TypedColumn.

The entire construct already seems quite type-safe, so this could be fairly simple.
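One way to read the TODO: the column built from an aggregator should record both the row type it consumes and the type it produces, so it can only be applied to a matching dataset. The toy sketch below illustrates that idea in plain Scala. `Col`, `TypedAgg`, `TypedRows` and `SumLengths` are hypothetical names, not frameless or Spark API, and `merge` is omitted because the toy evaluates a single in-memory partition.

```scala
// Hypothetical toy (no Spark, not frameless API): a column type that records
// both the row type T it applies to and the value type U it produces.
final case class Col[T, U](name: String, eval: Seq[T] => U)

// Hypothetical typed aggregator; merge is omitted because this toy
// only evaluates a single in-memory partition.
trait TypedAgg[IN, BUF, OUT] {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def finish(b: BUF): OUT

  // The resulting column can only be selected from rows of type IN.
  def toCol(name: String): Col[IN, OUT] =
    Col(name, (rows: Seq[IN]) => finish(rows.foldLeft(zero)(reduce)))
}

// Hypothetical typed "dataset": select only accepts columns over its row type.
final case class TypedRows[T](rows: Seq[T]) {
  def select[U](c: Col[T, U]): U = c.eval(rows)
}

object TypedAggDemo {
  // Sums the lengths of String rows.
  object SumLengths extends TypedAgg[String, Int, Int] {
    def zero: Int = 0
    def reduce(b: Int, a: String): Int = b + a.length
    def finish(b: Int): Int = b
  }

  val total: Int = TypedRows(Seq("ab", "cde")).select(SumLengths.toCol("len"))
  // TypedRows(Seq(1, 2)).select(SumLengths.toCol("len")) does not compile:
  // a Col[String, Int] is not a Col[Int, U].
}
```

Because `Col` is invariant in its row type, selecting `SumLengths.toCol("len")` from a `TypedRows[Int]` is rejected at compile time; that is presumably the kind of guarantee a typed aggregator in frameless would aim for.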

imarios commented 7 years ago

Adding myself as a placeholder. I would love to get help if anyone is interested.

jacobBaumbach commented 7 years ago

I'd like to give this a go, if no one else has already started on it.

imarios commented 7 years ago

Hi @jacobBaumbach! Sorry for the delay! We would love your help. Feel free to ask if you need anything. Best!