twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Not serializable function in `MapGroup` #1795

Closed: fwbrasil closed this issue 6 years ago

fwbrasil commented 6 years ago

I'm investigating this error in one of our e2e tests. The exception happens when running against the latest `develop`:

Caused by: cascading.flow.planner.PlannerException: could not build flow from assembly: [Neither Java nor Kyro works for class: class com.twitter.scalding.typed.CoGrouped$MapGroup$$anonfun$joinFunction$2 instance: <function3>
export CHILL_EXTERNALIZER_DEBUG=true to see both stack traces]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:578)
    at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:286)
    at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
    at com.twitter.scalding_internal.job.FileSourceWarningFlowConnector.connect(FileSourceWarningFlowConnector.scala:42)
    at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:75)
    at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:168)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:231)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:231)
    at scala.util.Success.flatMap(Try.scala:231)
    at com.twitter.scalding.Job.buildFlow(Job.scala:231)
    at com.twitter.scalding_internal.job.TwitterJob.buildFlow(Common.scala:84)
    at com.twitter.scalding.Job.run(Job.scala:301)
    at com.twitter.scalding.Tool.start$1(Tool.scala:124)
    at com.twitter.scalding.Tool.run(Tool.scala:141)
    at com.twitter.scalding.Tool.run(Tool.scala:68)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.twitter.scalding.Tool$.main(Tool.scala:149)
    ... 7 more
johnynek commented 6 years ago

This looks like a regression in how Externalizer is used on functions, or at least I hope that's the cause.

Serialization problems are the worst.

johnynek commented 6 years ago

Ah, I see the issue...

`Grouped.addEmptyGuard(fn)` needs to be called outside the closure:

val guardedFn = Grouped.addEmptyGuard(fn)

{ (k: K, ....)

@fwbrasil, can you take this fix and see whether it resolves the issue?

johnynek commented 6 years ago

https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala#L243

is the line I was referring to above.

fwbrasil commented 6 years ago

@johnynek, I'm not sure I understand. Wouldn't that leave what's being serialized unchanged?

johnynek commented 6 years ago

@fwbrasil, Scala will not capture just `fn`. It captures the whole `this` in order to read `this.fn`, so everything reachable from `this` has to be serialized.

That is why we have code that copies the value into a local `val` before putting it in the closure: reading the local does not need `this`.

This is the problem that https://github.com/scalacenter/spores?files=1 was addressing: making the captured vals explicit.
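A minimal sketch of the capture behavior described above, using only Java serialization (Kryo is not involved). `Node`, `capturingThis`, and `capturingLocal` are hypothetical names, not scalding code; `Node` stands in for an object that is not serializable and can reach a large graph. The only difference between the two methods is whether the lambda reads the field `fn` directly (which captures `this`) or reads a local copy.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // True if Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Hypothetical stand-in for a pipe node: deliberately NOT Serializable,
  // and holding a reference to something large (the rest of the graph).
  final class Node(fn: Int => Int, val graph: AnyRef) {
    // Reads the field `fn` inside the lambda, so the lambda captures `this`.
    def capturingThis: Int => Int = x => fn(x) + 1

    // Copies the field into a local val first; the lambda captures only `local`.
    def capturingLocal: Int => Int = {
      val local = fn
      x => local(x) + 1
    }
  }
}
```

Both functions compute the same result, but only the second can be shipped without dragging `Node` (and everything it references) along. This is the same pattern as computing `guardedFn` before building the closure in the suggested fix.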

johnynek commented 6 years ago

Maybe also make the abstract classes here: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala#L54

extend `java.io.Serializable`. I think all the subclasses are case classes, which I believe already extend `Serializable`, but it's probably good to be explicit.
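A sketch of what being explicit buys, assuming a hypothetical `JoinLike` hierarchy (these names and signatures are illustrative, not scalding's actual `Joiner` classes). Case-class subclasses are serializable at runtime either way, but only when the abstract base itself extends `java.io.Serializable` can code that holds the abstract type prove serializability at compile time.

```scala
object ExplicitSerializable {
  // Hypothetical abstract join function; the explicit java.io.Serializable
  // is the point of the example (scala.Function3 does not extend it).
  sealed abstract class JoinLike[K, A, B, C]
      extends Function3[K, Iterator[A], Iterable[B], Iterator[C]]
      with java.io.Serializable

  // A case-class subclass, as the comment above assumes for the real ones.
  final case class InnerJoinLike[K, A, B]() extends JoinLike[K, A, B, (A, B)] {
    def apply(k: K, left: Iterator[A], right: Iterable[B]): Iterator[(A, B)] =
      left.flatMap(a => right.iterator.map(b => (a, b)))
  }

  // Only accepts values whose static type promises Java serializability.
  def requireSerializable[T <: java.io.Serializable](t: T): T = t

  // Compiles only because JoinLike itself extends java.io.Serializable;
  // the subclasses being case classes would not prove this for the abstract type.
  def checked[K, A, B, C](j: JoinLike[K, A, B, C]): JoinLike[K, A, B, C] =
    requireSerializable(j)
}
```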

fwbrasil commented 6 years ago

@johnynek yep, I'm well aware of capturing and serialization issues :)

> extend java.io.Serializable also, but I think all the subclasses are case classes, which I thought extended serializable. Good to be explicit probably.

Case classes do extend `Serializable`. That's why I asked: the capturing should be OK. I'll test the change, though.

johnynek commented 6 years ago

I just hit this in the optimizer ScalaCheck tests... it seems it is not fixed.

[info] - all optimization rules do not increase steps *** FAILED ***
[info]   PlannerException was thrown during property evaluation.
[info]     Message: could not build flow from assembly: [Neither Java nor Kyro works for class: class com.twitter.scalding.typed.CoGrouped$MapGroup$$anonfun$joinFunction$2 instance: <function3>
[info]   export CHILL_EXTERNALIZER_DEBUG=true to see both stack traces]
[info]     Occurred when passed generated values (
[info]       arg0 = WithDescriptionTypedPipe(Mapped(ReduceStepPipe(ValueSortedReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapValues(WithDescriptionTypedPipe(FlatMapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(ForceToDisk(WithDescriptionTypedPipe(TrappedPipe(IterablePipe(List(-2044394654, -2147483648, -1165444973, -1, -793644142, 1560289238, 1068980366, 1104433522)),com.twitter.scalding.source.FixedTypedText(ybiMb8wxvGwc7sizijn1R09uh7LcioaybMbbhDixugkae),Single(com.twitter.scalding.TupleGetter$IntGetter$@4fcfc89e)),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(ReduceStepPipe(ValueSortedReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-1, 2147483647, 206095538, -1, 1, -2147483648, 1255319995, 1647024036)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),scala.math.Ordering$Int$@47d2fa19,<function2>,None,List())),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(Filter(WithDescriptionTypedPipe(Mapped(ReduceStepPipe(IteratorMappedReduce(scala.math.Ordering$Int$@47d2fa19,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-1129517037, -1434469965, -323049906, -1707414621, -1, -357391866, 1553216956, 1)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function2>,None,List())),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),scala.math.Ordering$Int$@47d2fa19,<function2>,Some(2),List())),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$ComposeDescriptions$@2f94464d.orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFlatMap$@11992172).orElse(com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@4ee02952).orElse(com.twitter.scalding.typed.OptimizationRules$FilterLocally$@77e53be0).orElse(com.twitter.scalding.typed.OptimizationRules$FilterKeysEarly$@7acebd7f).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeMapFlatMap$@228481e5).orElse(com.twitter.scalding.typed.OptimizationRules$IgnoreNoOpGroup$@6de1965e).orElse(com.twitter.scalding.typed.OptimizationRules$MapValuesInReducers$@33858a39)
[info]     )
johnynek commented 6 years ago

I am trying to re-run with `export CHILL_EXTERNALIZER_DEBUG=true`. Did you try this, @fwbrasil?

johnynek commented 6 years ago

Actually, I wonder if this was capturing the outer `CoGrouped` object, which is not serializable.
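A minimal illustration of the mechanism suspected here, with hypothetical class names rather than scalding's `CoGrouped`: an inner class that reads a member of its enclosing instance keeps a hidden reference to that instance, so it can only be Java-serialized if the enclosing object is serializable too, even when the inner class itself extends `Serializable`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object OuterCaptureDemo {
  // True if Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Hypothetical enclosing class, deliberately NOT Serializable.
  class Outer(val offset: Int) {
    // Reads `offset`, so each Inner keeps a reference to its enclosing Outer;
    // serializing an Inner therefore also tries to serialize that Outer.
    class Inner(val k: Int) extends Serializable {
      def apply(x: Int): Int = x + k + offset
    }
  }

  // The same logic without an enclosing instance: the value it needs
  // from Outer is passed in explicitly as a field.
  final case class Detached(k: Int, offset: Int) {
    def apply(x: Int): Int = x + k + offset
  }
}
```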

johnynek commented 6 years ago

Here is another example, so it seems we have not fully solved this problem.

I think one issue is that our generated functions may not be Java-serializable, so if Kryo also gets tripped up, we get into trouble. I'm not 100% sure.

[info] - all optimization rules do not increase steps *** FAILED ***
[info]   PlannerException was thrown during property evaluation.
[info]     Message: could not build flow from assembly: [Neither Java nor Kyro works for class: class com.twitter.scalding.typed.CoGrouped$MapGroup$$anonfun$joinFunction$2 instance: <function3>
[info]   export CHILL_EXTERNALIZER_DEBUG=true to see both stack traces]
[info]     Occurred when passed generated values (
[info]       arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MapValues(WithDescriptionTypedPipe(HashCoGroup(WithDescriptionTypedPipe(Mapped(CoGroupedPipe(Pair(IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(IterablePipe(List(-1165334872, 2099230459, 610633690, 0, 1, 2147483647, 0)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(ReduceStepPipe(ValueSortedReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(Mapped(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(IterablePipe(List(2147483647, 31196373, 1221718517, 0, -1, -1, 962121930)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapValues(WithDescriptionTypedPipe(MapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(MapValues(WithDescriptionTypedPipe(HashCoGroup(WithDescriptionTypedPipe(SumByLocalKeys(WithDescriptionTypedPipe(MapValues(WithDescriptionTypedPipe(HashCoGroup(WithDescriptionTypedPipe(FlatMapValues(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(Mapped(IterablePipe(List(-1, -1, 1, -2147483648, -50407583, 339509677, 
-404679436)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(MergedTypedPipe(IterablePipe(List(1323985236, -7696337, 0, 2147483647, -2147483648, 1, -1)),WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(Mapped(IterablePipe(List(0, 1, 1077523378, -1609197872, -1, -1, -2036937701)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),WithDescriptionTypedPipe(Filter(IterablePipe(List(1, 1, 0, -1423447448, -831937110, -808651958, 65328076)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),com.twitter.algebird.IntRing$@c75c9c0),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(SumByLocalKeys(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-382812877, -1, -2147483648, 1, -98814158, -1, 
-826547513)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),com.twitter.algebird.IntRing$@c75c9c0),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(IterablePipe(List(1078161448, -2147483648, -672365582, -2147483648, -1372460504, 652942645, 1)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),scala.math.Ordering$Int$@9632adf,<function2>,Some(2),List())),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(CoGroupedPipe(Pair(IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-78693237, 1587237348, 1, 115653791, -1350941060, -2147483648, 
1140492351)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@9632adf,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(1, 0, 960119333, 1098974756, 1, 0, -669565811)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$FilterLocally$@51d9f42b.orElse(com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@46e9533f).orElse(com.twitter.scalding.typed.OptimizationRules$DiamondToFlatMap$@5c20a583).orElse(com.twitter.scalding.typed.OptimizationRules$MapValuesInReducers$@dc1d38d).orElse(com.twitter.scalding.typed.OptimizationRules$RemoveDuplicateForceFork$@132716b2).orElse(com.twitter.scalding.typed.OptimizationRules$IgnoreNoOpGroup$@1fdbcf42).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilterFlatMap$@468e6d87).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilterMap$@45509360)
[info]     )
johnynek commented 6 years ago

Hit this again.

[info] - all optimization rules do not increase steps *** FAILED ***
[info]   PlannerException was thrown during property evaluation.
[info]     Message: could not build flow from assembly: [Neither Java nor Kyro works for class: class com.twitter.scalding.typed.CoGrouped$MapGroup$$anonfun$joinFunction$2 instance: <function3>
[info]   export CHILL_EXTERNALIZER_DEBUG=true to see both stack traces]
[info]     Occurred when passed generated values (
[info]       arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MapValues(WithDescriptionTypedPipe(HashCoGroup(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MapValues(WithDescriptionTypedPipe(FlatMapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@3167ab1,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-1, 1, 0, -657430502, 0, 1, 0)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@3167ab1,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FilterKeys(ReduceStepPipe(ValueSortedReduce(scala.math.Ordering$Int$@3167ab1,WithDescriptionTypedPipe(WithDescriptionTypedPipe(MapValues(WithDescriptionTypedPipe(HashCoGroup(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-172288943, -1, -1729623253, -1049932911, -1295220109, -2147483648, 565817574)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IdentityReduce(scala.math.Ordering$Int$@3167ab1,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MergedTypedPipe(IterablePipe(List(-650547163, 804205182, 1890497775, 1829194738, 35236403, -2147483648, 471148486)),WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(SumByLocalKeys(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(Filter(WithDescriptionTypedPipe(Filter(IterablePipe(List(-1, -1937948343, -1099793085, -1, 306661242, 0, 
433633707)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),com.twitter.algebird.IntRing$@45efd816),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),scala.math.Ordering$Int$@3167ab1,<function2>,None,List())),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IdentityReduce(scala.math.Ordering$Int$@3167ab1,WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(IterablePipe(List(-206467820, 2147483647, 529587044, -641163189, 0, -1, 
2147483647)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IterablePipe(List(-1642049225, -330435452, 2147483647, -635864124, 632303618, 1, -2147483648))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$FilterKeysEarly$@467fde6a.orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFlatMap$@7e05c10e).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeMap$@3c8e17b).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilter$@7bb0d8d).orElse(com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@3f0f1f9e).orElse(com.twitter.scalding.typed.OptimizationRules$DeferMerge$@6f52b4c0).orElse(com.twitter.scalding.typed.OptimizationRules$DiamondToFlatMap$@2b555368).orElse(com.twitter.scalding.typed.OptimizationRules$MapValuesInReducers$@74dde4ea)
[info]     )

I'll try replacing the inner function with a named class; maybe that will show what the problem is.
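A sketch of that idea, assuming a hypothetical join-function shape `JoinFn` (the real `joinFunction` signature in scalding may differ). A named case class makes the captured state explicit: it holds exactly the two functions it needs, it is serializable exactly when those fields are, and a serialization failure would name this class instead of an `$anonfun$` class.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object NamedFunctionDemo {
  // Hypothetical join-function shape, assumed for illustration only.
  type JoinFn[K, V, R] = (K, Iterator[V], Seq[Iterable[V]]) => Iterator[R]

  // True if Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Named replacement for an anonymous closure that post-processes the
  // output of an inner join function. Its only state is its two fields.
  final case class MapGroupJoin[K, V, R1, R2](
      inner: JoinFn[K, V, R1],
      fn: (K, Iterator[R1]) => Iterator[R2]
  ) extends Function3[K, Iterator[V], Seq[Iterable[V]], Iterator[R2]] {
    def apply(k: K, left: Iterator[V], rest: Seq[Iterable[V]]): Iterator[R2] =
      fn(k, inner(k, left, rest))
  }
}
```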

My hypothesis is that as we compose more and more functions, the final function is less and less likely to be fully serializable by either Java serialization or Kryo, since one or the other has to work for the whole thing.
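A minimal illustration of this hypothesis using only Java serialization (Kryo is not shown). `Plus` is a hypothetical hand-written `Function1` that does not extend `java.io.Serializable`; composing it anywhere into a chain of otherwise serializable functions makes the whole composite unserializable, because the composed function holds references to every piece.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ComposeDemo {
  // True if Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Hypothetical Function1 that does NOT extend java.io.Serializable,
  // standing in for any one generated function Java cannot handle.
  final class Plus(n: Int) extends (Int => Int) {
    def apply(x: Int): Int = x + n
  }
}
```

One non-serializable link is enough to fail the whole chain, which is why the chance of failure grows as more functions get composed.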

We could improve Externalizer to try both Kryo and Java recursively on each field, so that some parts can be serialized by Java and others by Kryo. I don't think that would be very hard, actually, if we made a slow default Kryo serializer (fine for planning) that tries both, instead of making the field serializer the default. This new default would try to serialize and deserialize with Java; if no exception is thrown, great. Otherwise, it would recurse into the object using Kryo's field serializer.
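A Kryo-free sketch of the control flow only: try Java first, and if it fails, recurse into the object's fields. Here plain Java reflection stands in for Kryo's field serializer, only the serialize half is checked (the proposal also deserializes), and the result is a list of field paths Java could not handle, i.e. where a Kryo fallback would have to take over. `SerializationProbe`, `findJavaFailures`, and the example types are hypothetical names, not Chill or scalding APIs.

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}
import java.lang.reflect.{Field, Modifier}
import java.util.{Collections, IdentityHashMap}

object SerializationProbe {
  // True if Java serialization of `obj` completes without an IOException
  // (NotSerializableException is a subclass of IOException).
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: IOException => false
    }

  // All non-static, non-transient instance fields, including inherited ones.
  private def instanceFields(c: Class[_]): List[Field] =
    if (c == null) Nil
    else
      c.getDeclaredFields.toList.filterNot { f =>
        val m = f.getModifiers
        Modifier.isStatic(m) || Modifier.isTransient(m)
      } ++ instanceFields(c.getSuperclass)

  // Try Java first; only if that fails, recurse into the object's fields.
  // Returns the paths Java could not handle; empty means Java suffices.
  def findJavaFailures(root: AnyRef, rootName: String = "root"): List[String] = {
    // Identity-based visited set so cycles in the graph terminate.
    val visited = Collections.newSetFromMap(new IdentityHashMap[AnyRef, java.lang.Boolean]())

    def loop(obj: AnyRef, path: String): List[String] =
      if (obj == null || !visited.add(obj) || javaSerializable(obj)) Nil
      else {
        val children: List[(String, AnyRef)] = instanceFields(obj.getClass).flatMap { f =>
          // Reflection can be refused (e.g. JDK module encapsulation); skip such fields.
          try { f.setAccessible(true); List((f.getName, f.get(obj))) }
          catch { case _: RuntimeException => Nil }
        }
        val nested = children.flatMap { case (name, value) => loop(value, s"$path.$name") }
        // Report this object itself if its own class is not Serializable,
        // or if none of its fields explains the failure.
        val here =
          if (!obj.isInstanceOf[java.io.Serializable] || nested.isEmpty) List(path) else Nil
        here ++ nested
      }

    loop(root, rootName)
  }

  // Example types for the usage check (hypothetical).
  final class Opaque(val label: String) // deliberately not Serializable
  final case class Leaf(n: Int)
  final case class Holder(leaf: Leaf, payload: AnyRef)
}
```

For example, `findJavaFailures(Holder(Leaf(1), new Opaque("x")))` points at `root.payload` rather than rejecting the whole `Holder`, which is the granularity the mixed Java/Kryo approach would need.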

johnynek commented 6 years ago

Here it is again. It seems to always be `com.twitter.scalding.typed.CoGrouped$MapGroup$$anonfun$joinFunction$2 instance: <function3>`. I'm not sure what is special about that function. I'm going to take another look.

[info] - When we break at forks we have at most 2 + hashJoin steps *** FAILED ***
[info]   PlannerException was thrown during property evaluation.
[info]     Message: could not build flow from assembly: [Neither Java nor Kyro works for class: class com.twitter.scalding.typed.CoGrouped$MapGroup$$anonfun$joinFunction$2 instance: <function3>
[info]   export CHILL_EXTERNALIZER_DEBUG=true to see both stack traces]
[info]     Occurred when passed generated values (
[info]       arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(FlatMapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@79b32c0,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-241297074)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@79b32c0,WithDescriptionTypedPipe(WithDescriptionTypedPipe(MapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@79b32c0,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(FlatMapValues(WithDescriptionTypedPipe(FlatMapped(SourcePipe(com.twitter.scalding.source.FixedTypedText(milp9mifxkrvj17skuc)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(ForceToDisk(WithDescriptionTypedPipe(MergedTypedPipe(EmptyTypedPipe,WithDescriptionTypedPipe(ForceToDisk(WithDescriptionTypedPipe(FlatMapped(IterablePipe(List(-322094959, -1, 1, 1, -1, 198263043, -1, 1549725949, -276989060)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),IterablePipe(List(1560598188, 
143359154))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),IdentityReduce(scala.math.Ordering$Int$@79b32c0,WithDescriptionTypedPipe(ReduceStepPipe(ValueSortedReduce(scala.math.Ordering$Int$@79b32c0,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FilterKeys(WithDescriptionTypedPipe(FlatMapped(SourcePipe(com.twitter.scalding.source.FixedTypedText(trI77wnhWoccbnwquaj)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),scala.math.Ordering$Int$@79b32c0,<function2>,None,List())),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),None,List(),ReflexiveEquality()),<function3>),<function2>)),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true))),<function1>),List((org.scalacheck.Gen$R$class.map(Gen.scala:237),true)))
[info]     )
johnynek commented 6 years ago

I tried the robust serializer idea here:

https://github.com/twitter/scalding/tree/oscar/robust-serializer

but it didn't work: it causes null pointer exceptions when enabled. Along the way, I did find that the closure for `CoGrouped.Pair` is wrong and captures the whole graph. That could be the issue, so I'll make a PR for that change.
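A sketch of why capturing the whole graph is a problem even when every node is serializable, using a hypothetical case class `PairLike` in place of `CoGrouped.Pair`: a closure that reads a field captures `this`, and a case class's `this` brings along everything it references. Here that only inflates the serialized size; in a real plan, any single unserializable node reachable from `this` would make the closure fail outright.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object GraphCaptureDemo {
  // Number of bytes Java serialization produces for `obj`.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // Hypothetical join node: a case class (hence Serializable) holding two
  // large upstream "graphs" plus a small join function.
  final case class PairLike(left: Vector[Int], right: Vector[Int], fn: (Int, Int) => Int) {
    // Reads the field `fn`, so the closure captures `this`, including `left` and `right`.
    def joinCapturingThis: (Int, Int) => Int = (a, b) => fn(a, b)

    // Copies `fn` into a local val, so only the function itself is captured.
    def joinCapturingLocal: (Int, Int) => Int = {
      val f = fn
      (a, b) => f(a, b)
    }
  }
}
```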