zendesk / scala-flow

A lightweight library intended to make developing Google DataFlow jobs in Scala easier.
Apache License 2.0

Use implicit coders instead of CoderRegistry #10

Closed · bcasey-zd closed this issue 7 years ago

bcasey-zd commented 7 years ago

Instead of passing an implicit TypeTag, then creating a TypeDescriptor, then looking that up in a CoderRegistry, then generating a Coder, just take an implicit Coder directly.

A lot of code goes away, and it becomes easier for us to support generic classes. The drawback is that users of our library always have to use our methods (to ensure the coder is passed correctly), but having combed through the codebase, it looks like we can make it work.
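For illustration, a minimal sketch of the idea (names are assumed, not necessarily this library's actual API; import paths assume DataflowJavaSDK 1.x): base coder instances plus one implicit derivation rule let the compiler assemble a coder for a parameterised type like KV, with no registry lookup.

  import com.google.cloud.dataflow.sdk.coders.{Coder, DoubleCoder, KvCoder, VarIntCoder}
  import com.google.cloud.dataflow.sdk.values.KV

  object ImplicitCoders {
    // Base instances wrap the SDK's atomic coders.
    implicit val intCoder: Coder[Integer] = VarIntCoder.of()
    implicit val doubleCoder: Coder[java.lang.Double] = DoubleCoder.of()

    // Derivation rule: a coder for KV[K, V] exists whenever coders for K and V do.
    implicit def kvCoder[K, V](implicit k: Coder[K], v: Coder[V]): Coder[KV[K, V]] =
      KvCoder.of(k, v)
  }

  // A DSL method then takes the coder as an ordinary implicit parameter,
  // so a missing coder is a compile error rather than a runtime lookup failure.
  def requireCoder[A](implicit coder: Coder[A]): Coder[A] = coder

  import ImplicitCoders._
  val c = requireCoder[KV[Integer, java.lang.Double]] // assembled at compile time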

Details

Risks

@zendesk/zodiac

dasch commented 7 years ago

@bcasey-zd LGTM :shipit:

bcasey-zd commented 7 years ago

Ok, I'm stuck on the following use case:

  // imports assumed for DataflowJavaSDK 1.x
  import com.google.cloud.dataflow.sdk.Pipeline
  import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory
  import com.google.cloud.dataflow.sdk.runners.inprocess.{InProcessPipelineOptions, InProcessPipelineRunner}
  import com.google.cloud.dataflow.sdk.transforms.{Create, DoFn}
  import com.google.cloud.dataflow.sdk.transforms.join.{CoGbkResult, CoGroupByKey, KeyedPCollectionTuple}
  import com.google.cloud.dataflow.sdk.values.{KV, TupleTag}

  val options = PipelineOptionsFactory.as(classOf[InProcessPipelineOptions])
  options.setRunner(classOf[InProcessPipelineRunner])

  val begin = Pipeline.create(options).begin
  val pc1 = begin.apply(Create.of(KV.of(1, 1.0), KV.of(2, 2.0)))
  val pc2 = begin.apply(Create.of(KV.of(1, 3L)))

  val tag1 = new TupleTag[Double]()
  val tag2 = new TupleTag[Long]()

  // Merge collection values into a CoGbkResult collection.
  val coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
      .and(tag2, pc2)
      .apply(CoGroupByKey.create[Int]())

  // This is the part I can't figure out: what to put in implicit scope
  // so that a Coder[CoGbkResult] can be found.
  implicit val magic = CoGbkResult.CoGbkResultCoder

  val finalResultCollection =
    coGbkResultCollection.parDo {
      (c: DoFn[KV[Int, CoGbkResult], KV[Int, List[String]]]#ProcessContext) => {
        val e = c.element
        // Get all collection 1 values
        val pt1Vals = e.getValue().getAll(tag1)
        // Now get collection 2 values
        val pt2Val = e.getValue().getOnly(tag2, -1L)

        // Output must match the declared KV[Int, List[String]] element type.
        c.output(KV.of(e.getKey, List("a", "b")))
      }
    }
    .foreach(println)
    .run()

It's failing to find a coder for CoGbkResult, but I'm not sure how to provide one.

dasch commented 7 years ago

Seems like CoGbkResult uses a weird kind of schema system to wrap the tuples; see https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResultCoderTest.java

It looks like the schema can be determined based on the TupleTagList used in the CoGroupByKey.
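Going by that test, constructing the coder for the snippet above might look like this (constructor and factory names are taken from the linked test file, so treat the exact signatures as assumptions):

  import java.util.Arrays
  import com.google.cloud.dataflow.sdk.coders.{Coder, DoubleCoder, VarLongCoder}
  import com.google.cloud.dataflow.sdk.transforms.join.{CoGbkResult, CoGbkResultSchema, UnionCoder}
  import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult.CoGbkResultCoder
  import com.google.cloud.dataflow.sdk.values.TupleTagList

  // tag1: TupleTag[Double] and tag2: TupleTag[Long], as in the snippet above.
  // The schema mirrors the TupleTagList handed to the CoGroupByKey.
  val schema = new CoGbkResultSchema(TupleTagList.of(tag1).and(tag2))
  val unionCoder = UnionCoder.of(Arrays.asList[Coder[_]](DoubleCoder.of(), VarLongCoder.of()))
  implicit val coGbkCoder: Coder[CoGbkResult] = CoGbkResultCoder.of(schema, unionCoder)

The catch is that the schema depends on those specific tags, so one such implicit can't serve every CoGroupByKey.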

dasch commented 7 years ago

Alternatively – can we detect if a coder has already been manually assigned to a pcollection? I believe CoGroupByKey sets the coder itself.
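A hypothetical sketch of that detection (helper name assumed): prefer a coder the transform already set, and only fall back to the implicit one.

  import com.google.cloud.dataflow.sdk.coders.Coder
  import com.google.cloud.dataflow.sdk.values.PCollection

  // Prefer the coder a transform (e.g. CoGroupByKey) already set; getCoder
  // throws IllegalStateException when no coder has been assigned yet.
  def coderFor[A](pc: PCollection[A])(implicit fallback: Coder[A]): Coder[A] =
    try pc.getCoder
    catch { case _: IllegalStateException => fallback }

Note that this still demands some Coder[A] in implicit scope at the call site, even when it goes unused.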

dasch commented 7 years ago

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGroupByKey.java#L133

bcasey-zd commented 7 years ago

> Alternatively – can we detect if a coder has already been manually assigned to a pcollection? I believe CoGroupByKey sets the coder itself.

That might work, but we'd still need a 'fake' CoGbkResult coder in implicit scope, so it doesn't feel like the most robust of solutions.

> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGroupByKey.java#L133

Yep, that's another approach: we create the coder by reusing the same code. The user would have to be a little careful with the implicit scope, though. Since one CoGroupByKey's coder is different from another's, you couldn't have a single global one.

dasch commented 7 years ago

@bcasey-zd this hack is working: https://github.com/zendesk/scala-flow/pull/10/files#diff-ab760eb0462134574d8aed2e6a2077ed

I haven't been able to generalize it yet.

dasch commented 7 years ago

After much digging around, I think the best approach is to shield users from having to work with KeyedPCollectionTuple and CoGbkResult at all, as these are not statically typed and therefore break our ability to statically guarantee that a coder is available.

I've pushed an example of that in the form of coGroupByKey, a function that returns an actual Scala tuple. We'd need to generate versions for all tuple sizes, of course.

I think this is the best course of action. It allows us to maintain the guarantees we want to make.
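For illustration, a sketch of the shape for the two-collection case (not the code from the PR; names and signatures assumed):

  import scala.collection.JavaConverters._
  import com.google.cloud.dataflow.sdk.coders.Coder
  import com.google.cloud.dataflow.sdk.transforms.{DoFn, ParDo}
  import com.google.cloud.dataflow.sdk.transforms.join.{CoGbkResult, CoGroupByKey, KeyedPCollectionTuple}
  import com.google.cloud.dataflow.sdk.values.{KV, PCollection, TupleTag}

  // The TupleTags and the untyped CoGbkResult never escape this function, so
  // callers never need a Coder[CoGbkResult]; they only need an implicit coder
  // for the typed result, which composes from ordinary instances.
  def coGroupByKey[K, A, B](left: PCollection[KV[K, A]], right: PCollection[KV[K, B]])(
      implicit resultCoder: Coder[KV[K, (List[A], List[B])]]
  ): PCollection[KV[K, (List[A], List[B])]] = {
    val tagA = new TupleTag[A]()
    val tagB = new TupleTag[B]()

    val grouped = KeyedPCollectionTuple
      .of(tagA, left)
      .and(tagB, right)
      .apply(CoGroupByKey.create[K]())

    grouped
      .apply(ParDo.of(new DoFn[KV[K, CoGbkResult], KV[K, (List[A], List[B])]] {
        override def processElement(c: ProcessContext): Unit = {
          val e = c.element
          c.output(KV.of(e.getKey,
            (e.getValue.getAll(tagA).asScala.toList,
             e.getValue.getAll(tagB).asScala.toList)))
        }
      }))
      .setCoder(resultCoder)
  }

Generating the TupleN variants for higher arities would be mechanical.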

dasch commented 7 years ago

Similarly, when splitting a stream into multiple output streams, we should use real Scala tuples. I'm not sure exactly how best to let users decide on the routing, though. Maybe something like this?

  val peopleByAgeGroup = people.partition(10, person => person.age / 10)

  // Ages 0-9
  peopleByAgeGroup._1.writeTo(...)

  // Ages 10-19
  peopleByAgeGroup._2.writeTo(...)

What concrete use cases do we have for splitting collections into more than two?
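For what it's worth, Dataflow's own Partition transform could back such an API. A hypothetical sketch returning a List (a generated TupleN per arity would give the ._1/._2 access shown above):

  import scala.collection.JavaConverters._
  import com.google.cloud.dataflow.sdk.transforms.Partition
  import com.google.cloud.dataflow.sdk.values.{PCollection, PCollectionList}

  // route must return a value in [0, numPartitions).
  def partition[A](pc: PCollection[A], numPartitions: Int)(route: A => Int): List[PCollection[A]] = {
    val parts: PCollectionList[A] = pc.apply(
      Partition.of(numPartitions, new Partition.PartitionFn[A] {
        override def partitionFor(elem: A, n: Int): Int = route(elem)
      }))
    parts.getAll.asScala.toList
  }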

bcasey-zd commented 7 years ago

Shielding the user is not the direction I want to take at the moment. Unless we wrap everything, it leaves traps for the unwary to fall into. Plus, it defeats the purpose of this library, which is to stay close to the Dataflow/Beam API.

Let's back up for a second and review the problem we're trying to solve: easily creating coders for parameterised types, e.g. Foo[T].

The implicit coders solve this nicely but come with some baggage (as above). I've been giving this some thought, and I think a macro-based solution for parameterised classes should be feasible. This would give us the best of both worlds: easy coders but also full compatibility with the Dataflow API.
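To make that concrete, here's roughly what such a macro could expand to for a parameterised case class. This is a hand-written illustration, not an existing macro, and Foo/fooCoder are hypothetical names:

  import java.io.{InputStream, OutputStream}
  import com.google.cloud.dataflow.sdk.coders.{Coder, CustomCoder, VarIntCoder}

  case class Foo[T](id: Int, payload: T)

  // Field-by-field encoding, delegating the type parameter to an implicit Coder[T].
  def fooCoder[T](implicit tCoder: Coder[T]): Coder[Foo[T]] = new CustomCoder[Foo[T]] {
    override def encode(value: Foo[T], out: OutputStream, ctx: Coder.Context): Unit = {
      VarIntCoder.of().encode(value.id, out, ctx.nested())
      tCoder.encode(value.payload, out, ctx)
    }
    override def decode(in: InputStream, ctx: Coder.Context): Foo[T] = {
      val id = VarIntCoder.of().decode(in, ctx.nested())
      Foo(id, tCoder.decode(in, ctx))
    }
  }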

dasch commented 7 years ago

I'm not so sure – the more I work on this, the more it makes sense to make the API natural to Scala. That includes using real tuples rather than Dataflow's weird TupleTag system. I think the API produced by this direction is superior to the alternative.

dasch commented 7 years ago

For instance: the coGroupByKey function is vastly simpler (and I think better) than the original pure Dataflow version.

dasch commented 7 years ago

Also: users are free to mess with the "raw" Dataflow API, they just need to end up with a PCollection with an element type that has an implicit coder before we can provide the nice DSL functions.
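For example (a sketch; foreach is the scala-flow DSL method from the snippet earlier in this thread):

  import com.google.cloud.dataflow.sdk.Pipeline
  import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory
  import com.google.cloud.dataflow.sdk.transforms.Create

  val pipeline = Pipeline.create(PipelineOptionsFactory.create())

  // "Raw" Dataflow API: plain Java transforms, no scala-flow involvement.
  val raw = pipeline.begin.apply(Create.of("a", "bb", "ccc"))

  // Because the element type is String and an implicit Coder[String] is in
  // scope (assumed to be provided by the library's implicits), the DSL
  // methods resolve again from here.
  raw.foreach(println)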

bcasey-zd commented 7 years ago

Adding alternatives that work better is fine as long as we don't break the existing stuff.

However, any loopholes and exceptions make for a bad user experience, e.g. don't use CoGroupByKey with Scala types, don't use apply unless the output type is specified...

Again, let's look at the problem we're actually trying to solve, a convenient coder for parameterised types, and see if there's a better way to do it.

dasch commented 7 years ago

I think it's fine for now to either have an explicit point of entry (i.e. wrapping PCollection explicitly) or simply state that the DSL methods are only available for types that have a Coder. We can work to improve the experience once we get better at this. In the meantime, implicit coders make it a lot easier for everybody to write their jobs.
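The second option is cheap to express; a sketch (class and method names hypothetical):

  import com.google.cloud.dataflow.sdk.coders.Coder
  import com.google.cloud.dataflow.sdk.values.PCollection

  // The enrichment itself demands the implicit coder, so DSL methods simply
  // fail to resolve, at compile time, for element types without a Coder.
  implicit class CoderDsl[A](private val pc: PCollection[A])(implicit coder: Coder[A]) {
    def elementCoder: Coder[A] = coder
  }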

dasch commented 7 years ago

@bcasey-zd I think it's a good idea to try an alternative approach as well, but until then we're sort of blocked :-(

bcasey-zd commented 7 years ago

Ok, let's roll the dice - I've had enough Coder shenanigans to last a lifetime!

dasch commented 7 years ago

I know the feeling...

dasch commented 7 years ago

Want me to clean up my commits and force push? I think we should keep 'coGroupByKey' because it's awesome. I'll fix up the test.

dasch commented 7 years ago

🎉