twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Make any converter serializable #1877

Closed dieu closed 5 years ago

dieu commented 5 years ago

Hello,

We found this problem while testing a new release of Scalding.

Problem

Many sources at Twitter define custom converters that can capture something non-serializable in their closure, either by accident or by design.

In 0.17.x, converters were used as a FlatMapFunction (https://github.com/twitter/scalding/blob/0.17.x/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala#L1080), which protected users from serialization problems because FlatMapFunction uses Externalizer to serialize lambdas.
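To illustrate the failure mode, here is a minimal sketch that uses only the JDK's Java serialization, which is what Cascading's JavaObjectSerializer relies on in the trace in the follow-up comment. The names SimpleConverter, NonSerializableObj and ClosureCaptureDemo are hypothetical stand-ins for this example, not Scalding's actual TupleConverter API.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a value that a converter captures by accident.
// It deliberately does not extend Serializable.
class NonSerializableObj { val offset: Int = 1 }

// Hypothetical, simplified converter: the class is Serializable,
// but it stores a user-supplied function as a field.
class SimpleConverter[T](fn: Int => T) extends Serializable {
  def apply(i: Int): T = fn(i)
}

object ClosureCaptureDemo {
  // Returns true when plain Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The lambda closes over a local, non-serializable value,
  // so serializing the converter has to serialize that value too.
  def capturingConverter(): SimpleConverter[Int] = {
    val captured = new NonSerializableObj
    new SimpleConverter[Int](i => i + captured.offset)
  }

  // The lambda captures nothing, so Java serialization works.
  def plainConverter(): SimpleConverter[Int] =
    new SimpleConverter[Int](i => i + 1)

  def main(args: Array[String]): Unit = {
    println(s"capturing converter serializable: ${javaSerializable(capturingConverter())}")
    println(s"plain converter serializable: ${javaSerializable(plainConverter())}")
  }
}
```

Both converters work when applied locally; only the capturing one fails once the planner tries to pack the flow step, which is why the problem tends to surface at job submission rather than in unit logic.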

Solution

Use Externalizer to serialize the lambda in TupleConverter.

dieu commented 5 years ago

Proof of the problem: https://travis-ci.org/twitter/scalding/jobs/449165289

[info] - any converter should be serializable *** FAILED ***
[info]   cascading.flow.planner.PlannerException: could not build flow from assembly: [unable to pack object: cascading.flow.hadoop.HadoopFlowStep]
[info]   at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:578)
[info]   at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:286)
[info]   at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
[info]   at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
[info]   at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:84)
[info]   at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:192)
[info]   at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:287)
[info]   at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:287)
[info]   at scala.util.Success.flatMap(Try.scala:231)
[info]   at com.twitter.scalding.Job.buildFlow(Job.scala:287)
[info]   at com.twitter.scalding.Job.run(Job.scala:357)
[info]   at com.twitter.scalding.JobTest.runJob(JobTest.scala:247)
[info]   Cause: java.io.NotSerializableException: com.twitter.scalding.TypedPipeConverterTest$NonSerializableObj
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at cascading.flow.hadoop.util.JavaObjectSerializer.serialize(JavaObjectSerializer.java:57)
[info]   at cascading.flow.hadoop.util.HadoopUtil.serializeBase64(HadoopUtil.java:282)
[info]   at cascading.flow.hadoop.HadoopFlowStep.pack(HadoopFlowStep.java:196)