nathanmarz / cascalog

Data processing on Hadoop without the hassle.

Aggregation group getting spread across multiple reducers #224

Closed dkincaid closed 10 years ago

dkincaid commented 10 years ago

I have a step in a query that's producing the following in a reduce step:

2013-12-17 18:33:57,709 INFO cascading.flow.hadoop.FlowReducer: sourcing from: GroupBy(65a1d4b2-8ee4-4317-8f3c-babaf9ac96df)[by:[{3}:'?agg-granularity', '?time-bucket', '?time-granularity']]

The problem I'm seeing is that some of the groups are getting spread across more than one reducer. For example, here is some logging I put in the CascalogAggregator that is running in this step. From one reducer:

2013-12-17 18:34:02,365 INFO com.idexx.lambda.hadoop.jobs.metrics.MasterDataSetMetrics: Aggregating property value counts for property type animalProperty#name: time DAY, level SITE, bucket 2013/12/17
2013-12-17 18:34:02,366 INFO com.idexx.lambda.hadoop.jobs.metrics.MasterDataSetMetrics: Aggregating property value counts for property type animalProperty#name: time DAY, level SITE, bucket 2013/12/17
2013-12-17 18:34:02,366 INFO com.idexx.lambda.hadoop.jobs.metrics.MasterDataSetMetrics: Completed aggregating property value counts for time DAY, level SITE, bucket 2013/12/17: values [animalProperty#name]

and here is the logging from one of the other reduce tasks:

2013-12-17 18:34:08,305 INFO com.idexx.lambda.hadoop.jobs.metrics.MasterDataSetMetrics: Aggregating property value counts for property type animalProperty#name: time DAY, level SITE, bucket 2013/12/17
2013-12-17 18:34:08,305 INFO com.idexx.lambda.hadoop.jobs.metrics.MasterDataSetMetrics: Aggregating property value counts for property type transactionProperty#transactionNumber: time DAY, level SITE, bucket 2013/12/17
2013-12-17 18:34:08,306 INFO com.idexx.lambda.hadoop.jobs.metrics.MasterDataSetMetrics: Completed aggregating property value counts for time DAY, level SITE, bucket 2013/12/17: values [transactionProperty#transactionNumber, animalProperty#name]

As you can see, the tuples in the group with values ["DAY", "2013/12/17", "SITE"] are being split across different reduce tasks, giving me incorrect results. Both reduce tasks say they are sourcing from the same GroupBy that I pasted above.
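For context on how a single group can end up on two reducers: Hadoop routes each group to a reducer based on the grouping key's hashCode(). The sketch below is my simplified paraphrase of the default HashPartitioner logic, not code from this issue; it shows why every mapper JVM must compute the same hashCode() for the same key:

```java
// Simplified sketch of Hadoop's default HashPartitioner behavior.
// Tuples for a group only meet on one reducer if every mapper JVM
// computes the same hashCode() for that group's key.
public class PartitionSketch {
    static int partitionFor(Object key, int numReduceTasks) {
        // Mask off the sign bit, then take the modulus over the
        // number of reduce tasks to pick a partition.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // String keys hash identically in every JVM, so the group is
        // stable; a key whose hashCode() varies per JVM would not be.
        System.out.println(partitionFor("DAY", 4));
    }
}
```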

The part of the query causing problems looks like this:

// Query to aggregate the properties by customer
Subquery propertyAggregator = new Subquery("?time-granularity", "?agg-granularity", "?time-bucket", "?value-counts")
    .predicate(query, "?property-type", "?time-granularity", "?agg-granularity", "?time-bucket", "?aggregate-hll", "?aggregate-total", "?total-min", "?total-max")
    .predicate(new BuildPropertyValuesCounts(), "?property-type", "?aggregate-hll", "?aggregate-total", "?total-min", "?total-max").out("?value-counts");

?time-granularity is an enum with values DAY, MONTH, ALLTIME. ?agg-granularity is either a customer id (a random string in these tests), or the word SITE. ?time-bucket is a string that is either year/month/day or year/month.

BuildPropertyValuesCounts is a CascalogAggregator and contains the logging statements that are shown above.

Right now I'm writing the output to a text-delimited sink, so I expect the output to look like this:

MONTH UCsnH9hT7n 2013/12 [MdsMetricsPropertyValues]: (specific values here)
DAY SITE 2013/12/19 [MdsMetricsPropertyValues]: (specific values here)
MONTH udAqFt16AY 2013/12 [MdsMetricsPropertyValues]: (specific values here)

where the first three values are unique across all the part-0000* files (I've truncated the lines for clarity). And this is what I get when I run the job against my development VM running MapR.

When I run this against our single-node cluster set up for CI, I get some duplicated rows across the part-0000* files (the reducer outputs), though never within the same file:

DAY SITE 2013/12/19 [MdsMetricsPropertyValues]: (specific values here)
DAY SITE 2013/12/19 [MdsMetricsPropertyValues]: (specific values here)
DAY k5wG2tgsCx 2013/12/19 [MdsMetricsPropertyValues]: (specific values here)
DAY k5wG2tgsCx 2013/12/19 [MdsMetricsPropertyValues]: (specific values here)
DAY AOFHtFqeF8 2013/12/19 [MdsMetricsPropertyValues]: (specific values here)

but not all rows are duplicated. The expected output is 13 lines; the bad output is 21 lines. I haven't been able to find a pattern in which rows are duplicated and which are not.

dkincaid commented 10 years ago

I figured this one out thanks to this blog post: http://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/

If you refer back to my post with the query in it, you'll see that one of the vars I'm grouping on for the aggregation is a Java enum. I only mentioned it in passing, so it was easy to miss. Java's enum does not have a stable hashCode() implementation (it inherits Object's identity hash), so the same enum value can hash to different values from JVM to JVM. I changed that var from an enum to a String and it's working perfectly now.
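To illustrate the fix (the enum name and values below are my stand-ins for the issue's ?time-granularity var, not code from the project): Enum.hashCode() is final and returns Object's identity hash, which differs between JVM runs, while Enum.name() yields a String whose hashCode() is a documented function of its characters and is therefore identical in every JVM.

```java
// Hypothetical stand-in for the ?time-granularity enum in the query.
enum TimeGranularity { DAY, MONTH, ALLTIME }

public class EnumKeyFix {
    public static void main(String[] args) {
        // Unsafe as a grouping key: identity hash, varies per JVM run.
        System.out.println(TimeGranularity.DAY.hashCode());

        // Safe: emit the stable String name of the constant instead.
        String key = TimeGranularity.DAY.name();
        System.out.println(key.hashCode()); // "DAY" hashes to 67452 in every JVM
    }
}
```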

Now I have to ask myself which object types are OK to use as vars when doing aggregations. I'll have to dig a little and see what Cascalog and Cascading are doing under the covers with these objects.
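A general rule of thumb (my addition, not something established in this thread): a type is a safe grouping key when its hashCode() is a documented pure function of its value rather than of object identity. The JDK specifies such formulas for String and the boxed primitives, so they hash identically across JVMs:

```java
// Types whose hashCode() the JDK documents as a function of the value,
// making them stable grouping keys across mapper JVMs.
public class StableHashExamples {
    public static void main(String[] args) {
        // Integer: spec says the hash is the int value itself.
        System.out.println(Integer.valueOf(42).hashCode()); // 42

        // Long: spec says (int)(value ^ (value >>> 32)).
        long v = 1234567890123L;
        System.out.println(Long.valueOf(v).hashCode());

        // String: spec says s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1].
        System.out.println("SITE".hashCode());
    }
}
```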