Add a type-safe wrapper around CoGroupByKey with a more Scala-like feel. It removes a little flexibility: there is no special support for extracting a single value from one of the input collections (getOnly in the Dataflow API). That is easy to do in Scala with values.headOption.getOrElse(defaultValue), which is functionally the same.
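As a minimal sketch of that replacement, the snippet below wraps the headOption.getOrElse idiom in a small helper. The helper name onlyOrElse and the enclosing object are hypothetical and are not part of this change or of the Dataflow SDK; the code uses only the Scala standard library.

```scala
object GetOnlySketch {
  // Hypothetical helper (not part of the Dataflow SDK or this PR):
  // returns the first value if there is one, otherwise the default.
  def onlyOrElse[A](values: Iterable[A], default: => A): A =
    values.headOption.getOrElse(default)

  def main(args: Array[String]): Unit = {
    // One value present: the value itself is returned.
    println(onlyOrElse(List("Alphabet Inc."), "unknown"))
    // No value present: the default is returned.
    println(onlyOrElse(List.empty[String], "unknown"))
  }
}
```

Note that headOption takes the first element without checking whether more follow, so a caller that needs to reject multiple values for a key would have to check for that separately.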
The added benefits are:
1) No coder trouble, as everything is wrapped.
2) Type safety – the output is a key/value pair with the value being an n-tuple containing types that correspond to the input collections' value types.
3) Works with up to 22 input collections.
Example
val stockNames: PCollection[KV[Ticker, String]] = ...
val buyOrders: PCollection[KV[Ticker, BuyOrder]] = ...
val sellOrders: PCollection[KV[Ticker, SellOrder]] = ...
val orders: PCollection[KV[Ticker, (Iterable[String], Iterable[BuyOrder], Iterable[SellOrder])]] =
  stockNames.coGroupByKey(buyOrders, sellOrders)
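To illustrate the shape of the result without a Dataflow pipeline, here is a rough analogue over plain Scala collections. It is only a sketch of the output type, not the wrapper's implementation: coGroup3, the Ticker, BuyOrder and SellOrder definitions, and the sample data are all assumptions made up for this illustration.

```scala
object CoGroupSketch {
  // Hypothetical stand-ins for the types used in the example above.
  case class Ticker(symbol: String)
  case class BuyOrder(quantity: Int)
  case class SellOrder(quantity: Int)

  // Plain-collections analogue of a three-way co-group (illustration only).
  // Each key maps to a 3-tuple of the values found for it in each input.
  def coGroup3[K, A, B, C](
      as: Seq[(K, A)],
      bs: Seq[(K, B)],
      cs: Seq[(K, C)]
  ): Map[K, (Iterable[A], Iterable[B], Iterable[C])] = {
    // Every key that appears in at least one input.
    val keys = (as.map(_._1) ++ bs.map(_._1) ++ cs.map(_._1)).distinct
    keys.map { k =>
      val row: (Iterable[A], Iterable[B], Iterable[C]) = (
        as.collect { case (key, v) if key == k => v },
        bs.collect { case (key, v) if key == k => v },
        cs.collect { case (key, v) if key == k => v }
      )
      (k, row)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val goog = Ticker("GOOG")
    val aapl = Ticker("AAPL")
    val grouped = coGroup3(
      Seq(goog -> "Alphabet Inc."),
      Seq(goog -> BuyOrder(10), aapl -> BuyOrder(5)),
      Seq(goog -> SellOrder(3))
    )
    // The tuple positions line up with the order of the inputs, so each
    // component keeps its own element type and needs no casts.
    val (names, buys, sells) = grouped(goog)
    println(s"names=$names buys=$buys sells=$sells")
  }
}
```

A key that is missing from one input gets an empty collection in that position, which is where the headOption.getOrElse idiom mentioned above comes in.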