twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Strange (or inconsistent) behaviour for GroupBy -> SortBy #1593

Open siliconluffy opened 8 years ago

siliconluffy commented 8 years ago

'Oscar Boykin' via Scalding Development wrote: Can you file an issue, if nothing else to track the discussion?

On Sat, Sep 10, 2016 at 18:13 Oscar Boykin oscar@stripe.com wrote: Wait, sorry. Looking more carefully. Is the bug that originally all data with id 0 was in one reducer but with sorting it winds up on both? That would be a bug. What version of scalding is this?

Can you replicate this bug in a minimal case? Sorting should not change how the keys are partitioned to reducers (which is done by the hashCode of the key, which stays the same, I suppose).
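
The partitioning behavior Oscar describes can be sketched in plain Scala (an illustration only, not actual Hadoop or Cascading code): the reducer a record lands on depends only on the group key's hashCode, so sorting the values cannot move a key to a different reducer.

```scala
// Sketch of hashCode-based partitioning (illustrative, not Hadoop's code):
// the reducer assignment depends only on the group key, never on value order.
def partition(key: Int, numReducers: Int): Int =
  (key.hashCode & Int.MaxValue) % numReducers

val records = Seq((0, 3.0f), (0, 1.0f), (1, 2.0f))
val sorted = records.sortBy { case (_, value) => value }

// Partition assignment per key is identical before and after sorting by value.
val before = records.map { case (k, _) => (k, partition(k, 3)) }
val after = sorted.map { case (k, _) => (k, partition(k, 3)) }
assert(before.toSet == after.toSet)
```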

Basically, the test you want to write is: after groupBy with sortBy, if you take only the keys in the output, each key appears exactly once.
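
A minimal in-memory sketch of that test, using plain Scala collections rather than a Scalding job (the input data here is made up for illustration): group by key, sort within each group, then check that every key appears exactly once in the grouped output.

```scala
// In-memory model of groupBy + sortBy: each key must appear exactly once,
// and values within a group must come out sorted.
val input = Seq((1, 4.0f), (0, 2.0f), (1, 1.0f), (0, 3.0f))
val grouped: Map[Int, Seq[Float]] =
  input.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sorted) }

val keys = grouped.keys.toSeq
assert(keys.distinct.size == keys.size) // each key exactly once
assert(grouped(0) == Seq(2.0f, 3.0f))   // values sorted within the group
```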

I have a hard time believing there could have been a bug like this that we didn't notice for 5 years, but I guess it is possible.

On Sat, Sep 10, 2016 at 17:33 ravi kiran holur vijay ravikiranhv@gmail.com wrote: Hey Oscar,

Sorry, sounds like I might have misunderstood the semantics of groupBy followed by sortBy. Is there a way to make sure ALL records having the same key end up at the same reducer (what groupBy does) and within each reducer, have it sorted by value (what sortBy does)?

-Ravi

On Sat, Sep 10, 2016 at 8:06 PM, Oscar Boykin oscar@stripe.com wrote: Sorry, I don't follow. What did you expect? I don't see a bug.

The data looks sorted within groups, which is all sortBy does.

Note, you don't need forceToReducers here. Sorting can only be done on the reducers.

On Sat, Sep 10, 2016 at 15:12 ravi kiran holur vijay ravikiranhv@gmail.com wrote: Hello,

I am noticing strange behaviour in my Scalding job, which uses groupBy and sortBy. If I do not use .sortBy, each reducer gets all of the values for the same group key. However, if I use .sortBy, each reducer gets only part of the values for the same group key. I was wondering if any of you have run into a similar issue before, or have a hypothesis about what's happening?

Case 1: Observed behaviour = Expected behaviour, without using sortBy

Reducer 1 output:

    Processing data for group ... 1
    Initializing FM Model with existing parameters ...
    Processing model param ... o
    Processing model param ... w
    Processing model param ... r
    Processing model param ... s
    Processing model param ... t
    Processing model param ... l
    Processing model param ... f
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Reducer 2 output:

    Processing data for group ... 0
    Initializing FM Model with existing parameters ...
    Processing model param ... o
    Processing model param ... w
    Processing model param ... r
    Processing model param ... s
    Processing model param ... t
    Processing model param ... l
    Processing model param ... f
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Case 2: Observed behaviour != Expected behaviour, after using sortBy

Reducer 1 output:

    Processing data for group ... 0
    Initializing FM Model with existing parameters ...
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Initialized FM Model with: w0=0.000000, w=0, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
    Processing data for group ... 1
    Initializing FM Model with existing parameters ...
    Processing model param ... f
    Processing model param ... l
    Processing model param ... o
    Processing model param ... r
    Processing model param ... s
    Processing model param ... t
    Processing model param ... w
    Initialized FM Model with: w0=-0.181250, w=34531087, v=0, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Reducer 2 output:

    Processing data for group ... 0
    Initializing FM Model with existing parameters ...
    Processing model param ... f
    Processing model param ... l
    Processing model param ... o
    Processing model param ... v
    Processing model param ... r
    Processing model param ... s
    Processing model param ... t
    Processing model param ... w
    Initialized FM Model with: w0=-0.181250, w=34531087, v=1, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1
    Processing data for group ... 1
    Initializing FM Model with existing parameters ...
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Processing model param ... v
    Initialized FM Model with: w0=0.000000, w=0, v=4, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0

Code:

    val data: TypedPipe[(Int, Float, Either[FeatureVector, FMModelParameter])] = modelData
    val fmModels: SortedGrouped[Int, FMModel] = data
      .groupBy { case (id1, id2, modelParam) => id1 }
      // Secondary sort is needed to ensure model parameters appear before actual training data
      // TODO: this sortBy is causing problems and has a bug
      .sortBy { case (id1, id2, modelParam) => id2 }
      .forceToReducers
      .mapGroup { case (groupId, records) =>
        println("Processing data for group ... " + groupId)
        val trainedModel = aggregateAndUpdateModel(records)
        Iterator(trainedModel)
      }

johnynek commented 8 years ago

@siliconluffy this does not fail for me:

https://github.com/twitter/scalding/pull/1594/files

You have a very old Hadoop version. I wonder if that could be the issue? Could you try with the latest scalding (0.16.0, for instance) and see if you still see the problem?

siliconluffy commented 8 years ago

I tried with the latest version, but still running into the same issue. Scalding 0.16.0 Cascading 2.7.1 -Ravi

siliconluffy commented 8 years ago

I think I found a way to isolate this bug to the line that's causing it. My hypothesis is that there might be a limit to the size (in bytes) that each record within a reducer can have.

val modelData: TypedPipe[(Int, (Float, Array[Float]))] = fmModel
    .flatMap { curVal =>
      (0 until 3).map { curIndex =>
        println("Replicating model parameter " + curVal.paramType + " for group " + curIndex)
        // [ROOT CAUSE] This causes values with the same key to be distributed across different reducers
        val recordValue: Array[Float] = curVal.paramValue
        // [If we replace the large float array with one that has only 2 elements,
        //  values with the same key end up in a single reducer]
        // val recordValue: Array[Float] = Array.fill(2)(0.0f)
        (curIndex, (-1.0f, recordValue))
      }
    }

  val fmModels = modelData
    .group
    .sortBy(_._1)
    .withReducers(3)
    .mapGroup {
      case (groupNum, modelParams) =>
        println("Processing data for group " + groupNum)
        modelParams.foreach {
          case (na, modelParam) =>
            println("Processing parameter " + modelParam.length)
        }
        Iterator("test1")
    }

  fmModels.values.write(TypedTsv("s3://pinlogs/ads/rvijay/usermodeling/delete/test"))

Map-Reduce Counters with huge arrays as values

(screenshot, 2016-09-12 9:29:15 pm)

Map-Reduce Counters with smaller arrays as values

(screenshot, 2016-09-12 9:29:26 pm)

johnynek commented 8 years ago

This is really strange. I have never seen anything like it.

Any ideas @cwensel ?

How big are these arrays? What do you mean by huge?

johnynek commented 8 years ago

And to be clear, if either:

  1. the arrays are small
  2. you don't do the sort

then you don't observe the problem?

siliconluffy commented 8 years ago

Yes, that's a great summary. Huge: O(35M).

Thanks! Ravi

cwensel commented 8 years ago

Hadoop 'could' have a limit on the key size, but I don't know of one. It would make sense, though, considering all the things that are done to keys in the effort of making MapReduce work.

When doing a secondary sort (I'm assuming that's what's going on), we have to promote the sort fields to the key to inherit the sorting, and with a trick or two we can exclude those values from the partitioning, so we only group on the keys we want to group on.
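
The composite-key trick described above can be sketched in plain Scala (an illustration of the idea, not Cascading's actual implementation): the sort field rides along in the key for ordering, but the partitioner and the grouping comparison consult only the group key.

```scala
// Illustration of the composite-key secondary-sort trick (not Cascading's
// real classes): sort field is promoted into the key, but partitioning and
// grouping ignore it.
case class CompositeKey(groupKey: Int, sortKey: Float)

// Partitioner: uses only the group key, so all records for a given key
// land on the same reducer regardless of sortKey.
def partition(k: CompositeKey, numReducers: Int): Int =
  (k.groupKey.hashCode & Int.MaxValue) % numReducers

// Sort comparator: orders by (groupKey, sortKey), giving the secondary sort.
val sortOrdering: Ordering[CompositeKey] =
  Ordering.by(k => (k.groupKey, k.sortKey))

// Grouping comparator: two keys belong to the same reduce group
// iff their groupKeys match, ignoring the promoted sort field.
def sameGroup(a: CompositeKey, b: CompositeKey): Boolean =
  a.groupKey == b.groupKey

// Same group key => same partition, even with different sort keys.
assert(partition(CompositeKey(0, 1.0f), 3) == partition(CompositeKey(0, 99.0f), 3))
```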

The fact that additional groups are showing up makes me think something odd is happening in Scalding, as secondary sorting in Cascading has been stable for many years. But evidence otherwise would be good to come by quickly.

Any chance this can be tested on the Tez platform?

siliconluffy commented 8 years ago

Sorry, I am not familiar with Tez. For the original application, I was able to get around this strange behaviour by using the distributed cache to directly read the large sequence file, instead of shuffling its records through the partition/sort phase. Not sure what the next steps are though ...

-Ravi

johnynek commented 8 years ago

@cwensel scalding is just passing through a comparator here: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala#L268

And this has been stable for many years and powered untold thousands (millions?) of jobs at Twitter.

@siliconluffy I guess @cwensel will only look at it if we make a Cascading-only version of the bug.

Since we can only see it when the values are large enough, to me that suggests something at the Hadoop (or possibly Cascading) layer. We don't have any logic that switches on size of the values.

Chris, I took a moment to write a scalding test case: https://github.com/twitter/scalding/pull/1594/files

which passes. Can you show him how to write a cascading test case that mirrors some simple functionality?

cwensel commented 8 years ago

So HashJoin, which I assume is what you used when you decided to bypass the shuffle phase, does not support secondary sorting, which is what I presumed was the issue per the title (GroupBy -> SortBy). If not, I've no idea what is meant by SortBy (Cascading doesn't ship with a SortBy, which I would assume would be like a MinBy or MaxBy AggregateBy subclass). If there were a SortBy, it would inherently perform a GroupBy, adding to my confusion as to why GroupBy would precede SortBy.

And if HashJoin is in play and working, then again there was no reliance on the secondary-sorting feature of GroupBy, leaving me confused.

If this is NOT a secondary sort, but just a GroupBy (that may or may not provide a Comparator on the key), I suspect the 'array' returns an inconsistent hash value after reaching a certain size.

It would be quite useful to see the query plan of the app (call Flow#writeDOT()).

johnynek commented 8 years ago

There is no HashJoin here, just a GroupBy with a secondary sort. sortBy is a method in Scalding to set up secondary sorting on a GroupBy. So .groupBy(_.foo).sortBy(_.bar) builds a Cascading GroupBy which has a secondary sort set.

I don't see why we need a hashCode on the array here, because the array is in the value. His GroupBy key is an Integer. His secondary sort key is a Float, but somehow increasing the size of the array in the value part of the tuple is coming into play.
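
For what it's worth, JVM arrays use identity-based equals (and hashCode), never content-based, at any size, so an Array would make a poor grouping key regardless; a quick plain-Scala check:

```scala
// JVM arrays compare by reference, not contents: two arrays with
// identical elements are distinct objects and do not compare equal.
val a = Array(1.0f, 2.0f)
val b = Array(1.0f, 2.0f)

assert(a.sameElements(b)) // same contents...
assert(a != b)            // ...but equals (and hashCode) are identity-based
```

Here, though, the array is in the value, which the partitioner never hashes.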

cwensel commented 8 years ago

Like I said, I'm very confused. I have no idea why adding the dist-cache makes it work. Or is that a red herring?

So yes, there is no hash on the array, even if it's promoted to be a part of the key in the MR job.

johnynek commented 8 years ago

Yes. It is baffling.
