NICTA / scoobi

A Scala productivity framework for Hadoop.
http://nicta.github.com/scoobi/

Add ScoobiEnvironment, including support for counters, etc. #238

Closed tdyas closed 11 years ago

tdyas commented 11 years ago

Merged manually from: https://github.com/benwing/scoobi/commit/56964f9c https://github.com/NICTA/scoobi/pull/123 https://github.com/NICTA/scoobi/pull/145

Conflicts: src/main/scala/com/nicta/scoobi/Scoobi.scala src/main/scala/com/nicta/scoobi/impl/exec/MapReduceJob.scala src/main/scala/com/nicta/scoobi/impl/exec/MscrMapper.scala src/main/scala/com/nicta/scoobi/impl/exec/MscrReducer.scala

etorreborre commented 11 years ago

Hi Tom,

Thanks for this pull request. I have a fundamental problem with it, though, and I don't yet know how to solve it elegantly. The issue is the shared mutable state in the ScoobiEnvironment object; I think we should find a way to avoid that. One possibility is to provide special functions in the API that give you access to the context, something like:

DList(1, 2, 3).map { (env: ScoobiEnvironment, i:Int) => 
  env.incrementCounter(group, name)
  i + 1
}

Would that be ok for you?

tdyas commented 11 years ago

Yeah, something more functional would be better if it isn't too much trouble to make an "environment" or "context" instance available. This pull request is really just a forward port of someone else's pull request (links in the commit message), so I didn't do any redesign of that existing code. Let me give it some thought.

Maybe have methods like mapContext or flatMapContext that would pass in a "context"? (Since we wouldn't want to burden the regular map/flatMap methods with the context.)
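To make the shape of that idea concrete, here is a hypothetical sketch (neither `mapContext` nor `MapContext` exist in Scoobi; a plain `Seq` stands in for a `DList`, and an in-memory context stands in for whatever Hadoop-backed implementation the framework would supply):

```scala
// Hypothetical: a minimal context trait a mapContext-style method could pass in.
trait MapContext {
  def incrementCounter(group: String, name: String, by: Long): Unit
}

// Trivial in-memory context for illustration only.
class InMemoryContext extends MapContext {
  val counters = scala.collection.mutable.Map.empty[(String, String), Long]
  def incrementCounter(group: String, name: String, by: Long): Unit =
    counters((group, name)) = counters.getOrElse((group, name), 0L) + by
}

// Like map, but the user function also receives the context.
def mapContext[A, B](xs: Seq[A], ctx: MapContext)(f: (MapContext, A) => B): Seq[B] =
  xs.map(a => f(ctx, a))

val ctx = new InMemoryContext
val out = mapContext(Seq(1, 2, 3), ctx) { (env, i) =>
  env.incrementCounter("group", "seen", 1)
  i + 1
}
// out is Seq(2, 3, 4); the "group"/"seen" counter ends up at 3
```

The point is only that the regular map/flatMap signatures stay untouched while a separate combinator threads the context through.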

etorreborre commented 11 years ago

Here is my proposal. In the latest 0.7.0-SNAPSHOT you can use something we already have: the parallelDo method on DLists. I've extended this method and the Emitter trait so that you can access a Counters trait like this:

val list = DList(1, 2, 3).map((i: Int) => i + 1).parallelDo((input: Int, counters: Counters) => {
  counters.incrementCounter("group1", "counter1", 1)
  input + 1
})

The Counters trait offers the incrementCounter and getCounter methods. You can also send a heartbeat with the following method:

val list = DList(1, 2, 3).map((i: Int) => i + 1).parallelDo((input: Int, progress: Heartbeat) => {
  progress.heartbeat
  input + 1
})

And all of these methods are available on the ScoobiJobContext trait, which will allow other methods to be added in the future:

val list = DList(1, 2, 3).map((i: Int) => i + 1).parallelDo((input: Int, context: ScoobiJobContext) => {
  context.incrementCounter("group1", "counter1", 1)
  context.heartbeat
  input + 1
})

Note that a Scoobi job typically spans several Hadoop jobs, so at the end of a Scoobi application run you can get the values of the counters by interrogating the ScoobiConfiguration object:

  configuration.counters.getGroup("my group").find("myCounter").getValue

At this stage you can decide to log the values if you want to.
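To make the incrementCounter/getCounter contract concrete, here is a sketch using an in-memory stand-in for the Counters trait (illustration only; the real trait is backed by Hadoop's counter machinery, which is what lets the values survive across the several Hadoop jobs a Scoobi job compiles to):

```scala
// Stand-in for the Counters trait, for illustration; not the real Scoobi code.
trait Counters {
  def incrementCounter(group: String, name: String, by: Long): Unit
  def getCounter(group: String, name: String): Long
}

class InMemoryCounters extends Counters {
  private val store = scala.collection.mutable.Map.empty[(String, String), Long]
  def incrementCounter(group: String, name: String, by: Long): Unit =
    store((group, name)) = getCounter(group, name) + by
  def getCounter(group: String, name: String): Long =
    store.getOrElse((group, name), 0L)
}

// Simulate a parallelDo pass over three elements, then read the total back,
// analogous to interrogating ScoobiConfiguration at the end of the run.
val counters = new InMemoryCounters
val result = Seq(1, 2, 3).map { i =>
  counters.incrementCounter("group1", "counter1", 1)
  i + 1
}
val total = counters.getCounter("group1", "counter1")
// result is Seq(2, 3, 4); total is 3
```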

You can try this code in the latest 0.7.0-SNAPSHOT.

How does that sound?

tdyas commented 11 years ago

That would work for me. I was already using this form in code relying on the original pull request:

.map { x => Scoobi.incrCounter("foo", "bar"); x }

which now it seems would just be:

.parallelDo { (x, counters: Counters) => counters.incrementCounter("foo", "bar", 1); x }

etorreborre commented 11 years ago

Ok, so I'm closing this pull request for now. Please open an issue if something is not working for you.