zendesk / scala-flow

A lightweight library intended to make developing Google DataFlow jobs in Scala easier.
Apache License 2.0

Allow branching PCollections #16

Open dasch opened 7 years ago

dasch commented 7 years ago

Inspired by a similar method in Kafka Streams, this method allows splitting a PCollection into several collections based on a list of predicates. The first predicate that returns true for an element determines which collection the element is sent to. A runtime error is thrown (:sad-face:) if no predicates match, so it's a good idea to have a catch-all at the end, if necessary.
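
To make the first-match rule concrete, here is a minimal sketch over plain Scala collections. `Seq` stands in for `PCollection`, and this `branch` helper is hypothetical: it illustrates the semantics described above and is not the proposed implementation or scala-flow's API.

```scala
// Plain-Scala sketch: Seq stands in for PCollection, and `branch` is a
// hypothetical helper used only to illustrate the first-match semantics.
def branch[A](elements: Seq[A])(predicates: (A => Boolean)*): Vector[Vector[A]] = {
  // One output builder per predicate, in the order the predicates were given.
  val buckets = Vector.fill(predicates.size)(Vector.newBuilder[A])
  elements.foreach { element =>
    // The first predicate that returns true decides the output collection.
    val index = predicates.indexWhere(p => p(element))
    if (index < 0)
      throw new IllegalArgumentException(s"No predicate matched element: $element")
    buckets(index) += element
  }
  buckets.map(_.result())
}

// 2 is even, but `_ < 3` comes first, so it lands in `small`.
val Vector(small, even, rest) =
  branch(Seq(1, 2, 3, 10, 11))(_ < 3, _ % 2 == 0, _ => true)
// small == Vector(1, 2), even == Vector(10), rest == Vector(3, 11)
```

Dropping the trailing `_ => true` would make the call throw on `3`, which is why a catch-all is worth having.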

There's also another variation, branchMap, which uses partial functions rather than predicates: the first partial function that is defined for an element is called with that element, and the returned value is placed in the respective PCollection.

We could also have a version that accepts n functions of the form A => Option[B]. For each element, the first function that evaluates to Some(x) would have x written to its respective collection. Something like:

val (xEvents, yEvents, zEvents) = events.branchMap(parseX, parseY, parseZ)
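
A rough sketch of how the Option-based variant could behave for three functions, again using `Seq` in place of `PCollection`. The name `branchMap3`, the `emit` helper, and the choice to throw when no function returns `Some` are all assumptions made for illustration; the proposal above does not say what should happen to unmatched elements.

```scala
import scala.collection.mutable

// Hypothetical three-way variant: each function returns Some(x) when it
// accepts the element. Seq stands in for PCollection.
def branchMap3[A, B1, B2, B3](elements: Seq[A])(
    f1: A => Option[B1],
    f2: A => Option[B2],
    f3: A => Option[B3]
): (Vector[B1], Vector[B2], Vector[B3]) = {
  val out1 = Vector.newBuilder[B1]
  val out2 = Vector.newBuilder[B2]
  val out3 = Vector.newBuilder[B3]

  // Appends the value if present and reports whether the branch matched.
  def emit[B](result: Option[B], out: mutable.Builder[B, Vector[B]]): Boolean = {
    result.foreach(out += _)
    result.isDefined
  }

  elements.foreach { element =>
    // `||` short-circuits, so later functions run only if earlier ones returned None.
    val matched =
      emit(f1(element), out1) || emit(f2(element), out2) || emit(f3(element), out3)
    // Assumption: an unmatched element is an error, mirroring `branch`.
    if (!matched)
      throw new IllegalArgumentException(s"No function matched element: $element")
  }
  (out1.result(), out2.result(), out3.result())
}

val parseInt: String => Option[Int] = s => scala.util.Try(s.toInt).toOption
val parseBool: String => Option[Boolean] = s => scala.util.Try(s.toBoolean).toOption
val keep: String => Option[String] = s => Some(s)

val (ints, bools, others) =
  branchMap3(Seq("1", "true", "x", "42"))(parseInt, parseBool, keep)
// ints == Vector(1, 42), bools == Vector(true), others == Vector("x")
```

Because each output has its own element type, the result is a tuple rather than a list, and a partial function can be adapted to this shape with `PartialFunction.lift`.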

Example

val companies = ...

val (it, pharma, other) = companies.branch(
  _.sector == "it",
  _.sector == "pharma",
  _ => true
)

sealed abstract class Event
case class Purchase(purchaseId: String, product: String, qty: Int) extends Event
case class Refund(purchaseId: String) extends Event

val events = ...

val (purchases, refunds) = events.branchMap(
  { case Purchase(purchaseId, product, qty) => (purchaseId, product, qty) },
  { case Refund(purchaseId) => purchaseId }
)

// Types:
purchases: PCollection[(String, String, Int)]
refunds: PCollection[String]

// We might even do some funky stuff with `unapply` here...


bcasey-zd commented 7 years ago

I feel we're already well covered by the existing Partition transform. We can compose the various use cases above using the Scala stdlib. For example, matching the first in a list of predicates would be:

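val predicates: List[A => Boolean] = ...
val collection: PCollection[A] = ...

// Note: indexWhere returns -1 when no predicate matches, which is not a valid partition index.
collection.apply(Partition.of(predicates.size, new PartitionFn[A] {
  override def partitionFor(value: A, numPartitions: Int): Int = predicates.indexWhere(_(value))
}))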

However, we could easily add some sugar to CollectionOps to avoid the need for an anonymous inner class.

dasch commented 7 years ago

I definitely feel that tuples are the right approach here.