twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Tiered Aggregation for Scalding #140

Open · sritchie opened 11 years ago

sritchie commented 11 years ago

(From Joe Nievelt)

Hey Summingbird folks,

I'm coming back to the problem of data growth and sketch map aggregation accuracy, and I have some ideas on how I'd like to change the scalding job that ouroboros runs. I'm not sure, however, how easily this could be done in Summingbird (or any underlying frameworks). Any suggestions, broad or specific, are very welcome.

Conceptually, I imagine that all my data is being processed together. I'll come back to the matter of incremental aggregation in a minute.

Suppose I have my event source E (with events mapped 1:1 to monoids) with time data like YYYY.MM.DD.hh.mm.ss. The data could be generated like:

H = E.groupBy(YYYY.MM.DD.hh) // ...
D = H.groupBy(YYYY.MM.DD) // ...
M = D.groupBy(YYYY.MM) // ...
Y = M.groupBy(YYYY) // ...
all = Y.sum // ...

The important feature, of course, is that the groupBy calls are not all applied directly to E. This adds data dependencies but overall reduces the number of tuples generated (on the order of 80% for our application).
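
For illustration, a rough plain-Scala sketch of that cascade could look like the following. Event, the Long counts, and the time-truncation helpers are hypothetical stand-ins for the real keys and monoids, not Summingbird API:

import java.time.{Instant, ZoneOffset}
import java.time.temporal.ChronoUnit

final case class Event(ts: Instant, count: Long)

// Sum values per key; stands in for the sumByKey / Monoid.plus step.
def rollup[K](pairs: Iterable[(K, Long)]): Map[K, Long] =
  pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

val events: Seq[Event] = Seq.empty // the event source E

// H: hourly keys come straight from E ...
val h = rollup(events.map(e => e.ts.truncatedTo(ChronoUnit.HOURS) -> e.count))
// ... but each coarser tier is derived from the previous tier, not from E.
val d = rollup(h.toSeq.map { case (k, v) => k.truncatedTo(ChronoUnit.DAYS) -> v })
val m = rollup(d.toSeq.map { case (k, v) =>
  val z = k.atZone(ZoneOffset.UTC); (z.getYear, z.getMonthValue) -> v
})
val y = rollup(m.toSeq.map { case ((year, _), v) => year -> v })
val all = y.values.sum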

Incremental aggregation complicates things, in that these computations may produce keys that overlap with those in the existing data set (H', D', M', Y', all'). One approach would be to generate H, merge it with H', and then recompute the other data sets from scratch. But this puts a pretty high coefficient on the O(cumulative data) term in the runtime, and also requires us to keep all historical data for H forever.

What I'd like to do is:

1. Compute H from the incremental data, and merge it into H' (via Monoid.plus)
2. For all YYYY.MM.DD in H, compute D from the merged H' and overwrite the applicable entries in D'
3. For all YYYY.MM in D, compute M from the merged D' and overwrite the applicable entries in M'
4. For all YYYY in M, compute Y from the merged M' and overwrite the applicable entries in Y'
5. Recompute all from the merged Y'

For a fixed batch size we'd have a fixed number of merges for steps 2-4 (practically, one or two each). Step 1 is unavoidably bounded by our incremental data size, and step 5 grows very slowly.
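
To make the overwrite semantics concrete, here is a plain-Map sketch of steps 1 and 2. hPrime and dPrime stand in for the stored H' and D', and Long counts for the real monoid values; none of these names are real Summingbird/Scalding API:

import java.time.Instant
import java.time.temporal.ChronoUnit

// Monoid.plus over Long counts, keyed by hour/day.
def merge[K](a: Map[K, Long], b: Map[K, Long]): Map[K, Long] =
  b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

def step1and2(hPrime: Map[Instant, Long],
              dPrime: Map[Instant, Long],
              hNew:   Map[Instant, Long]): (Map[Instant, Long], Map[Instant, Long]) = {
  // Step 1: merge the incremental hourly data into H'.
  val hMerged = merge(hPrime, hNew)
  // Step 2: only days touched by the new hourly keys are recomputed ...
  val touchedDays = hNew.keySet.map(_.truncatedTo(ChronoUnit.DAYS))
  val recomputed = hMerged.toSeq
    .filter { case (k, _) => touchedDays(k.truncatedTo(ChronoUnit.DAYS)) }
    .groupBy { case (k, _) => k.truncatedTo(ChronoUnit.DAYS) }
    .map { case (day, vs) => day -> vs.map(_._2).sum }
  // ... and they overwrite the applicable entries in D'; untouched days keep their old values.
  (hMerged, dPrime ++ recomputed)
}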

I'm just getting started but my main points of investigation are:

Write this phased job in Summingbird
Have this run in the same job as our Storm topology (which I don't really care to change)
Work out the O(cumulative data) solution for incremental aggregation
Prune the computations to get closer to O(incremental data)

I'd love to hear what you think, or any pointers to getting started.

Thanks,

Joe

jnievelt commented 11 years ago

Some additional thinking I've done on this topic:

It makes sense to me to represent this as a producer. That seems to be the point at which platforms diverge, and this is one place where it seems natural to have them diverge. What's not obvious to me yet is how it will all be represented.

For example, this could theoretically be supported only for types that support methods like hourGranularity, dayGranularity, etc. Then the StormPlatform would simply flatMap each element to each of these transforms (along with the identity).

For the ScaldingPlatform, we might also need things like isHourGranularity, isDayGranularity, etc. This way, the first step would be to filter out everything except isHourGranularity. Then everything that passes isHourGranularity is flatMapped to List(x, x.dayGranularity). After another reduce, everything that passes isDayGranularity is flatMapped to List(x, x.monthGranularity), etc.
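
A rough sketch of that contract and the first couple of cascade steps, just to pin down the shape (the Granular trait and its method names mirror the ones above but are hypothetical):

// The granularity operations a key type would need to support.
trait Granular[T] {
  def dayGranularity(t: T): T
  def monthGranularity(t: T): T
  def isHourGranularity(t: T): Boolean
  def isDayGranularity(t: T): Boolean
}

// One step of the cascade: items at the finer granularity emit both themselves
// and their coarser roll-up; everything else passes through untouched.
def rollUpStep[T](xs: Seq[T])(passes: T => Boolean, coarser: T => T): Seq[T] =
  xs.flatMap(x => if (passes(x)) List(x, coarser(x)) else List(x))

def cascade[T](xs: Seq[T])(implicit g: Granular[T]): Seq[T] = {
  // First step: keep only hour-granularity items, then also emit their day roll-ups.
  val withDays = xs.filter(g.isHourGranularity).flatMap(x => List(x, g.dayGranularity(x)))
  // ... a reduce (sumByKey) would run here ...
  // Next step: day-granularity items additionally emit their month roll-ups.
  rollUpStep(withDays)(g.isDayGranularity, g.monthGranularity)
  // ... another reduce, then months -> years, etc.
}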

This would at least get to the O(cumulative) solution. To prune the data/computation, I think this data needs to be versioned. The flatMapping would then happen as List(x, x.dayGranularity.withVersion(latest)). The monoid would be modified to discard older versions and only do actual merging for items with the same version.
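
A sketch of that version-discarding merge, assuming Algebird's Semigroup (the Versioned wrapper and its fields are hypothetical):

import com.twitter.algebird.Semigroup

final case class Versioned[V](version: Long, value: V)

// Older versions are dropped outright; real merging only happens when the
// versions match.
def versionedSemigroup[V](implicit sg: Semigroup[V]): Semigroup[Versioned[V]] =
  new Semigroup[Versioned[V]] {
    def plus(a: Versioned[V], b: Versioned[V]): Versioned[V] =
      if (a.version > b.version) a
      else if (b.version > a.version) b
      else Versioned(a.version, sg.plus(a.value, b.value))
  }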

The trick would be to make sure that, when something's being updated (e.g., regenerating data for a month granularity), we flatMap over all the necessary items (e.g., all the days from that month). This could go hand in hand with the data retention policy, however. For example, regeneration could be like "flatMap all hourly data from the last M days" and "flatMap all daily data from the last N months" (ensuring that N months always covers M days). Data older than these thresholds could also be pruned (more generally, for some M' >= M, N' >= N). Then we would just need to make sure that everyone has the same view of time (derived from the FactoryInput?).
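
For concreteness, the regeneration/retention windows could be expressed as simple predicates against a shared notion of "now" (all the names and fields here are hypothetical; now would come from the batch boundary so every task sees the same value):

import java.time.{Instant, ZoneOffset}

final case class Windows(regenHourlyDays: Int, regenDailyMonths: Int,
                         keepHourlyDays: Int, keepDailyMonths: Int) {
  // M' >= M and N' >= N: never prune data we might still need to regenerate from.
  require(keepHourlyDays >= regenHourlyDays && keepDailyMonths >= regenDailyMonths)
}

def regenerateHourly(key: Instant, now: Instant, w: Windows): Boolean =
  key.isAfter(now.atZone(ZoneOffset.UTC).minusDays(w.regenHourlyDays.toLong).toInstant)

def pruneHourly(key: Instant, now: Instant, w: Windows): Boolean =
  key.isBefore(now.atZone(ZoneOffset.UTC).minusDays(w.keepHourlyDays.toLong).toInstant)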

Stepping back a bit, it's not clear what the proper level of abstraction for this is, in terms of describing the producer. Do we want to allow arbitrary time grouping at an arbitrary number of levels? Are there other abstractions that could be made?

jnievelt commented 11 years ago

Thought of the day: perhaps this is better done in a ScaldingStore? Just as the versioned stores transform with pack/unpack, perhaps a new type of versioned store can also pack/unpack an epoch value (EpochVersionedStore)? At that point, it might not be very troublesome to add the extra stages there.

johnynek commented 11 years ago

This is related to an old idea of QueryStrategies (in Storehaus: https://github.com/twitter/storehaus/blob/develop/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/QueryStrategy.scala) and frozen keys: the idea that you have a function from (K, Interval[Date]) => Boolean that says whether a given key can be ignored for a given interval of time (e.g., the key includes time, and it cannot appear in that time range).

I think with these two concepts you can build what you want: query strategies that allow querying over exponentially increasing time ranges without having to load and update the whole key set at every interval of time.
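
For example, the frozen-key predicate could be as simple as checking whether the key's embedded time range overlaps the interval being processed. DateInterval below is a simplified, hypothetical stand-in for Interval[Date]:

import java.util.Date

// A simplified stand-in for Interval[Date].
final case class DateInterval(from: Date, to: Date) {
  def overlaps(other: DateInterval): Boolean =
    !(to.before(other.from) || other.to.before(from))
}

// A key whose embedded time range never overlaps the interval being processed
// can be skipped: it is "frozen" and will not receive updates.
final case class TimeKey(name: String, span: DateInterval)

def canIgnore(key: TimeKey, batch: DateInterval): Boolean =
  !key.span.overlaps(batch)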