typelevel / frameless

Expressive types for Spark.

Optional aggregation columns shortcut the computation to an empty dataset #239

Open imarios opened 6 years ago

imarios commented 6 years ago

It seems that if the first aggregate in an aggregation returns null, the remaining aggregates are ignored and the whole result comes back empty. I was going to check whether this is a Spark bug rather than a Frameless one. Nope, the issue seems to be on our side of the fence:

case class X[A,B](a: A, b: B)
val t = TypedDataset.create(X[Option[Int],Long](None, 0)::Nil)
t.show().run()
+----+---+
|   a|  b|
+----+---+
|null|  0|
+----+---+

scala> t.agg(first(t('a)), sum(t('b))).collect().run()
res: Seq[(Option[Int], Long)] = WrappedArray()

scala> t.agg(sum(t('b)), first(t('a))).collect().run()
res: Seq[(Long, Option[Int])] = WrappedArray((0,None))
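
For reference, the result that both orderings should arguably produce can be modelled in plain Scala, without Spark or Frameless. This is only a sketch of the expected semantics: `Row`, `firstA`, `sumB`, `firstThenSum` and `sumThenFirst` are hypothetical names, not Frameless API. It assumes that `first` simply takes the column value of the first row, and that an empty input yields no rows.

```scala
// Plain-Scala model of the expected aggregation result (no Spark, no Frameless).
// Row, firstA, sumB, firstThenSum and sumThenFirst are hypothetical names used
// only for illustration.
object ExpectedAgg {
  final case class Row(a: Option[Int], b: Long)

  // Assumed semantics of first: the column value of the first row, if a row exists.
  // The outer Option says "is there a row?", the inner one is the nullable column.
  def firstA(rows: Seq[Row]): Option[Option[Int]] = rows.headOption.map(_.a)

  def sumB(rows: Seq[Row]): Long = rows.map(_.b).sum

  // (first(a), sum(b)): one output row whenever the input is non-empty,
  // even if the first value of column a is None.
  def firstThenSum(rows: Seq[Row]): Seq[(Option[Int], Long)] =
    firstA(rows).map(a => (a, sumB(rows))).toList

  // (sum(b), first(a)): the same values with the columns swapped.
  def sumThenFirst(rows: Seq[Row]): Seq[(Long, Option[Int])] =
    firstThenSum(rows).map(_.swap)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(None, 0L))
    println(firstThenSum(rows))
    println(sumThenFirst(rows))
  }
}
```

Under this model the column order only changes the shape of the tuple, never the number of rows, which is the property the first REPL call above violates.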

A test that fails intermittently because of this issue (in NonAggregateFunctionsTests.scala):

 test("Empty vararg tests") {
    import frameless.functions.aggregate._
    def prop[A : TypedEncoder, B: TypedEncoder](data: Vector[X2[A, B]]) = {
      val ds = TypedDataset.create(data)
      val frameless = ds.select(ds('a), concat(), ds('b), concatWs(":")).collect().run().toVector
      val framelessAggr = ds.agg(first(ds('a)), concat(), concatWs("x"), litAggr(2)).collect().run().toVector
      val scala = data.map(x => (x.a, "", x.b, ""))
      val scalaAggr = if (data.nonEmpty) Vector((data.head.a, "", "", 2)) else Vector.empty
      (frameless ?= scala).&&(framelessAggr ?= scalaAggr)
    }

    check(forAll(prop[Long, Long] _))
    check(forAll(prop[Option[Vector[Boolean]], Long] _))
  }

imarios commented 6 years ago

@OlivierBlanvillain, this was the reason master was failing. The test doesn't always break; it fails only when it happens to generate an empty Option.
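
To make the failure mode concrete: assuming the generator produces a vector whose head has `a = None`, the oracle side of the property still expects exactly one row, while (going by the REPL session in the first comment) the `agg` side presumably comes back empty, so `framelessAggr ?= scalaAggr` fails. A minimal sketch of the oracle side only, in plain Scala; `OracleCheck` is a hypothetical wrapper and `X2` here is a local stand-in for the test suite's case class, assumed to have fields `a` and `b`:

```scala
// Evaluates only the pure-Scala oracle from the property; no Spark or Frameless needed.
// OracleCheck is a hypothetical name, and X2 is a local stand-in for the test
// suite's case class (assumed shape: fields a and b).
object OracleCheck {
  final case class X2[A, B](a: A, b: B)

  // Same expression as scalaAggr in the test: one row built from the head, or nothing.
  def scalaAggr[A, B](data: Vector[X2[A, B]]): Vector[(A, String, String, Int)] =
    if (data.nonEmpty) Vector((data.head.a, "", "", 2)) else Vector.empty

  def main(args: Array[String]): Unit = {
    // An input of the kind the generator can produce: the head has an empty Option.
    val data = Vector(X2[Option[Vector[Boolean]], Long](None, 0L))
    println(scalaAggr(data))
  }
}
```

Pinning such an input in a dedicated, non-random test case would make the failure reproducible instead of depending on what the generator happens to produce.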