lintool / warcbase

Warcbase is an open-source platform for managing and analyzing web archives
http://warcbase.org/

NER3Classifier object is not serializable (NER3Classifier.scala) #167

Closed: jrwiebe closed this 8 years ago

jrwiebe commented 8 years ago

This issue refers to @aliceranzhou's ner3classifier branch. Also, I'm a novice at Scala and Spark, so forgive me if what I've labelled an issue can be chalked up to my ignorance.

I tried writing a straight port of my extract-entities-from-scrape-text.pig script to test NER3Classifier.scala on the cluster. It failed, apparently because the NER3Classifier object is not serializable.

I ran the following in the Spark shell:

import org.warcbase.spark.matchbox.NER3Classifier

NER3Classifier.apply("hdfs:///user/jrwiebe/english.all.3class.distsim.crf.ser.gz")
// Same behaviour if local file is specified, i.e., /cliphomes/jrwiebe/...

val entities = sc.textFile("hdfs:///user/jrwiebe/cpp.text-greenparty/part-m-00000")
  .map(_.split("\t"))
  .map(e => (e(0), e(1), NER3Classifier.classify(e(2))))

// For testing, instead of saving to file:
entities.take(3).foreach(println)

I received the following error messages:

WARN  TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, trantor09.umiacs.umd.edu): java.lang.ExceptionInInitializerError: Unable to load classifier java.lang.NullPointerException
    at org.warcbase.spark.matchbox.NER3Classifier$.classify(NER3Classifier.scala:91)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

ERROR YarnScheduler - Lost executor 1 on trantor09.umiacs.umd.edu: remote Akka client disassociated
ERROR YarnScheduler - Lost executor 2 on trantor09.umiacs.umd.edu: remote Akka client disassociated
WARN  TaskSetManager - Lost task 0.2 in stage 0.0 (TID 2, trantor09.umiacs.umd.edu): ExecutorLostFailure (executor 2 lost)
WARN  TaskSetManager - Lost task 0.3 in stage 0.0 (TID 3, trantor12.umiacs.umd.edu): java.lang.ExceptionInInitializerError: Unable to load classifier java.lang.NullPointerException
    at org.warcbase.spark.matchbox.NER3Classifier$.classify(NER3Classifier.scala:91)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

ERROR TaskSetManager - Task 0 in stage 0.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, trantor12.umiacs.umd.edu): java.lang.ExceptionInInitializerError: Unable to load classifier java.lang.NullPointerException
    at org.warcbase.spark.matchbox.NER3Classifier$.classify(NER3Classifier.scala:91)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I eventually concluded that the problem was with evaluation of the second map closure. This seemed to be confirmed when I pasted NER3Classifier.scala (minus the package declaration) into the Spark shell, replacing the import statement in my script. In this instance a "Task not serializable" exception was thrown instead of a NullPointerException. (I assume the difference is due to the information available to the interpreter, as opposed to the bytecode produced by the compiler.)

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:120)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:141)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:143)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:145)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:147)
    at $iwC$$iwC$$iwC.<init>(<console>:149)
    at $iwC$$iwC.<init>(<console>:151)
    at $iwC.<init>(<console>:153)
    at <init>(<console>:155)
    at .<init>(<console>:159)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:824)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:344)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:344)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:808)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$NER3Classifier$
Serialization stack:
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$NER3Classifier$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$NER3Classifier$@18cdacd)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: NER3Classifier$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$NER3Classifier$)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@734db023)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 53 more

I tried modifying NER3Classifier, but I didn't get anywhere. It's not even clear to me that I'm taking the right approach. It seems to me that rather than having the driver create the NER3Classifier object, load the classifier file, and somehow serialize it and pass it to the worker nodes, each worker node should instead receive a function that creates the NER3Classifier object 'locally'. I tried this by passing the map a function (sketched below), but I got the same "Task not serializable" error.
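
Roughly what I tried, as a sketch (from memory, not my exact code):

def makeClassify(path: String): String => String = { text =>
  NER3Classifier.apply(path)
  NER3Classifier.classify(text)
}

val classify = makeClassify("hdfs:///user/jrwiebe/english.all.3class.distsim.crf.ser.gz")

val entities = sc.textFile("hdfs:///user/jrwiebe/cpp.text-greenparty/part-m-00000")
  .map(_.split("\t"))
  .map(e => (e(0), e(1), classify(e(2)))) // same "Task not serializable" error
  // Still fails: the function value captures its surrounding REPL context,
  // so Spark ends up trying to serialize NER3Classifier anyway.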

aliceranzhou commented 8 years ago

That makes sense.

If NER3Classifier stayed an object, we could pass the classifier file path to every call of the classify method, but then the classifier would be loaded many times.

I've changed NER3Classifier from an object to a class so it can be passed around. The driver 'creates' an NER3Classifier instance with a specific classifier file, but the file is loaded 'locally' by the worker nodes (when classifier.classify() is first called). Would this work?
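
The idea, as a simplified sketch (not the exact code in the branch):

import edu.stanford.nlp.ie.crf.CRFClassifier

class NER3Classifier(classifierFile: String) extends Serializable {
  // Only the file path is serialized and shipped with the closure; the
  // model itself is @transient and loaded lazily, so each executor loads
  // it on first use rather than receiving it from the driver.
  @transient lazy val classifier = CRFClassifier.getClassifier(classifierFile)

  def classify(text: String): String = {
    // ... run the classifier over `text` and serialize the entities to JSON ...
    ???
  }
}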

Try this?

import org.warcbase.spark.matchbox.NER3Classifier

val classifier = new NER3Classifier("hdfs:///user/jrwiebe/english.all.3class.distsim.crf.ser.gz")

val entities = sc.textFile("hdfs:///user/jrwiebe/cpp.text-greenparty/part-m-00000")
  .map(_.split("\t"))
  .map(e => (e(0), e(1), classifier.classify(e(2))))

val entities = rdd.map(r => (r._1, r._2, classifier_.classify(r._3)))
entities.take(3).foreach(println)
jrwiebe commented 8 years ago

I also tried this yesterday, making NER3Classifier a class extending Serializable -- although I didn't think to move the NERClassType object definition and the ObjectMapper instantiation. It still isn't working for me, though. Running the above code (minus the second-last line), I'm still getting an error loading the classifier. You'll notice it's a FileNotFoundException, although the file is definitely where it's specified.

WARN  TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, trantor04.umiacs.umd.edu): java.lang.ExceptionInInitializerError: Unable to load classifier java.io.FileNotFoundException: hdfs:/user/jrwiebe/english.all.3class.distsim.crf.ser.gz (No such file or directory)
    at org.warcbase.spark.matchbox.NER3Classifier.classify(NER3Classifier.scala:78)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

        ...
aliceranzhou commented 8 years ago

Hmm, that's really odd. It works for me, although my classifier file is local rather than on HDFS. Does CRFClassifier.getClassifier("file") also throw a FileNotFoundException?
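
E.g., a quick check in the shell, bypassing NER3Classifier entirely (path is a placeholder):

import edu.stanford.nlp.ie.crf.CRFClassifier

// Load the model directly; if this throws too, the problem is below our code.
val c = CRFClassifier.getClassifier("/path/to/english.all.3class.distsim.crf.ser.gz")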

aliceranzhou commented 8 years ago

Also, my bad about the second-last line. Need to copy/paste more carefully.

jrwiebe commented 8 years ago

Sorry for dragging you into this, Alice. Your original code worked fine; the issue was with my code and my not understanding how the workers handle files. Instead of trying to make the workers load the classifier from "hdfs://...", the thing to do is to use sc.addFile().
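
For reference: sc.addFile() ships the file to every node, and inside a task the local copy can be opened by bare file name (as below) or resolved explicitly:

import org.apache.spark.SparkFiles

val localPath = SparkFiles.get("english.all.3class.distsim.crf.ser.gz") // local path on the executor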

So, the following works, using your earlier code where NER3Classifier is an object (cc274ed):

sc.addFile("hdfs:///user/jrwiebe/english.all.3class.distsim.crf.ser.gz")

val entities = sc.textFile("hdfs:///user/jrwiebe/cpp.text-greenparty/part-m-00000")
  .map(_.split("\t"))
  .map{ e =>
    NER3Classifier.apply("english.all.3class.distsim.crf.ser.gz")
    (e(0), e(1), NER3Classifier.classify(e(2)))
  }

Better, since we don't want to initialize the classifier for every string, is this:

sc.addFile("hdfs:///user/jrwiebe/english.all.3class.distsim.crf.ser.gz")

val entities = sc.textFile("hdfs:///user/jrwiebe/cpp.text-greenparty/part-m-00000")
  .mapPartitions(iter => {
    NER3Classifier.apply("english.all.3class.distsim.crf.ser.gz")
    iter.map(_.split("\t"))
      .map(e => (e(0), e(1), NER3Classifier.classify(e(2))))
  })

Without modification, the updated class version of NER3Classifier (582b21a) doesn't work with mapPartitions(), though it does work with straight map(). I suggest we revert to the object version.

lintool commented 8 years ago

@jrwiebe Dunno if this helps, but the reason your original code didn't work was this: you initialized something in the shell, which is local, and then you referred to it inside map, which runs in parallel on each executor (possibly remotely, over a large cluster). The only way that could possibly work is if Spark could "package up" the NER val and send it to the executors, but it didn't know how...

It was fixed in the later code because you initialized the NER from within the closure inside the map. Yes, this means each executor has to initialize the NER separately, but there's no way around that -- remember, in principle, the map could be running on hundreds of nodes... make sense?
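
A toy illustration of the difference (nothing to do with the actual NER code):

// Stands in for anything that can't be shipped from the driver.
class Helper { def process(s: String): String = s.toUpperCase }

val data = sc.parallelize(Seq("a", "b"))

// Fails: `helper` lives on the driver, the closure drags it along,
// and Spark must serialize it -- which it can't.
val helper = new Helper
data.map(s => helper.process(s)) // Task not serializable

// Works: each executor builds its own Helper, locally, once per partition.
data.mapPartitions { iter =>
  val local = new Helper
  iter.map(local.process)
}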

jrwiebe commented 8 years ago

Thanks, @lintool. I do understand all of this now, and that was my basic understanding when I opened this issue, but I couldn't figure out how to serialize the NER classifier, and I couldn't manage to initialize a classifier within the closure either (the problem there being the attempt to load from an HDFS path).

For my part, I think we can close this issue. You can ignore my comment above about the class version of NER3Classifier not working. I just tested it again, and it works fine. I wonder if I polluted the namespace in the REPL by testing first with the NER3Classifier object and then loading the class definition. Anyway, this works with the current (class) code:

import org.warcbase.spark.matchbox.NER3Classifier

sc.addFile("hdfs:///user/jrwiebe/english.all.3class.distsim.crf.ser.gz")

val entities = sc.textFile("hdfs:///user/jrwiebe/cpp.text-greenparty/part-m-00000")
  .mapPartitions(iter => {
    val classifier = new NER3Classifier("english.all.3class.distsim.crf.ser.gz")
    iter.map(_.split("\t"))
      .map(e => (e(0), e(1), classifier.classify(e(2))))
  })
lintool commented 8 years ago

@jrwiebe Can you please document this knowledge appropriately in the wiki?

Closing this issue.

jrwiebe commented 8 years ago

Done.
