twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

leftJoin against KeyProducer should support window on both streams #690

Open pankajroark opened 7 years ago

pankajroark commented 7 years ago

Right now it supports a window on only one stream: e.g. with x.leftJoin(y, buffer), only the events of y are kept for a window of time.

This is a problem when processing needs to be done on only part of the payload. Say X is a large thrift event, and we want to apply an upper limit to only one of its fields.

```scala
trait X {
  val id: Long
  val charge: Int
  // ...
}
```

Say we want to track the sum of charges per id and, when needed, reduce an event's charge so that the running total never exceeds a limit. E.g. if the limit is 100, the current sum is 99, and we get an event with a charge of 2, we reduce its charge to 1. If the current sum is already 100, we reduce the charge to 0 for all subsequent events.
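The capping rule can be made concrete with a small sequential sketch. It assumes a hypothetical ChargeEvent case class holding only the id and charge fields of X, and it runs over an in-memory sequence rather than a summingbird topology; it only illustrates the arithmetic of the per-id cap.

```scala
// Hypothetical stand-in for the two fields of X that the cap needs.
final case class ChargeEvent(id: Long, charge: Int)

object ChargeCap {
  /** Charge allowed for one event, given the id's running sum so far. */
  def capped(limit: Int, sumSoFar: Int, charge: Int): Int =
    math.max(0, math.min(charge, limit - sumSoFar))

  /** Applies the cap in arrival order, keeping a running sum per id. */
  def applyCap(limit: Int, events: Seq[ChargeEvent]): Vector[ChargeEvent] =
    events.foldLeft((Map.empty[Long, Int], Vector.empty[ChargeEvent])) {
      case ((sums, out), e) =>
        val sum = sums.getOrElse(e.id, 0)
        val allowed = capped(limit, sum, e.charge)
        (sums.updated(e.id, sum + allowed), out :+ e.copy(charge = allowed))
    }._2
}
```

With the numbers from the example, capped(100, 99, 2) is 1 and capped(100, 100, 5) is 0. The state per id is a single Int, which is why sending only id and charge to the summer is attractive compared to carrying the whole X.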

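The efficient way to do this would be to send only the id and charge to a summer, apply the limit there, and join the result back with the original stream. But since the current join is windowed on only one stream, there is a race condition: an event on the unbuffered side that arrives before its match on the buffered side finds nothing to join with, so the join can fail.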
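To make the race concrete, here is a hypothetical in-memory model of a left join that buffers only the right stream, as in x.leftJoin(y, buffer). It is not summingbird's implementation: the buffer keeps just the latest right value per key and never expires, which is enough to show that the result depends on arrival order.

```scala
import scala.collection.mutable

// Hypothetical model of a left join that stores only right-side events.
// Left events are looked up once and then discarded, never buffered.
final class OneSidedLeftJoin[K, L, R] {
  // Latest right value per key (window expiry omitted for brevity).
  private val rightBuffer = mutable.Map.empty[K, R]

  def onRight(k: K, r: R): Unit = rightBuffer.update(k, r)

  // Emits immediately: None if the matching right event has not arrived yet.
  def onLeft(k: K, l: L): (K, (L, Option[R])) = (k, (l, rightBuffer.get(k)))
}
```

If the original event (left) reaches the join before the summer's capped charge (right), it is emitted with None and the capped value that arrives afterwards is never joined to it.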

We should support joins that are windowed on both streams.
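One way to picture a join windowed on both streams is a symmetric windowed join: each side buffers its own events for a fixed window, and an arriving event is matched against the other side's buffer. The sketch below is a hypothetical in-memory version using millisecond timestamps. It only emits matched pairs (inner-join behavior); a real leftJoin would also need to emit None for left events whose window expires unmatched, which is omitted here.

```scala
import scala.collection.mutable

// Hypothetical symmetric windowed join: BOTH sides buffer their events for
// `windowMs`, so a match is found whichever side arrives first. Emits matched
// pairs only (inner-join behavior); local maps stand in for external storage.
final class TwoSidedWindowJoin[K, L, R](windowMs: Long) {
  private val lefts  = mutable.Map.empty[K, List[(Long, L)]]
  private val rights = mutable.Map.empty[K, List[(Long, R)]]

  // Drops entries older than the window (relative to `now`) and returns the rest.
  private def recent[V](buf: mutable.Map[K, List[(Long, V)]], k: K, now: Long): List[(Long, V)] = {
    val kept = buf.getOrElse(k, Nil).filter { case (t, _) => now - t <= windowMs }
    if (kept.isEmpty) buf -= k else buf.update(k, kept)
    kept
  }

  /** A left event arrives: match it against buffered right events, then buffer it. */
  def onLeft(k: K, l: L, ts: Long): List[(K, (L, R))] = {
    val matches = recent(rights, k, ts).map { case (_, r) => (k, (l, r)) }
    lefts.update(k, (ts, l) :: recent(lefts, k, ts))
    matches
  }

  /** A right event arrives: match it against buffered left events, then buffer it. */
  def onRight(k: K, r: R, ts: Long): List[(K, (L, R))] = {
    val matches = recent(lefts, k, ts).map { case (_, l) => (k, (l, r)) }
    rights.update(k, (ts, r) :: recent(rights, k, ts))
    matches
  }
}
```

Whichever side arrives first, the pair is produced when the second one arrives, as long as the two timestamps are within windowMs of each other. The cost is that both streams must be stored for the window, including the large side.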

johnynek commented 7 years ago

I'm not super crazy about windows, mostly because merging offline and online results seems like a challenge. If you have a clear implementation idea, I'm happy to hear it.

One problem with summingbird is that you have to pass in the storage for such things (the Buffer), because Storm has no storage of its own, and neither do some other streaming systems.

That said, windows can be implemented with a special data type, using sumByKey with the right Semigroup[V]. So, if we had a Window[V] that manages this, I think we could leverage the existing merging of offline and online results.
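A Window[V] driven entirely by a Semigroup could look roughly like the following. This is a hypothetical sketch: it defines its own minimal Semigroup trait (Algebird ships a real one) and models a window as values keyed by time-bucket index, keeping only the `size` most recent buckets after each merge. Merging and then dropping old buckets gives the same result however the merges are grouped, so plus stays associative, which is what summing values per key relies on.

```scala
// Minimal stand-in for a Semigroup type class (Algebird provides a real one).
trait Semigroup[V] { def plus(a: V, b: V): V }

object Semigroup {
  implicit val intSum: Semigroup[Int] = new Semigroup[Int] {
    def plus(a: Int, b: Int): Int = a + b
  }
}

// Hypothetical Window[V]: values bucketed by time-bucket index.
final case class Window[V](buckets: Map[Long, V])

object Window {
  def single[V](bucket: Long, v: V): Window[V] = Window(Map(bucket -> v))

  // Merges bucket-wise with the value semigroup, then keeps only the
  // `size` newest buckets (indices > newest - size).
  def semigroup[V](size: Int)(implicit sg: Semigroup[V]): Semigroup[Window[V]] =
    new Semigroup[Window[V]] {
      def plus(a: Window[V], b: Window[V]): Window[V] = {
        val merged = (a.buckets.keySet ++ b.buckets.keySet).iterator.map { k =>
          val v = (a.buckets.get(k), b.buckets.get(k)) match {
            case (Some(x), Some(y)) => sg.plus(x, y)
            case (Some(x), None)    => x
            case (None, Some(y))    => y
            case (None, None)       => sys.error("unreachable: key came from one of the maps")
          }
          k -> v
        }.toMap
        val newest = if (merged.isEmpty) 0L else merged.keys.max
        Window(merged.filter { case (k, _) => k > newest - size })
      }
    }
}
```

For example, with size 2, summing single-bucket windows for buckets 1, 2, 2 and 3 (values 5, 3, 4, 1) leaves only buckets 2 and 3, with values 7 and 1.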

I do think race conditions in complex topologies are an issue. I think this could be solved with new APIs for sumByKey and write. For instance:

```scala
// Write out, and get a way to read the store (a Service) and an event stream of changes.
// K and V are the key and value types of the keyed producer being summed.
def sumByKeyEff(store: Store[K, V])(implicit sg: Semigroup[V]): Effect[(Service[K, V], Producer[(K, (Option[V], V))])]

def writeEff[T](sink: Sink[T]): Effect[Producer[T]]
```

Where Effect[T] is a monad like Execution[T] or Future[T]. This would let us sequence effects (writes external to the system) so that we can be sure one write happens before another read.
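As a hypothetical illustration with Future standing in for Effect, the sketch below sequences a write and a read with flatMap (via a for-comprehension), so the read only starts once the write has completed. The in-memory TrieMap store and the writeCharge/readCharge functions are assumptions for the example, not summingbird APIs.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical: Future plays the role of Effect, a TrieMap plays the store.
object EffectOrdering {
  implicit val ec: ExecutionContext = ExecutionContext.global

  private val store = TrieMap.empty[Long, Int]

  // Read-modify-write is not atomic; acceptable here only because
  // callers sequence their effects.
  def writeCharge(id: Long, charge: Int): Future[Unit] =
    Future { store.update(id, store.getOrElse(id, 0) + charge) }

  def readCharge(id: Long): Future[Option[Int]] =
    Future { store.get(id) }

  /** Write, then read: the read is created only after the write completes. */
  def writeThenRead(id: Long, charge: Int): Future[Option[Int]] =
    for {
      _   <- writeCharge(id, charge)
      sum <- readCharge(id)
    } yield sum
}
```

Offline, an Execution-style plan can order these steps the same way; online, the ordering has to be enforced across distributed workers, which is where the difficulty lies.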

Planning this offline seems pretty easy. Online, there can be real challenges, since ordering reads and writes is not trivial (and at Twitter data rates, it might not always scale easily).