Netflix / PigPen

Map-Reduce for Clojure
Apache License 2.0

Strange behavior of count distinct #108

Closed by theJohnnyBrown 9 years ago

theJohnnyBrown commented 9 years ago

Hi folks, we're getting some unexpected results here:

(def records [:a :a :b :b :c])

(->> (pig/return records)
     (pig/fold (fold/count (fold/distinct)))
     (pig/dump))

This code returns [5]. Removing the count gives [#{:a :b :c}], no surprises. I'd expect the count to return the cardinality of the set returned by distinct, but unfortunately something else is happening.

Any ideas?

mbossenbroek commented 9 years ago

That has to do with how folds are composed, which is admittedly a little wonky. A little background...

There are 4 phases to a fold operation: pre-processing, reducing, combining, and post-processing. Pre-processing and post-processing compose in a fairly regular manner, but the reducer/combiner parts go in pairs and don't compose nicely.
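As a rough illustration of those four phases, here is a minimal sketch in plain Clojure. It is a simplified model, not PigPen's implementation: `run-fold`, `distinct-fold`, and the map keys are hypothetical names, and each inner vector stands in for the input of one mapper. The only assumption borrowed from the thread is that calling the combiner with no arguments yields the seed.

```clojure
(require '[clojure.set :as set])

;; Hypothetical helper: a simplified model of a fold, not PigPen code.
(defn run-fold
  "Applies a fold, given as a map of :pre, :reducef, :combinef and :post,
  to a collection of partitions (one partition per simulated mapper)."
  [{:keys [pre reducef combinef post]} partitions]
  (->> partitions
       ;; pre-process and reduce each partition independently,
       ;; seeding the reduction with (combinef)
       (map (fn [part] (reduce reducef (combinef) (pre part))))
       ;; combine the intermediate results into a single value
       (reduce combinef (combinef))
       ;; post-process the combined value
       (post)))

;; Hypothetical description of distinct in terms of the four phases.
(def distinct-fold
  {:pre      identity
   :reducef  conj
   :combinef set/union
   :post     identity})

(run-fold distinct-fold [[:a :a :b] [:b :c]])
;; => #{:a :b :c} (printed element order may vary)
```

The reducer and combiner have to agree on the shape of the intermediate value (here, a set), which is why they travel as a pair.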

Let's look at the combinef/reducef for both count and distinct.

| fn       | seed | reducef | combinef   |
|----------|------|---------|------------|
| count    | 0    | count   | (reduce +) |
| distinct | #{}  | conj    | set/union  |

For your example, we would compute the distinct elements for each mapper, and add them to a set. In the combiner, we would then merge those intermediate sets. We can't count the items in the set in the mappers because then we lose what items are in the set and can't properly combine the outputs of the mappers.
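To see why the count can't happen in the mappers, here is a small plain-Clojure sketch that needs no PigPen. The split of the records into two partitions is an assumption made for illustration, and the var names are hypothetical.

```clojure
(require '[clojure.set :as set])

;; Assumed split of [:a :a :b :b :c] across two simulated mappers.
(def partitions [[:a :a :b] [:b :c]])

;; Keep a set per mapper, merge the sets, and count once at the end.
(def merged-count
  (count (apply set/union (map set partitions))))
;; merged-count => 3

;; Count the distinct items inside each mapper, then add the counts.
;; :b appears in both partitions, so it is counted twice.
(def summed-count
  (reduce + (map (comp count set) partitions)))
;; summed-count => 4
```

Once a mapper emits only a number, the combiner can no longer tell which items overlapped between partitions.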

I still wanted to have fold operations sort of compose, so that you could do stuff like this:

(->> (fold/map f) (fold/filter g) (fold/distinct) (fold/first))

Which ends up doing something like this:

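|         | map | filter | distinct | first | result     |
|---------|-----|--------|----------|-------|------------|
| pre     | f   | g      |          |       | (comp g f) |
| reduce  |     |        | conj     |       | conj       |
| combine |     |        | union    |       | union      |
| post    |     |        |          | first | first      |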

But it's not clear which parts apply to which phases. I thought that I would throw an exception if you tried to do what you did, but clearly I do not. What you tried looks something like this because it's using the distributed count:

|         | distinct | count | result |
|---------|----------|-------|--------|
| pre     |          |       |        |
| reduce  | conj     | count | count  |
| combine | union    | +     | +      |
| post    |          |       |        |

Where I'm just taking the last reducer/combiner combo that you supply. What you really want is something like this:

|         | distinct | count | result |
|---------|----------|-------|--------|
| pre     |          |       |        |
| reduce  | conj     |       | conj   |
| combine | union    |       | union  |
| post    |          | count | count  |

Which does the count as a post-processing operation. So now there are two versions of fold/count and that's confusing too...
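The two compositions can be checked by hand in plain Clojure. This is only a sketch of the phases described above, not what PigPen executes internally; the partitioning and the var names are assumptions for illustration.

```clojure
(require '[clojure.set :as set])

;; Assumed split of [:a :a :b :b :c] across two simulated mappers.
(def partitions [[:a :a :b] [:b :c]])

;; "Last reducer/combiner pair wins": distinct's conj/union are dropped,
;; so each partition is simply counted and the counts are added.
(def last-pair-wins
  (reduce + (map count partitions)))
;; last-pair-wins => 5, matching the [5] from the original report

;; distinct's reducer and combiner, with count applied as a post step.
(def count-as-post
  (count (reduce set/union #{} (map #(reduce conj #{} %) partitions))))
;; count-as-post => 3
```

The only difference between the two is the phase in which `count` runs: as the reducer it sees every record, while as a post step it sees the merged set.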

So, what can you do? The solution today is to define a custom fold-fn that specifies which part is which:

(->> (pig/return records)
     (pig/fold (fold/fold-fn clojure.set/union conj count))
     (pig/dump))

... or if you want to reuse it ...

(defn distinct-count
  []
  (fold/fold-fn clojure.set/union conj count))

(->> (pig/return records)
     (pig/fold (distinct-count))
     (pig/dump))

It's not perfect and it could be better. If you have any ideas for making these compose better, I'm all ears. Take a look at the comp-* fns in pigpen.fold if you're curious. If not, pigpen.fold/fold-fn is probably your friend :)

theJohnnyBrown commented 9 years ago

Thanks, this is very helpful. I was wondering whether fold-fn would be necessary. Looks like it's time for a little brain-stretching :)