twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

3-stage sketch join #1305

Open avibryant opened 9 years ago

avibryant commented 9 years ago

Right now when we do a sketch join we do two steps: first we compute a CMS of the keys (step 1), then we use that to guide a replicated reduce-side join (step 2).

In cases of extreme skew, it is probably more efficient to do three steps:

  1. first you compute a CMS of the keys, keeping the RHS values for the heavy hitters
  2. then you use that to do a map-side only join, just of the heavy hitters, mapping over the LHS
  3. then you pass over the LHS again, filtering out those same heavy hitters, and do a standard reduce-side join with the RHS

The result is the union of the outputs from steps 2 and 3.
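Very roughly, steps 2 and 3 might look like the following sketch in terms of plain TypedPipe / ValuePipe operations (illustration only, not existing code; `heavyRhs` stands in for the output of step 1, i.e. the CMS heavy-hitter keys paired with their RHS values):

```scala
import com.twitter.scalding.typed.{ TypedPipe, ValuePipe }

// Sketch only. `heavyRhs` is assumed to come out of step 1: the heavy-hitter
// keys from the CMS, paired with their RHS values and broadcast as a ValuePipe.
def threeStageSketchJoin[K: Ordering, V, W](
  left: TypedPipe[(K, V)],
  right: TypedPipe[(K, W)],
  heavyRhs: ValuePipe[Map[K, List[W]]]
): TypedPipe[(K, (V, W))] = {

  // Step 2: map-side-only join of the heavy hitters, mapping over the LHS.
  val heavyJoined =
    left.flatMapWithValue(heavyRhs) { case ((k, v), hh) =>
      hh.getOrElse(Map.empty[K, List[W]]).getOrElse(k, Nil).map(w => (k, (v, w)))
    }

  // Step 3: filter those same heavy hitters out of the LHS and do a standard
  // reduce-side join with the RHS.
  val restJoined =
    left
      .filterWithValue(heavyRhs) { case ((k, _), hh) => !hh.exists(_.contains(k)) }
      .group
      .join(right.group)
      .toTypedPipe

  // The result is the union of the outputs from steps 2 and 3.
  heavyJoined ++ restJoined
}
```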

johnynek commented 9 years ago

This is an interesting idea.

The heavy-hitter set might be quite large, but the win here is that you are able to do a broadcast without shuffling the left-hand side? Given that we are going to do the shuffle afterwards, are you sure this creates a win over the current approach?

By the way, sketchJoin works well for people at Twitter. Are you using it? I'd be a bit more interested in tricks to cache the CMS between similar jobs (like an hourly or daily job). That seems like the bigger win.

avibryant commented 9 years ago

Hmm, I glossed over something here and I think it's actually 4 stages.

  1. Build the CMS from the keys on the LHS. Make that available as a ValuePipe.
  2. flatMapWithValue with that CMS on the RHS, to filter down to the (small number of) RHS values whose keys, when looked up in the CMS, count as heavy hitters. Now make that available as a ValuePipe.
  3. flatMapWithValue with those RHS heavy hitter values over the LHS, performing joins where you come across the heavy hitter keys. One way or another, you have a partitioned output here: LHS values that match the heavy hitter keys, and so can be joined immediately (call this the "pre-join"), and those that don't, where you only output the LHS (call these the "remainder").
  4. Join just the remainder from the LHS with the full RHS.

Now (logically, not physically) union the pre-join with the output from step 4 and you have your full output.

The point is that you only need to shuffle the remainder. Given a log-normal or similar distribution, this may actually be significantly smaller than the full LHS.
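To make those four stages concrete, here is a rough sketch in plain TypedPipe / ValuePipe operations, using Algebird's TopNCMS to track heavy hitters (illustration only - the parameter values and names are made up, and this bypasses the existing Sketched machinery entirely):

```scala
import com.twitter.algebird.{ CMSHasher, TopCMS, TopNCMS, TopNCMSMonoid }
import com.twitter.scalding.typed.{ TypedPipe, ValuePipe }

object FourStageJoinSketch {
  // Illustrative parameters only; real values would be tuned per job.
  val eps: Double = 0.001
  val delta: Double = 1e-8
  val seed: Int = 1
  val topN: Int = 100

  def join[K: Ordering: CMSHasher, V, W](
    left: TypedPipe[(K, V)],
    right: TypedPipe[(K, W)]
  ): TypedPipe[(K, (V, W))] = {

    implicit val cmsMonoid: TopNCMSMonoid[K] =
      TopNCMS.monoid[K](eps, delta, seed, topN)

    // 1. Build the CMS (with heavy-hitter tracking) from the LHS keys.
    val cms: ValuePipe[TopCMS[K]] =
      left.map { case (k, _) => cmsMonoid.create(k) }.sum

    // 2. Filter the RHS down to the heavy-hitter keys and collect those
    //    (few) rows into a single in-memory map, again as a ValuePipe.
    val rhsHeavy: ValuePipe[Map[K, List[W]]] =
      right
        .filterWithValue(cms) { case ((k, _), c) => c.exists(_.heavyHitters.contains(k)) }
        .map { case (k, w) => Map(k -> List(w)) }
        .sum // relies on Algebird's Map/List semigroups

    // 3. One pass over the LHS: heavy-hitter keys are joined in place
    //    ("pre-join"), everything else passes through as the "remainder".
    val tagged: TypedPipe[Either[(K, (V, W)), (K, V)]] =
      left.flatMapWithValue(rhsHeavy) { case ((k, v), hh) =>
        hh.getOrElse(Map.empty[K, List[W]]).get(k) match {
          case Some(ws) => ws.map(w => Left((k, (v, w))))
          case None     => List(Right((k, v)))
        }
      }

    val preJoined = tagged.collect { case Left(kvw) => kvw }
    val remainder = tagged.collect { case Right(kv) => kv }

    // 4. Ordinary reduce-side join of the remainder with the full RHS,
    //    then (logically) union with the pre-join.
    preJoined ++ remainder.group.join(right.group).toTypedPipe
  }
}
```

The Either tagging is just one way to get the partitioned output of stage 3 in a single pass over the LHS.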

(Caching CMS between jobs is cute, but raises all kinds of cache invalidation issues - though I guess it's easy to arrange for the worst case to just be poor performance. Anyway, you should start a separate issue for that.)

johnynek commented 9 years ago

cute. Ouch.

avibryant commented 9 years ago

C'mon Oscar, you should know that cute is high praise from me.

reconditesea commented 9 years ago

What if the heavy-hitter keys are also skewed in the RHS? Then that ValuePipe can be quite large, and we will copy it to each mapper?

jnievelt commented 9 years ago

Yep, if the RHS isn't small, then we could OOM. Is a mutually skewed join worth optimizing? It seems like there might be something to gain from adding a third component which does a skewJoin (now that it works 😉) if we want to address it.

avibryant commented 9 years ago

@reconditesea I wasn't worrying too much about the case where the RHS is skewed - at least at Stripe, the vast majority of cases where we see skew are N:1 relationships, not N:M.

johnynek commented 9 years ago

Single-sided skew is very common when you are looking up features or attributes on a skewed event log. Joint skew is common when you are doing matrix products or certain graph operations.

Joe: sketchJoin is almost certainly superior to skewJoin (probably in every way, except the support for jointly skewed joins): in sketchJoin the approximate counter is able to filter some keys on the mapper side (which in some cases can be useful), and generally the CMS method is a more efficient approximate count than sampling.

reconditesea commented 9 years ago

@avibryant I think an N:M skewed join, or sometimes even a self-join, is a common case in graph operations, as @johnynek mentioned above. But I guess that would be a blocking issue for this idea. In the current implementation of sketchJoin, the RHS keyed data is also copied to each of the replicated reducers. In this proposed approach, it's copied to each mapper. So I think the pros and cons depend on the relative size of the two sides. If the LHS is significantly larger than the RHS, then saving shuffle cost by only copying the remainder data will certainly be more beneficial. Otherwise, we should only copy the RHS to each replicated reducer, instead of to each mapper.

avibryant commented 9 years ago

@reconditesea well, it's not that the entire RHS gets replicated to the mappers (that's just a normal hashJoin). It's that the heavy hitters do.

So to be a bit more formal: the case I believe this is valuable for, whether you have N:1 or N:M, is when you can split the keys into a heavy-hitter set F and the remainder G, writing RF, LF, RG, LG for the RHS and LHS rows whose keys fall in F or G respectively.

That's because RF is going to be replicated here, but only RG+LG are going to be shuffled (vs. a normal join, which would also have to shuffle RF+LF); so you want RF+LF to be large, ideally larger than RG+LG, to make the savings worth it. And since RF is required to be small, really what that means is that LF has to be very large - and so you want F to be, approximately, the most frequently appearing keys in the LHS (but not so many that RF gets too big).
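Spelling that out (and ignoring the broadcast of RF, which is assumed to be small):

$$
\text{shuffled here} \approx |L_G| + |R_G|
\qquad\text{vs.}\qquad
\text{shuffled by a normal join} \approx |L_F| + |L_G| + |R_F| + |R_G|
$$

so the saving is roughly $|L_F| + |R_F|$, weighed against broadcasting $R_F$ to every mapper.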

That sounds really restrictive, but actually I think it happens all the time.

reconditesea commented 9 years ago

@avibryant Yeah, I totally got that. I'm thinking of the extreme case where |RF| is also very large because the RHS is skewed in a similar way - for example, a self-join. But I agree that the current implementation of sketch-join won't work well for this situation either, because |RF| will also be copied to each of the replicated reducers for the heavy-hitter keys.

It would be interesting to see how this proposed approach performs compared to the existing implementation on some benchmark datasets.