HCADatalab / powderkeg

Live-coding the cluster!
Eclipse Public License 1.0

Referencing a running thread from the current namespace breaks serialization #15

Closed: paul-amperity closed this issue 7 years ago

paul-amperity commented 7 years ago

Repro:

(require '[powderkeg.core :as keg])
(def myState (atom 0))
(def myThread (Thread. (fn []
                         (dotimes [_ 30] (Thread/sleep 1000) (swap! myState inc))
                         (println (str "Done " @myState)))))
(into [] (keg/rdd [1 2 3 4 5 6] (map #(* 2 %))))

The result:

ConcurrentModificationException   java.util.Vector$Itr.checkForComodification (Vector.java:1184)
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (clojure.lang.DynamicClassLoader)
contextClassLoader (java.lang.Thread)
vars (powderkeg.core$barrier_BANG_$fn__11182$fn__11183)
fn (clojure.lang.Delay)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:585)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:568)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write (DefaultArraySerializers.java:318)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write (DefaultArraySerializers.java:293)
    com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeObjectOrNull (Kryo.java:549)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:570)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:568)
    carbonite.serializer$print_collection.invoke (serializer.clj:41)
    clojure.lang.Var.invoke (Var.java:388)
    carbonite.ClojureCollSerializer.write (ClojureCollSerializer.java:19)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:568)
    carbonite.serializer$write_map.invoke (serializer.clj:69)
    clojure.lang.Var.invoke (Var.java:388)
    carbonite.ClojureMapSerializer.write (ClojureMapSerializer.java:23)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:568)
    carbonite.serializer$write_map.invoke (serializer.clj:69)
    clojure.lang.Var.invoke (Var.java:388)
    carbonite.ClojureMapSerializer.write (ClojureMapSerializer.java:23)
    com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write (FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write (FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:568)
    org.apache.spark.serializer.KryoSerializationStream.writeObject (KryoSerializer.scala:158)
    org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject (TorrentBroadcast.scala:203)
    org.apache.spark.broadcast.TorrentBroadcast.writeBlocks (TorrentBroadcast.scala:102)
    org.apache.spark.broadcast.TorrentBroadcast.<init> (TorrentBroadcast.scala:85)
    org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast (TorrentBroadcastFactory.scala:34)
    org.apache.spark.broadcast.BroadcastManager.newBroadcast (BroadcastManager.scala:63)
    org.apache.spark.SparkContext.broadcast (SparkContext.scala:1327)
    org.apache.spark.api.java.JavaSparkContext.broadcast (JavaSparkContext.scala:648)
    powderkeg.core$barrier_BANG_$fn__11182.invoke (core.clj:131)
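
The top-level ConcurrentModificationException is thrown by the fail-fast iterator of java.util.Vector. Read from the bottom up, the serialization trace shows Kryo following the Delay's fn, then powderkeg's captured vars, then the Thread's contextClassLoader (a clojure.lang.DynamicClassLoader), its AccessControlContext and ProtectionDomain, and finally the classes field of sun.misc.Launcher$AppClassLoader, which is a Vector of loaded classes. If a class is loaded while that Vector is being walked (plausibly by the serializing thread itself), the iterator throws. The sketch below reproduces only that failure mode, using plain JDK interop; iterate-while-adding is a hypothetical helper and does not show where Kryo iterates the Vector.

```clojure
;; Plain JDK interop; powderkeg and Kryo are not involved here.
(import '(java.util Vector ConcurrentModificationException))

(defn iterate-while-adding
  "Hypothetical helper: starts iterating a java.util.Vector, appends an
  element mid-iteration, then advances the iterator. Returns :cme when the
  fail-fast iterator throws ConcurrentModificationException."
  []
  (let [v (Vector. [1 2 3])
        ^java.util.Iterator it (.iterator v)]
    (.next it)   ; iteration has started
    (.add v 4)   ; structural modification behind the iterator's back
    (try
      (.next it) ; the iterator checks the modification count here and throws
      :no-error
      (catch ConcurrentModificationException _
        :cme))))

(iterate-while-adding)
;; => :cme
```

No second thread is needed: a single thread that modifies a Vector while iterating over it is enough to trigger the exception.
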
cgrand commented 7 years ago

Smaller repro:

(def myThread (Thread. (fn [])))
(keg/rdd nil)
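
Even an empty thread that is never started fails, because the trace reaches the class loaders through the contextClassLoader field of java.lang.Thread, and that field is filled in when the Thread is constructed: a new thread inherits the context class loader of the thread that created it (in the trace above, a clojure.lang.DynamicClassLoader). The sketch below checks this with plain JDK interop; inherits-context-loader? is a hypothetical name.

```clojure
(defn inherits-context-loader?
  "Hypothetical helper: true when a freshly constructed, never-started
  Thread holds the same context class loader as the thread creating it."
  []
  (let [creator-loader (.getContextClassLoader (Thread/currentThread))
        t              (Thread. (fn []))] ; constructed only, never started
    (identical? creator-loader (.getContextClassLoader t))))

(inherits-context-loader?)
;; => true
```
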

It makes little sense to serialize a Thread in this context (in general, thread migration is a powerful feature), especially a started one.

Several approaches:

cgrand commented 7 years ago

I went with "warn and skip" as the initial approach.
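
The thread does not show how "warn and skip" is implemented in powderkeg. As a minimal sketch of the idea only, assuming the values to be shipped are available as a map from var name to value (drop-threads is a hypothetical function, not powderkeg's API), the policy amounts to dropping Thread values and printing a warning for each one:

```clojure
(defn drop-threads
  "Hypothetical sketch of a \"warn and skip\" policy. Takes a map of
  var name -> value and returns it without the entries whose value is a
  java.lang.Thread, printing a warning to *err* for each entry dropped."
  [var-values]
  (reduce-kv
    (fn [acc var-name value]
      (if (instance? Thread value)
        (do (binding [*out* *err*]
              (println "WARNING: not serializing" var-name "(value is a Thread)"))
            acc)                          ; skip: leave it out of the result
        (assoc acc var-name value)))      ; keep everything else
    {}
    var-values))

;; Keeps myState, drops myThread and prints a warning for it.
(drop-threads {'myState (atom 0)
               'myThread (Thread. (fn []))})
```

Skipping rather than failing keeps unrelated jobs running when a namespace happens to hold a thread; the trade-off is that code which really does depend on the skipped var will not find it on the workers.
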