Constannnnnt / Distributed-CoreNLP

This infrastructure, built in Java on Stanford CoreNLP, MapReduce, and Spark, aims to process document annotations at large scale.
https://github.com/Constannnnnt/Distributed-CoreNLP
MIT License

Task not serializable Exception #4

Closed: KaisongHuang closed this issue 5 years ago

KaisongHuang commented 5 years ago

I think the exception may be caused by nested map operations.

docs.map(doc -> {
    pipeline.annotate(doc);
    return doc.tokens().stream()
        .map(token -> "(" + token.word() + "," + token.ner() + ")")
        .collect(Collectors.joining(" "));
}).saveAsTextFile(_args.output);
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.map(RDD.scala:370)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
        at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
        at ca.uwaterloo.cs651.project.CoreNLP.main(CoreNLP.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: edu.stanford.nlp.pipeline.StanfordCoreNLP
Serialization stack:
        - object not serializable (class: edu.stanford.nlp.pipeline.StanfordCoreNLP, value: edu.stanford.nlp.pipeline.StanfordCoreNLP@225bfe78)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class ca.uwaterloo.cs651.project.CoreNLP, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic ca/uwaterloo/cs651/project/CoreNLP.lambda$main$94619a37$1:(Ledu/stanford/nlp/pipeline/StanfordCoreNLP;Ledu/stanford/nlp/pipeline/CoreDocument;)Ljava/lang/String;, instantiatedMethodType=(Ledu/stanford/nlp/pipeline/CoreDocument;)Ljava/lang/String;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class ca.uwaterloo.cs651.project.CoreNLP$$Lambda$61/2043561063, ca.uwaterloo.cs651.project.CoreNLP$$Lambda$61/2043561063@4ff31e98)
        - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
        ... 22 more
Constannnnnt commented 5 years ago

Hmmm, I don't know. I may try tomorrow.

ji-xin commented 5 years ago

It seems the pipeline object is not serializable. Two possible solutions:

  1. broadcast the pipeline
  2. construct a new pipeline object in the mapper, instead of constructing it ahead of time (the Properties object may still need broadcasting); see the sketch after this list
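
A minimal sketch of option 2, assuming the pipeline is configured from a plain java.util.Properties object named props (which is serializable, unlike the pipeline itself). Variable names such as propsVar are illustrative, not taken from the project:

// assumes: import java.util.Properties; import java.util.stream.Collectors;
//          import org.apache.spark.broadcast.Broadcast;
//          import edu.stanford.nlp.pipeline.StanfordCoreNLP;
Broadcast<Properties> propsVar = spark.sparkContext().broadcast(
    props, scala.reflect.ClassTag$.MODULE$.apply(Properties.class));

docs.map(doc -> {
    // Construct the pipeline on the executor, so the closure captures
    // only the broadcast handle and nothing non-serializable.
    StanfordCoreNLP pipeline = new StanfordCoreNLP(propsVar.getValue());
    pipeline.annotate(doc);
    return doc.tokens().stream()
        .map(token -> "(" + token.word() + "," + token.ner() + ")")
        .collect(Collectors.joining(" "));
}).saveAsTextFile(_args.output);

Note that constructing a StanfordCoreNLP instance is expensive (it loads the annotator models), so per-document construction is only a correctness sketch; a per-partition variant is sketched at the end of the thread.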
Constannnnnt commented 5 years ago

I made this change, and it seems that the StanfordCoreNLP object is still not serializable. I am not sure whether it is a problem with my implementation. Take a look at the log.

Broadcast<StanfordCoreNLP> pipelineVar = spark.sparkContext().broadcast(pipeline, scala.reflect.ClassTag$.MODULE$.apply(StanfordCoreNLP.class));

JavaRDD<String> lines = spark.read().textFile(_args.input).javaRDD();
JavaRDD<CoreDocument> docs = lines.map(line -> new CoreDocument(line));

docs.map(doc -> {
    pipelineVar.getValue().annotate(doc);
    ...

2018-11-13 10:19:17 WARN  BlockManager:66 - Persisting block broadcast_0 to disk instead.
2018-11-13 10:19:17 WARN  BlockManager:66 - Putting block broadcast_0 failed due to exception java.io.NotSerializableException: edu.stanford.nlp.pipeline.StanfordCoreNLP
Serialization stack:
    - object not serializable (class: edu.stanford.nlp.pipeline.StanfordCoreNLP, value: edu.stanford.nlp.pipeline.StanfordCoreNLP@3412b8a3).
2018-11-13 10:19:17 WARN  BlockManager:66 - Block broadcast_0 could not be removed as it was not found on disk or in memory
Exception in thread "main" java.io.NotSerializableException: edu.stanford.nlp.pipeline.StanfordCoreNLP
Serialization stack:
    - object not serializable (class: edu.stanford.nlp.pipeline.StanfordCoreNLP, value: edu.stanford.nlp.pipeline.StanfordCoreNLP@3412b8a3)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1101)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1099)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:68)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1099)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:841)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1404)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:123)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1482)
    at ca.uwaterloo.cs651.project.CoreNLP.main(CoreNLP.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-11-13 10:19:17 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2018-11-13 10:19:17 INFO  AbstractConnector:318 - Stopped Spark@53f96742{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}

By the way, here is a reference for tackling this issue.

Constannnnnt commented 5 years ago

Solved: broadcast the props and make a pipeline in each mapper. See the output:

(The,O) (University,ORGANIZATION) (of,ORGANIZATION) (Waterloo,ORGANIZATION) (is,O) (located,O) (in,O) (Canada,COUNTRY) (.,O) (Goose,O) (lives,O) (in,O) (this,O) (University,O) (.,O)

I guess we need to reformat the output in the end.
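
For reference, a hedged sketch of the solved approach, reusing the propsVar broadcast from the sketch earlier in the thread. The mapPartitions variant here (building one pipeline per partition instead of one per document) is a refinement for efficiency, not necessarily the project's exact code:

// assumes: import java.util.ArrayList; import java.util.List;
JavaRDD<String> annotated = docs.mapPartitions(iter -> {
    // One pipeline per partition; model loading is the dominant cost.
    StanfordCoreNLP pipeline = new StanfordCoreNLP(propsVar.getValue());
    List<String> out = new ArrayList<>();
    while (iter.hasNext()) {
        CoreDocument doc = iter.next();
        pipeline.annotate(doc);
        out.add(doc.tokens().stream()
            .map(t -> "(" + t.word() + "," + t.ner() + ")")
            .collect(Collectors.joining(" ")));
    }
    return out.iterator();
});
annotated.saveAsTextFile(_args.output);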