twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Add a WithTime node #707

Open johnynek opened 7 years ago

johnynek commented 7 years ago

Summingbird internally keeps a timestamp for all values, but we don't expose that to the user. It is a pain to always thread it through. We could add it back by adding a new node:

case class WithTime[P, T](p: Producer[P, T]) extends Producer[P, (T, Timestamp)]
case class ValueWithTime[P, K, V](p: Producer[P, (K, V)]) extends Producer[P, (K, (V, Timestamp))]

Then at plan time, we can just treat this like a map that adds the timestamp, which we already know at that point.
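A minimal sketch of the "map that adds the timestamp" idea, assuming the platform carries each value internally as a `(Timestamp, T)` pair. Everything here is a toy stand-in rather than Summingbird's real classes: `Node`, `Source`, `Mapped`, and the `run` method are hypothetical, and `run` plays the role that a platform planner would play.

```scala
// Toy model only: these are NOT Summingbird's Producer or Platform types.
final case class Timestamp(milliSinceEpoch: Long)

// Every node evaluates to values paired with a hidden timestamp.
sealed trait Node[T] {
  def run: List[(Timestamp, T)]
}

// A source already knows the time of each event.
final case class Source[T](events: List[(Timestamp, T)]) extends Node[T] {
  def run: List[(Timestamp, T)] = events
}

// An ordinary map never sees the timestamp; it is threaded through untouched.
final case class Mapped[A, B](in: Node[A], fn: A => B) extends Node[B] {
  def run: List[(Timestamp, B)] = in.run.map { case (ts, a) => (ts, fn(a)) }
}

// WithTime is planned as a map that copies the hidden timestamp into the value.
final case class WithTime[T](in: Node[T]) extends Node[(T, Timestamp)] {
  def run: List[(Timestamp, (T, Timestamp))] =
    in.run.map { case (ts, v) => (ts, (v, ts)) }
}

object WithTimeDemo {
  def main(args: Array[String]): Unit = {
    val src = Source(List((Timestamp(1L), "a"), (Timestamp(2L), "b")))
    // The user-visible value is now (String, Timestamp).
    WithTime(src).run.foreach(println)
  }
}
```

In this toy model the user never constructs a timestamp; `WithTime` only copies one the platform already holds, which is why it needs no extra input from the job author.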

This would clean up some internal APIs we have if summingbird supported it, and it would also close #688, since we can always recover the timestamp at any point.

johnynek commented 7 years ago

@pankajroark what do you think of this? I can work on adding it if we can ever get our tests to not OOM.

oscar-stripe commented 7 years ago

ping on this @ttim ?

We need the time in user land a lot at Stripe. I can possibly find time to work on this unless you see any blockers.

ttim commented 7 years ago

@johnynek it introduces (conceptually) a notion of time into the core platform.

Pros: it's already the case, and this makes everything more consistent. Cons:

In general I like the idea of putting time into core and building everything else around it.

pankajroark commented 7 years ago

Will this mean that users will be able to specify a summingbird job without time? That may not be a bad idea, because it would support online-only use cases more efficiently. Right now users fix the timestamp for that.

Or are these nodes solely aimed at extracting the time, which is hidden? The API seems a bit magical in that case. It would be great if you could give an example.

oscar-stripe commented 7 years ago

@pankajroark I don't think it helps you run without time as I am conceiving this. It is almost the opposite: you have to be able to give a time for each event.

So, what I want is this: we have a system similar to tsar which aggregates keys into many buckets. To do this, we need to know the time. We currently carry a copy of the time around in the value. That is a waste, since summingbird already knows the time internally. The .withTime method would expose a copy of the event's internal time so we could bucket without carrying that copy everywhere (which is especially painful across store/sumByKey boundaries).
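A rough sketch of the bucketing use case described above, assuming values arrive already paired with their timestamp, as a `ValueWithTime`-style node would provide. Plain Scala collections stand in for Summingbird producers and stores; `Bucket`, `fanOut`, and `sumByBucket` are hypothetical names for illustration, not part of any existing API.

```scala
// Toy stand-ins only: no Summingbird types are used here.
final case class Timestamp(milliSinceEpoch: Long)

// A bucket is a granularity (width in ms) plus the index of the window an event falls in.
final case class Bucket(widthMs: Long, index: Long)

object Bucketing {
  val HourMs: Long = 60L * 60L * 1000L
  val DayMs: Long = 24L * HourMs

  // Fan one timestamped key-value pair out to one entry per granularity.
  def fanOut[K, V](kv: (K, (V, Timestamp)), widths: List[Long]): List[((K, Bucket), V)] = {
    val (k, (v, ts)) = kv
    widths.map { w =>
      ((k, Bucket(w, java.lang.Math.floorDiv(ts.milliSinceEpoch, w))), v)
    }
  }

  // Stand-in for sumByKey: add up the values for each (key, bucket) pair.
  def sumByBucket[K](
      events: List[(K, (Long, Timestamp))],
      widths: List[Long]
  ): Map[(K, Bucket), Long] =
    events
      .flatMap(fanOut(_, widths))
      .groupBy(_._1)
      .map { case (key, entries) => key -> entries.map(_._2).sum }
}
```

The point of the sketch is that the timestamp is only needed at the fan-out step. If the platform can hand it over there, the value type upstream does not have to carry its own copy.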

pankajroark commented 7 years ago

Even though the extraction of time out of nothing seems a bit magical to me, I realize the practical utility. I'm on board.