gorillalabs / sparkling

A Clojure library for Apache Spark: fast, fully-featured, and developer-friendly
https://gorillalabs.github.io/sparkling/
Eclipse Public License 1.0

Kryo serialization error #65

Closed NonaryR closed 6 years ago

NonaryR commented 6 years ago

Hello! I want to use sparkling with deeplearning4j. I configure Kryo serialization according to this:

;; assumes the usual sparkling requires:
;; (:require [sparkling.conf :as conf]
;;           [sparkling.core :as spark])
(defn make-spark-context []
  (let [c (-> (conf/spark-conf)
              (conf/master "local[*]")
              (conf/app-name "clojure job")
              (conf/set "spark.driver.allowMultipleContexts" "false")
              (conf/set "spark.executor.memory" "4g")
              (conf/set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")
              (conf/set "spark.kryo.registrator" "org.nd4j.Nd4jRegistrator"))]
    (spark/spark-context c)))

I create a SQL dataset and then call

(sql/show data)

And then I get this error:

ERROR executor.Executor:91 - Exception in task 0.0 in stage 5.0 (TID 216)
java.lang.UnsupportedOperationException
    at clojure.lang.APersistentVector.add(APersistentVector.java:372)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
    at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:157)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

UnsupportedOperationException   clojure.lang.APersistentVector.add (APersistentVector.java:372)

How can I fix this?

chrisbetz commented 6 years ago

Hi,

you should have a look at Carbonite (https://github.com/revelytix/carbonite) to register Kryo de/serializers for Clojure data structures.

Short version: you cannot add items to a Clojure persistent collection via "add", and that is why Kryo's generic CollectionSerializer throws this exception when deserializing. Either use a different data structure or register Clojure-specific de/serializers instead of the generic collection ones. I'd prefer the latter and use Carbonite.
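To see why the "add" call fails, here is a minimal REPL demonstration: Clojure's persistent vectors implement java.util.List but reject all mutation methods, which is exactly the UnsupportedOperationException in the trace above.

;; APersistentVector implements java.util.List, but .add throws:
(.add ^java.util.List [] :x)
;; => java.lang.UnsupportedOperationException

For illustration, a combined registrator might look roughly like the sketch below. This is hypothetical, not from this thread: the namespace and class names are made up, and it assumes Carbonite's carbonite.api/register-serializers together with the serializer collections in carbonite.serializer (check the Carbonite README for the exact vars in your version), while still delegating to the Nd4jRegistrator from the original config.

;; Hypothetical AOT-compiled registrator combining Carbonite and ND4J
;; (names assumed for illustration).
(ns my.project.registrator
  (:require [carbonite.api :refer [register-serializers]]
            [carbonite.serializer :refer [clojure-primitives
                                          clojure-collections
                                          java-primitives]])
  (:import [com.esotericsoftware.kryo Kryo]
           [org.nd4j Nd4jRegistrator])
  (:gen-class
   :name my.project.Registrator
   :implements [org.apache.spark.serializer.KryoRegistrator]))

(defn -registerClasses [_this ^Kryo kryo]
  ;; Carbonite registers proper de/serializers for Clojure collections,
  ;; so Kryo no longer falls back to the generic CollectionSerializer.
  (register-serializers kryo (concat clojure-primitives
                                     clojure-collections
                                     java-primitives))
  ;; Keep the ND4J registrations from the original configuration.
  (.registerClasses (Nd4jRegistrator.) kryo))

You would then point spark.kryo.registrator at my.project.Registrator instead of org.nd4j.Nd4jRegistrator.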

Cheers,

Chris