twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Trying to run scalding 0.17.4 with kryo 2.21 and scala 2.12 #1823

Closed: dossett closed this issue 6 years ago

dossett commented 6 years ago

(I realize now that this is probably not possible, but I'm hoping.)

I am trying to upgrade our code base to scalding 0.17.4 while keeping kryo 2.21. I've gotten this to work with scala 2.11, but with scala 2.12 it falls apart with kryo lambda serialization problems.

I'm running with the following versions:

The scala 2.12 errors seem related to kryo's issues serializing lambdas in the Java 8 world. I was hoping that twitter/chill#266 meant this would all work in 2.12. Does anyone have a suggestion?

Excerpt of error:

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
prep$1 (com.twitter.algebird.Aggregator$$anon$2)
self$2 (com.twitter.algebird.MonoidAggregator$$anon$7)
self$1 (com.twitter.algebird.MonoidAggregator$$anon$6)
capturedArgs (java.lang.invoke.SerializedLambda)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:76)
    ... 73 more
Caused by: java.lang.RuntimeException: Could not serialize lambda
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:79)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 88 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:77)
    ... 90 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    ... 94 more
Caused by: java.lang.BootstrapMethodError: java.lang.NoSuchMethodError: 
    at com.twitter.algebird.Aggregator$.$deserializeLambda$(Aggregator.scala)
    ... 99 more
Caused by: java.lang.NoSuchMethodError: 
    ... 100 more

johnynek commented 6 years ago

this was added here:

https://github.com/twitter/chill/blob/0.7.7/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L204

I wonder if you have a classpath issue, where the classpath at deserialization time is not the same as it was at serialization time. The Java 8 lambda serializer does seem to be getting called: com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:79)

Can you try to reproduce this by serializing an aggregator that was made with a lambda?
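
A stdlib-only sketch of such a round trip may help when reading the bottom of the trace. It uses plain Java serialization rather than chill/kryo, purely to stay dependency-free, so it is not the code path scalding itself takes; `LambdaRoundTrip`, `roundTrip` and `adder` are hypothetical names. On scala 2.12 a lambda is written out as a `java.lang.invoke.SerializedLambda` (captured values end up in `capturedArgs`) and is rebuilt through the capturing class's `$deserializeLambda$`, which is the frame that fails above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LambdaRoundTrip {
  // Write the value with plain Java serialization, then read it back.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // A lambda that captures `n`; the captured value is serialized with the function.
  def adder(n: Int): Int => Int = x => x + n

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(adder(1))
    println(restored(41))
  }
}
```

If this works but the same function fails through the job's kryo setup, the difference is more likely in the kryo registrations or the classpath than in the lambda itself.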

dossett commented 6 years ago

Thank you @johnynek, that link was very helpful. We're using our own kryo instantiator for historical reasons, and it uses EmptyScalaKryoInstantiator, not ScalaKryoInstantiator, so we weren't getting the benefit of that change. Registering ClosureSerializer inside a Java 8 conditional in our own kryo setup code fixed several kryo problems that I didn't mention above, so that's a step forward.
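
For readers following along, a Java 8 conditional of this kind can be sketched without any kryo dependency. This is only an illustration, not the code from our instantiator: `Java8Guard`, `lambdasSupported` and `ifLambdasSupported` are hypothetical names, and the check simply probes for `java.lang.invoke.SerializedLambda`, the class that appears in the serialization trace above.

```scala
import scala.util.Try

object Java8Guard {
  // True when the JVM provides java.lang.invoke.SerializedLambda (Java 8 and later).
  def lambdasSupported: Boolean =
    Try(Class.forName("java.lang.invoke.SerializedLambda")).isSuccess

  // Runs `register` only when SerializedLambda is available; returns whether it ran.
  def ifLambdasSupported(register: => Unit): Boolean = {
    val supported = lambdasSupported
    if (supported) register
    supported
  }
}
```

In an instantiator, the by-name argument would be the actual registration, for example the `kryo.register(classOf[ClosureSerializer.Closure], new ClosureSerializer)` call quoted later in this thread.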

But the original problem I shared still happens. I found it in a scalding job test class. I'm trying to recreate it in a real scalding job, but several of those are running fine. I'll keep digging. A more complete stack trace of my original problem is below, if that's helpful.

Thank you again for your response and your help!

Caused by: cascading.flow.FlowException: local step failed
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:230)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:150)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
fn (com.twitter.scalding.typed.MapFn)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
    at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
    at com.twitter.chill.Externalizer.fromBytes(Externalizer.scala:145)
    at com.twitter.chill.Externalizer.maybeReadJavaKryo(Externalizer.scala:158)
    at com.twitter.chill.Externalizer.readExternal(Externalizer.scala:148)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1849)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at java.util.HashMap.readObject(HashMap.java:1396)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at cascading.flow.hadoop.util.JavaObjectSerializer.deserialize(JavaObjectSerializer.java:101)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:312)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:293)
    at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:81)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:449)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:270)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: java.lang.RuntimeException: Could not serialize lambda
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:79)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 63 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
capturedArgs (java.lang.invoke.SerializedLambda)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:76)
    ... 65 more
Caused by: java.lang.RuntimeException: Could not serialize lambda
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:79)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 68 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
prep$1 (com.twitter.algebird.Aggregator$$anon$2)
self$2 (com.twitter.algebird.MonoidAggregator$$anon$7)
self$1 (com.twitter.algebird.MonoidAggregator$$anon$6)
capturedArgs (java.lang.invoke.SerializedLambda)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:76)
    ... 73 more
Caused by: java.lang.RuntimeException: Could not serialize lambda
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:79)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 88 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.twitter.chill.java.ClosureSerializer.read(ClosureSerializer.java:77)
    ... 90 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    ... 94 more
Caused by: java.lang.BootstrapMethodError: java.lang.NoSuchMethodError: 
    at com.twitter.algebird.Aggregator$.$deserializeLambda$(Aggregator.scala)
    ... 99 more
Caused by: java.lang.NoSuchMethodError: 
    ... 100 more

johnynek commented 6 years ago

@dossett that's actually another issue....

We use this: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/serialization/Externalizer.scala to serialize the functions we send to the mappers.

This could be caused by that code somehow not having the ClosureSerializer registered.

Yeah, see here: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/serialization/Externalizer.scala#L38

we need to add the closure registrar here: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/serialization/KryoHadoop.scala#L41

but it should be there, since: ScalaKryoInstantiator registers AllScalaRegistrar https://github.com/twitter/chill/blob/0.7.x/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L92

I don't see what went wrong, but if you want to audit what I just said and see if you can see any flaw in my thinking, that would be helpful.

dossett commented 6 years ago

@johnynek our instantiator makes its own registrations (which I have no doubt is the root of all my problems) in a way pretty similar to KryoHadoop that you linked (I think we use a cut/paste from a very old version of scalding). When I added kryo.register(classOf[ClosureSerializer.Closure], new ClosureSerializer) to it, this other problem went away:

Caused by: cascading.flow.stream.DuctException: internal error: ['1', '[LPV(1,598273501,2,1438473600)]']
    at cascading.flow.hadoop.stream.HadoopGroupByGate.receive(HadoopGroupByGate.java:81)
    at cascading.flow.hadoop.stream.HadoopGroupByGate.receive(HadoopGroupByGate.java:37)
    at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
    at com.twitter.scalding.FlatMapFunction.$anonfun$operate$1(Operations.scala:50)
    at com.twitter.scalding.FlatMapFunction.$anonfun$operate$1$adapted(Operations.scala:48)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:48)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 22 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
f$1 (scala.math.Ordering$$anon$5)
$outer (scala.math.Ordering$$anon$4)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.etsy.kryo.PriorityQueueSerializer.write(PriorityQueueSerializer.scala:21)
    at com.etsy.kryo.PriorityQueueSerializer.write(PriorityQueueSerializer.scala:11)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
    at com.twitter.chill.SerDeState.writeObject(SerDeState.java:63)
    at com.twitter.chill.hadoop.KryoSerializer.serialize(KryoSerializer.java:48)
    at cascading.tuple.hadoop.TupleSerialization$SerializationElementWriter.write(TupleSerialization.java:750)
    at cascading.tuple.io.TupleOutputStream.writeElement(TupleOutputStream.java:114)
    at cascading.tuple.io.TupleOutputStream.write(TupleOutputStream.java:89)
    at cascading.tuple.io.TupleOutputStream.writeTuple(TupleOutputStream.java:64)
    at cascading.tuple.hadoop.io.TupleSerializer.serialize(TupleSerializer.java:37)
    at cascading.tuple.hadoop.io.TupleSerializer.serialize(TupleSerializer.java:28)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1158)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
    at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
    at cascading.flow.hadoop.stream.HadoopGroupByGate.receive(HadoopGroupByGate.java:68)
    ... 33 more
Caused by: java.lang.NullPointerException
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:561)
    ... 53 more

My original problem remains, though. Again, I'm sure this is all self-inflicted because we have data lying around that was serialized with old versions of scalding. Here is the Aggregator that blows up, fwiw.

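import com.twitter.algebird.Aggregator

// numPreds is defined elsewhere in the job.
val aggregator = Aggregator
    .fromMonoid[Map[String, Int]]
    // Keep only the count map from each (String, Map) pair before summing.
    .composePrepare[(String, Map[String, Int])](_._2)
    .andThenPresent {
      _.toSeq
        // Highest count first.
        .sortBy {
          case (_, count) => -count
        }
        .take(numPreds)
        .map {
          case (nextId, _) => nextId
        }
        .mkString(" ")
    }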

I'm happy to close the issue if you like. Thank you again!