twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0
3.5k stars · 706 forks

Merge develop into cascading3 #1776

Open johnynek opened 6 years ago

johnynek commented 6 years ago

Make sure the cascading3 branch is up to date with develop.

cc @rubanm @fwbrasil

CLAassistant commented 6 years ago

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
5 out of 10 committers have signed the CLA.

:white_check_mark: johnynek
:white_check_mark: ianoc
:white_check_mark: moulimukherjee
:white_check_mark: erik-stripe
:white_check_mark: alexeygorobets
:x: Tom Dyas
:x: ianoc-stripe
:x: snoble
:x: FlavSF
:x: fwbrasil


Tom Dyas does not appear to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

johnynek commented 6 years ago

I'm going to merge when this goes green since all the PRs have already been merged to develop.

johnynek commented 6 years ago

This looks like a real failure in the randomized planning tests:

[info] - all optimization rules do not increase steps *** FAILED ***
[info]   PlannerException was thrown during property evaluation.
[info]     Message: union of pipelines have 8 fewer elements than parent node: MapReduceHadoopRuleRegistry, missing: [[apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> HashJoin(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[by: _pipe_2227-2226:[{1}:'key'] IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e:[{1}:'key1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[FlatMapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [cascading.flow.hadoop.planner.HadoopPlanner.makeTempTap(HadoopPlanner.java:232)] -> TempHfs["SequenceFile[['key', 'value']]"][5988349142/_pipe_2227-2226/], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']]]
[info]     Occurred when passed generated values (
[info]       arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(EmptyTypedPipe,<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(EmptyTypedPipe,tvo3aakgrh9jrzxoyeuqnfawbmjnxhaixoNgomuxeg41zfcpu,false),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(WithDescriptionTypedPipe(TrappedPipe(EmptyTypedPipe,com.twitter.scalding.source.FixedTypedText(m8x5mxgwljgg4zWaq),com.twitter.scalding.LowPriorityTupleConverters$$anon$3@3b6d7f73),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),LiteralValue(2)),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),EmptyTypedPipe),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),pqbttw,false),rzeykwyetbqpay9k7kmyfqrihXolLbo1gkqhq,false),EmptyTypedPipe),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),<function3>),<function2>)),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),<functi
on3>),<function2>)),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$EmptyIsOftenNoOp$@16b4d0e9.orElse(com.twitter.scalding.typed.OptimizationRules$ComposeMap$@3205734f).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilter$@649ffdcc).orElse(com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@401dea94).orElse(com.twitter.scalding.typed.OptimizationRules$RemoveDuplicateForceFork$@709e59e6).orElse(com.twitter.scalding.typed.OptimizationRules$IgnoreNoOpGroup$@6918c3b1).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilterMap$@72f222c3).orElse(com.twitter.scalding.typed.OptimizationRules$FilterKeysEarly$@5d943f43)
[info]     )
[info]   org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException:
[info]   at org.scalatest.enablers.CheckerAsserting$$anon$2.indicateFailure(CheckerAsserting.scala:223)
[info]   at org.scalatest.enablers.CheckerAsserting$$anon$2.indicateFailure(CheckerAsserting.scala:219)
[info]   at org.scalatest.enablers.UnitCheckerAsserting$CheckerAssertingImpl.check(CheckerAsserting.scala:140)
[info]   at org.scalatest.prop.GeneratorDrivenPropertyChecks$class.forAll(GeneratorDrivenPropertyChecks.scala:1136)
[info]   at com.twitter.scalding.typed.OptimizationRulesTest.forAll(OptimizationRulesTest.scala:198)
[info]   at com.twitter.scalding.typed.OptimizationRulesTest$$anonfun$35.apply(OptimizationRulesTest.scala:258)
[info]   at com.twitter.scalding.typed.OptimizationRulesTest$$anonfun$35.apply(OptimizationRulesTest.scala:24

[info]   Cause: cascading.flow.planner.PlannerException: union of pipelines have 8 fewer elements than parent node: MapReduceHadoopRuleRegistry, missing: [[apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> HashJoin(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[by: _pipe_2227-2226:[{1}:'key'] IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e:[{1}:'key1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[FlatMapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [cascading.flow.hadoop.planner.HadoopPlanner.makeTempTap(HadoopPlanner.java:232)] -> TempHfs["SequenceFile[['key', 'value']]"][5988349142/_pipe_2227-2226/], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']]]
[info]   at cascading.flow.planner.FlowPlanner.verifyResultInternal(FlowPlanner.java:703)
[info]   at cascading.flow.planner.FlowPlanner.verifyResult(FlowPlanner.java:565)
[info]   at cascading.flow.planner.rule.RuleSetExec.execPlannerFor(RuleSetExec.java:163)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:336)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:328)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
johnynek commented 6 years ago

cc @cwensel

johnynek commented 6 years ago

here is another one: " Message: registry: MapReduceHadoopRuleRegistry, phase: PostPipelines, failed on rule: RemoveMalformedHashJoinPipelineTransformer, see attached source element-graph"

I guess this PR is merging the scalacheck code that randomly generates graphs and tries to plan them, and we are seeing those plans fail.

I think this probably means cascading has some rules that can't plan some of these graphs.

It would be nice to know what the limitations are in cascading so we can transform the graphs in our rules.

@piyushnarang @rubanm any ideas? You both have added some workarounds.

piyushnarang commented 6 years ago

@johnynek - do you know what graph it fails on? We had to work through a few job failures in the planning phase because the cascading3 planner didn't like the graph (even though the same graph worked fine on cascading2). This was also one of our concerns about merging the branch at Twitter: we were afraid internal users who hadn't tried cascading3 on their jobs would run into these issues.

johnynek commented 6 years ago

@piyushnarang these were randomly generated. The toString is in the message, but it is pretty huge, so I don't want to paste it here. The error looks like a self hashjoin. I'll try to write a more direct test to trigger it. If that's it, we can fix it with an optimizer rule.
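For context on what a "self hashjoin" means here, a minimal sketch of hashJoin semantics in plain Scala collections (no scalding dependency; `HashJoinSketch` and its method are invented names for illustration, not scalding's API):

```scala
// Illustrative sketch only (plain Scala, no scalding dependency): this models
// the semantics of scalding's hashJoin. In a real job the right-hand side is
// materialized as an in-memory hash table on every mapper, which is why the
// planner treats hashJoin pipes specially.
object HashJoinSketch {
  def hashJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    // Build the lookup table from the (assumed small) right side.
    val table: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => (k, kws.map(_._2)) }
    // Stream the left side and probe the table; keys missing on the right drop out.
    for {
      (k, v) <- left
      w      <- table.getOrElse(k, Seq.empty[W])
    } yield (k, (v, w))
  }
}
```

A "self hashjoin" is then `hashJoin(p, p)`: the same pipe feeding both sides of the join, one of the shapes the random generator can produce.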

johnynek commented 6 years ago

okay, here is another failure in the current state:

arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(WithDescriptionTypedPipe(MergedTypedPipe(IterablePipe(List(-2147483648, -642344067)),WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossPipe(WithDescriptionTypedPipe(TrappedPipe(IterablePipe(List(312548152, 458345207)),com.twitter.scalding.source.FixedTypedText(ked),com.twitter.scalding.LowPriorityTupleConverters$$anon$3@6064ee0),org.scalacheck.Gen$R.map(Gen.scala:237),true),IterablePipe(List(-2147483648, 0))),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true),cpq6jceulrzEgHkvjvnEpxngbsenkccrAzZiu2eanNk,false),WithDescriptionTypedPipe(WithDescriptionTypedPipe(Filter(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossPipe(IterablePipe(List(-151163257, -2147483648)),IterablePipe(List(-1992242190, -1163143704))),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true),org.scalacheck.GenArities$$Lambda$441/1771876489@56f9ef3e),org.scalacheck.Gen$R.map(Gen.scala:237),true),hy0zwcjzomxTl7Prkmgcs1Chmsmcxtfyfgfpiothasorzoz0hxiygwznwia,false)),org.scalacheck.Gen$R.map(Gen.scala:237),true),LiteralValue(2)),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true)),org.scalacheck.Gen$R.map(Gen.scala:237),true),LiteralValue(2)),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@11f2def

with the cascading stack of:

[info]   Cause: cascading.flow.planner.PlannerException: registry: MapReduceHadoopRuleRegistry, phase: PostPipelines, failed on rule: RemoveMalformedHashJoinPipelineTransformer, see attached source element-graph
[info]   at cascading.flow.planner.rule.RuleExec.performTransform(RuleExec.java:422)
[info]   at cascading.flow.planner.rule.RuleExec.performMutation(RuleExec.java:226)
[info]   at cascading.flow.planner.rule.RuleExec.executeRulePhase(RuleExec.java:178)
[info]   at cascading.flow.planner.rule.RuleExec.planPhases(RuleExec.java:125)
[info]   at cascading.flow.planner.rule.RuleExec.exec(RuleExec.java:86)
[info]   at cascading.flow.planner.rule.RuleSetExec.execPlannerFor(RuleSetExec.java:153)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:336)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:328)
johnynek commented 6 years ago

@piyushnarang okay here is a test that is still currently failing even with the hashjoin checkpoints we have added. here is the cascading debug output: https://www.dropbox.com/s/6o0s4uoxmpe9eyp/286EB2.tgz?dl=0

You can see the scala code in the PR. I will try to minimize it down a bit and see if it still fails.

cc @cwensel

johnynek commented 6 years ago

Okay, I simplified the failing case and it indeed looks related to merges and hashjoins. We had some code to try to protect against this but it looks like it is not working here.

Here is the cascading log output: https://www.dropbox.com/s/7qyc4a9pxtstwio/E552D2.tgz?dl=0 and here is a rendered dot of the input as cascading sees it: https://www.dropbox.com/s/iffadh9x7unrg5w/01-BalanceAssembly-init.dot.png?dl=0

as you can see, the job that causes the failure is actually not very complex:

      val p1 =
        TypedPipe.from(List(1, 2))
          .cross(TypedPipe.from(List(3, 4)))

      val p2 =
        TypedPipe.from(List(5, 6))
          .cross(TypedPipe.from(List(8, 9)))

      val p3 = (p1 ++ p2) // this can be planned
      val p4 = (TypedPipe.from(List((8, 1), (10, 2))) ++ p3) // this cannot be planned.

We can possibly add complexity to our rule about HashJoins and merges. Not clear what the recipe is though...

@cwensel in a world where people have functions that return pipes, how can you be sure you are not merging HashJoins? Should we be avoiding this, or is this minimal case enough to see about fixing a bug?

piyushnarang commented 6 years ago

I tried out a few variants of the failing job, and the planner failure seems to trigger because both p1 and p2 perform a cross (a hashJoin). If I drop the cross from either p1 or p2, the planner proceeds. A potential workaround is to add a forceToDisk on p3: val p3 = (p1 ++ p2).forceToDisk works. It's not great, as it adds an extra step, but it might be a workaround until we can fix the planner? Not sure if it can be as simple as: if both pipes feeding into a merge have a hashJoin, then force to disk.

johnynek commented 6 years ago

I’m pretty concerned we are going to basically render hashJoin worthless or even harmful by adding too many checkpoints.

I wonder if we should just translate all hashJoins to cogroups at the start in cascading 3 unless the user specifically opts in to hashJoins in a config.

Absent a fix in cascading, I don’t see a great option yet.
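The translation suggested above can be modeled in plain Scala (illustrative stand-ins, not scalding's API): rewriting a hashJoin into a cogroup-based join should be semantics-preserving, changing only the physical plan (a map-side join becomes a shuffle plus reduce step).

```scala
// Toy model (plain Scala; invented names, not scalding's API) of why a
// hashJoin -> cogroup rewrite preserves results: both compute the same
// multiset of joined pairs; only the execution strategy differs.
object JoinRewriteSketch {
  // Map-side style: probe a hash table built from the right side.
  def hashJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val table = right.groupBy(_._1)
    for {
      (k, v) <- left
      (_, w) <- table.getOrElse(k, Seq.empty[(K, W)])
    } yield (k, (v, w))
  }

  // Cogroup style: group both sides by key, then combine per key.
  def cogroupJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val l = left.groupBy(_._1)
    val r = right.groupBy(_._1)
    // Inner join: only keys present on both sides produce output.
    (l.keySet intersect r.keySet).toSeq.flatMap { k =>
      for { (_, v) <- l(k); (_, w) <- r(k) } yield (k, (v, w))
    }
  }
}
```

Output ordering may differ between the two strategies, which is why the comparison below is on sets rather than sequences.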


johnynek commented 6 years ago

I wonder if the rule is: you can have at most one hashJoin input to a merge.
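If that conjecture held, it would be easy to state as a predicate. A hypothetical sketch over a toy pipe graph (not scalding's or cascading's real AST; every name below is invented), encoding the conjectured rule that at most one direct input of a merge may end in a hashJoin:

```scala
// Toy pipe graph for illustration only.
sealed trait Pipe
case class SourcePipe(name: String) extends Pipe
case class HashJoinPipe(left: Pipe, right: Pipe) extends Pipe
case class MergePipe(inputs: List[Pipe]) extends Pipe

object MergeRuleSketch {
  private def endsInHashJoin(p: Pipe): Boolean = p match {
    case HashJoinPipe(_, _) => true
    case _                  => false
  }

  // Conjectured planner limitation, expressed as a check. A real optimizer
  // rule would instead checkpoint (forceToDisk) the extra hashJoin inputs
  // rather than fail.
  def mergeOk(m: MergePipe): Boolean =
    m.inputs.count(endsInHashJoin) <= 1
}
```

Note the next comment partly contradicts the conjecture (the p3 merge of two hashJoined pipes planned fine on its own), so this is a hypothesis to test, not a confirmed rule.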

piyushnarang commented 6 years ago

The behavior seems to match that - I tried a run where p1 didn't have the .cross, which worked, and the same when just p2 didn't have the .cross. Though the funny thing, as you saw, is that if you drop p4 and just have up to p3 (the merge of two pipes with hashJoins), that seems to work :-)

I thought flipping the order of the final merge would work, but I realized I had a bug on my end (I was writing p3 instead of p4). It seems that if p4's merge lhs/rhs is a pipe that contains two hashJoins, it fails.

oscar-stripe commented 6 years ago

filed https://github.com/cwensel/cascading/issues/61 to help track.

johnynek commented 6 years ago

Looks like Chris has a repro and a partial fix (it does not yet work for Tez): https://github.com/cwensel/cascading/issues/61#issuecomment-362488679

johnynek commented 6 years ago

Well, cascading 3.3.0-wip-18 fixes the hashjoin merge cases we saw (and so far passes the randomly generated tests), but it seems that it has a regression related to partitioned schemes.