typelevel / frameless

Expressive types for Spark.
Apache License 2.0

Add support for chained joins #385

Open asm0dey opened 5 years ago

asm0dey commented 5 years ago

Currently there is no way to chain several joins: the ON condition fails to typecheck. This makes code much less readable, because every intermediate result needs its own (often hard-to-read) variable name.

OlivierBlanvillain commented 5 years ago

I suppose the issue here is also due to the result of a join being wrapped in an Option, or is there something else to investigate?

asm0dey commented 5 years ago

I'm not sure, but I don't think it's related to Option because it works with intermediate vals.

niebloomj commented 4 years ago

It's because you don't have an intermediate dataset to reference in the second join's condition. I would also like to see a fix for this.
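To make this concrete without Spark, here is a minimal toy model. It is hypothetical: `Ds`, `Col`, `Cond` and the local `X2` are illustrative stand-ins that only loosely mirror frameless's `TypedDataset`, `TypedColumn` and its `X2` test type. As in frameless, a join condition is assembled from columns taken from each side, so the second join's condition needs a handle on the first join's result.

```scala
// Hypothetical toy model, not the frameless API.
final case class X2[A, B](a: A, b: B)

// A condition relating a left row of type T to a right row of type U.
final case class Cond[T, U](holds: (T, U) => Boolean)

// A "column" of type A, extracted from rows of type T.
final case class Col[T, A](get: T => A) {
  def ===[U](that: Col[U, A]): Cond[T, U] = Cond((t, u) => get(t) == that.get(u))
}

final case class Ds[T](rows: List[T]) {
  // Columns are taken from a dataset value, which fixes the row type T.
  def col[A](f: T => A): Col[T, A] = Col(f)

  // Right join: every row of `other` is kept; matching left rows become Some,
  // and a right row without a match is paired with None.
  def joinRight[U](other: Ds[U])(cond: Cond[T, U]): Ds[(Option[T], U)] =
    Ds(other.rows.flatMap { u =>
      val matches: List[Option[T]] = rows.filter(t => cond.holds(t, u)).map(Some(_))
      (if (matches.isEmpty) List(None) else matches).map(t => (t, u))
    })
}

val leftDs  = Ds(List(X2(1, 1L), X2(2, 2L), X2(3, 3L)))
val midDs   = Ds(List(X2(1, 5L), X2(2, 6L), X2(3, 7L)))
val rightDs = Ds(List(X2(5L, "5s"), X2(6L, "6s"), X2(7L, "7s")))

// The second condition needs columns of the first join's result. In a single chain,
// leftDs.joinRight(midDs)(...).joinRight(rightDs)(???), there is no short way to
// name the left side in place of ???, so the intermediate result is bound to a val.
val joinedDs1 = leftDs.joinRight(midDs)(leftDs.col(_.a) === midDs.col(_.a))
val joinedDs  = joinedDs1.joinRight(rightDs)(joinedDs1.col(_._2.b) === rightDs.col(_.a))
```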

achkatabeca commented 4 years ago

It is indeed a big problem, which prevents me from using the library, as my use cases involve a large number of complex joins.

cchantep commented 3 years ago

Is there a reproducer?

asm0dey commented 3 years ago

@cchantep I haven't used frameless for a couple of years, but I clearly remember that I wanted to join 4 dataframes in one chain, as I can do with Spark Datasets, like

a
  .joinLeft(b)(x)
  .joinInner(c)(y)
  .joinRight(d)(z)

chris-twiner commented 1 year ago

It wasn't immediately obvious from the comment chain what the issue was, but per niebloomj's comment above, you need the equivalent of joinedDs1 below to get the types correct for the left-hand side.

Below I've written a named function, joinRightChained, which allows chaining, but I also added a join(...).right approach which uses the same name for both right(condition) and right(intermediateDataset => condition). Either way you'd need duplicate forwarding functions, but I definitely prefer the Joiner approach.


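  // Fragment of a frameless test suite (e.g. one extending TypedDatasetSuite),
  // where X2, test(...) and the Spark session are already in scope.
  import frameless.{TypedColumn, TypedDataset, TypedEncoder}

  // Holds both sides of a join so that `right` can be overloaded to take either
  // a ready-made condition or a function of the left-hand dataset.
  case class Joiner[T, U](ds: TypedDataset[T], other: TypedDataset[U]) {
    // The condition is built from the left-hand dataset itself, so this also
    // works when `ds` is an unnamed intermediate result in a chain.
    def right(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean])(implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] =
      ds.joinRight(other)(conditionF(ds))

    // Plain condition: same as calling ds.joinRight(other)(condition) directly.
    def right(condition: TypedColumn[T with U, Boolean])(implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] =
      ds.joinRight(other)(condition)
  }

  implicit class Ops[T](ds: TypedDataset[T]) {

    // Starts a join; complete it with .right(...).
    def join[U](other: TypedDataset[U]): Joiner[T, U] = Joiner(ds, other)

    // Named alternative: the condition receives the (possibly unnamed) left-hand dataset.
    def joinRightChained[U](other: TypedDataset[U])(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean])(implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] =
      ds.joinRight(other)(conditionF(ds))
  }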

  test("chained") {
    def prop[
      A: TypedEncoder : Ordering,
      B: TypedEncoder : Ordering,
      C: TypedEncoder : Ordering,
      D: TypedEncoder : Ordering
    ](left: Seq[X2[A, B]], mid: Seq[X2[A, C]], right: Seq[X2[C, D]] ): Unit = {
      val leftDs = TypedDataset.create(left)
      val midDs = TypedDataset.create(mid)
      val rightDs = TypedDataset.create(right)
      /* Original: the intermediate joinedDs1 is needed so that the second
         condition can refer to the first join's result.
      val joinedDs1 = leftDs
        .joinRight(midDs)(leftDs.col('a) === midDs.col('a))
      val joinedDs = joinedDs1
        .joinRight(rightDs)(joinedDs1.col('_2).field('b) === rightDs.col('a))
      */

      /* joinRightChained: the condition receives the unnamed left-hand dataset.
      val joinedDs = leftDs
        .joinRight(midDs)(leftDs.col('a) === midDs.col('a))
        .joinRightChained(rightDs)(_.col('_2).field('b) === rightDs.col('a))
      */

      // Joiner: .join(...).right(...) accepts either a condition or a function of the left-hand dataset.
      val joinedDs = leftDs
        .join(midDs).right(leftDs.col('a) === midDs.col('a))
        .join(rightDs).right(_.col('_2).field('b) === rightDs.col('a))

      // joinedDs.show().run()

      val joinedData = joinedDs.collect().run().toVector.sorted
      assert(joinedData == Seq(
        (Some((Some(X2(1,1L)), X2(1,5L))), X2(5L, "5s")),
        (Some((Some(X2(2,2L)), X2(2,6L))), X2(6L, "6s")),
        (Some((Some(X2(3,3L)), X2(3,7L))), X2(7L, "7s"))
      ))

      ()
    }

    prop[Int, Long, Long, String](Seq(X2(1,1L), X2(2,2L), X2(3,3L)), Seq(X2(1,5L), X2(2,6L), X2(3,7L)),
      Seq(X2(5L, "5s"), X2(6L, "6s"), X2(7L, "7s")))
  }

chris-twiner commented 1 year ago

FYI: since this is purely syntactic, a thrush combinator should be sufficient.

Follow the mouse instructions to add it as a dependency; after that, importing mouse.all._ makes thrush available:

val leftDs = ...
val midDs = ...
val rightDs = ...

import mouse.all._

val joinedDs = leftDs.joinLeft(midDs)(...)
    .thrush(leftWithMid => leftWithMid.joinLeft(rightDs)(leftWithMid('attrib) === ...))
    .thrush(...) // other joins

Unless there are other compelling reasons to add chaining-specific support, I'd recommend closing this issue.

@cchantep per the PR
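A thrush combinator is just reverse function application: `x.thrush(f)` is `f(x)`. What it buys here is that the lambda parameter names the intermediate result, so the next step can refer to it more than once, which is what a join condition on an unnamed intermediate dataset needs. A minimal sketch, assuming the hand-rolled `ThrushOps` below behaves like mouse's `thrush` (it is not mouse's source); on Scala 2.13+ the standard library's `scala.util.chaining` `pipe` does the same job without an extra dependency.

```scala
import scala.util.chaining._

// Hypothetical hand-rolled thrush, assumed to behave like mouse's `thrush`:
// x.thrush(f) == f(x), so a pipeline reads left to right.
implicit final class ThrushOps[A](self: A) {
  def thrush[B](f: A => B): B = f(self)
}

// The lambda parameter `xs` names the intermediate result, so it can be used
// twice in the next step without a separate val.
val viaThrush = List(1, 2, 3).map(_ * 10).thrush(xs => xs.zip(xs.reverse))

// Same pipeline with the standard library's pipe (Scala 2.13+).
val viaPipe = List(1, 2, 3).map(_ * 10).pipe(xs => xs.zip(xs.reverse))
```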

asm0dey commented 1 year ago

@chris-twiner so is the thing I wanna do in https://github.com/typelevel/frameless/issues/385#issuecomment-914414300 already doable? Multiple joins, then other pipeline operations like mapping and reducing?

I mean… sometimes I need to have 7 joins, right?

chris-twiner commented 1 year ago

@asm0dey per the above comment, you should be able to use a thrush to chain any call (irrespective of frameless or any other library) as long as each result is what the next call needs. Have you tried it and found it didn't work for your case?
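A sketch of several joins followed by mapping and reducing in one chain, where each step names the previous result through a lambda parameter. It runs without Spark on a hypothetical toy model (`Ds`, `Col`, `Cond` and a local `X2` are illustrative stand-ins, not frameless), and it uses the standard library's `pipe` (Scala 2.13+) in place of mouse's `thrush`, assuming the two behave the same for this purpose.

```scala
import scala.util.chaining._

// Hypothetical toy model, not the frameless API.
final case class X2[A, B](a: A, b: B)
final case class Cond[T, U](holds: (T, U) => Boolean)

final case class Col[T, A](get: T => A) {
  def ===[U](that: Col[U, A]): Cond[T, U] = Cond((t, u) => get(t) == that.get(u))
}

final case class Ds[T](rows: List[T]) {
  def col[A](f: T => A): Col[T, A] = Col(f)

  // Right join: keeps every row of `other`; unmatched right rows get None on the left.
  def joinRight[U](other: Ds[U])(cond: Cond[T, U]): Ds[(Option[T], U)] =
    Ds(other.rows.flatMap { u =>
      val matches: List[Option[T]] = rows.filter(t => cond.holds(t, u)).map(Some(_))
      (if (matches.isEmpty) List(None) else matches).map(t => (t, u))
    })
}

val a = Ds(List(X2(1, 1L), X2(2, 2L), X2(3, 3L)))
val b = Ds(List(X2(1, 5L), X2(2, 6L), X2(3, 7L)))
val c = Ds(List(X2(5L, "5s"), X2(6L, "6s"), X2(7L, "7s")))
val d = Ds(List(X2("5s", 50), X2("7s", 70)))

// Each pipe lambda binds the previous join's result (ab, abc) so the next
// condition can take columns from it; no intermediate vals are needed.
val joined = a
  .joinRight(b)(a.col(_.a) === b.col(_.a))
  .pipe(ab => ab.joinRight(c)(ab.col(_._2.b) === c.col(_.a)))
  .pipe(abc => abc.joinRight(d)(abc.col(_._2.b) === d.col(_.a)))

// Ordinary pipeline steps can follow: map to d's Int column, then reduce.
val total = joined.rows.map(_._2.b).reduce(_ + _)
```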

asm0dey commented 1 year ago

Nope, I don't even know what this function is :( To me, it's just the name of a mouth infection.

chris-twiner commented 1 year ago

I've updated the above comment to be more helpful and correct. Please chat back here if it works, or perhaps even more interestingly if it doesn't ^_^

cchantep commented 1 year ago

@chris-twiner maybe a doc PR with an added example?

chris-twiner commented 1 year ago

@cchantep on its way