NICTA / scoobi

A Scala productivity framework for Hadoop.
http://nicta.github.com/scoobi/
482 stars 97 forks source link

implicit SeqSchema for anything with a WireFormat #236

Closed ellchow closed 11 years ago

ellchow commented 11 years ago

I added an implicit SeqSchema for anything that has a WireFormat. It was a bit of a hassle to persist anything as a sequence file that wasn't one of the standard types. Also, I wanted to use it for checkpointing and writing the avro schema for each checkpoint is a bit prohibitive - of course, this may change when/if the plugin is updated for scala 2.10.

Is there a better way to support this?

ellchow commented 11 years ago

I'm not sure what changed, but this seems to have stopped working since the API for sequence IO was modified. For this code import com.nicta.scoobi.Scoobi._

object HelloWorld extends ScoobiApp{
  def run() = {
    val x = DList((1L, 2L), (2L, 3L),(3L, 4L)).toSequenceFile("checkpoint").checkpoint
    val y = x.map(_._2)
    persist(y.toTextFile("second"))
  }
}

[WARN] LocalJobRunner - job_local_0001 java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long at scala.runtime.BoxesRunTime.unboxToLong(Unknown Source) at scala.Tuple2._2$mcJ$sp(Tuple2.scala:19) at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37) at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37) at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105) at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105) at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:79) at com.nicta.scoobi.core.DoFn$class.process(EnvDoFn.scala:55) at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77) at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77) at com.nicta.scoobi.core.EnvDoFn$class.processFunction(EnvDoFn.scala:37) at com.nicta.scoobi.core.BasicDoFn$$anon$1.processFunction(EnvDoFn.scala:77) at com.nicta.scoobi.impl.plan.comp.ParallelDo.map(ProcessNode.scala:94) at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36) at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36) at scala.collection.immutable.List.foreach(List.scala:318) at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter.map(VectorEmitterWriter.scala:36) at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeMapper$1(InputChannel.scala:194) at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:182) at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:181) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeNext$1(InputChannel.scala:181) at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:178) at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250) at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62) at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62) at scala.collection.immutable.List.foreach(List.scala:318) at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

Also, this fails

import com.nicta.scoobi.Scoobi._

import java.io._
import collection.mutable
import com.nicta.scoobi.core.WireFormat
import org.apache.hadoop.io.BytesWritable

object HelloWorld extends ScoobiApp{

implicit def anyWFSeqSchema[A : WireFormat]: SeqSchema[A] = new SeqSchema[A] {
    type SeqType = BytesWritable

    val b = mutable.ArrayBuffer[Byte]().mapResult(_.toArray)

    def toWritable(a: A) = {
      val bs = new ByteArrayOutputStream
      implicitly[WireFormat[A]].toWire(a, new DataOutputStream(bs))
      new BytesWritable(bs.toByteArray)
    }
    def fromWritable(xs: BytesWritable): A = {
      b.clear()
      xs.getBytes.take(xs.getLength).foreach { x => b += x }
      val bArr = b.result()

      val bais = new ByteArrayInputStream(bArr)
      implicitly[WireFormat[A]].fromWire(new DataInputStream(bais))
    }
    val mf: Manifest[SeqType] = implicitly
  }

  def run() = {
    case class Foo(val value: Int)
    implicit val FooFmt = mkCaseWireFormat(Foo, Foo unapply _)
    val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint").checkpoint
    val y = x.map(e => Foo(e.value + 1))
    persist(y.toTextFile("plusone"))
  }
}

[WARN] LocalJobRunner - job_local_0001 java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable at HelloWorld$$anon$1.fromWritable(HelloWorld.scala:10) at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:111) at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:110) at com.nicta.scoobi.core.InputConverter$class.asValue(DataSource.scala:51) at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.asValue(SequenceInput.scala:110) at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:177) at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250) at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62) at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62) at scala.collection.immutable.List.foreach(List.scala:318) at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

etorreborre commented 11 years ago

All of these examples are now working on 0.7.0-SNAPSHOT so I'm closing the pull request.

Note however that the syntax has changed a bit since checkpointing is done like that:

val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint", checkpoint = true)