twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Fix for planning issue for two `hashJoin`s in a row #1897

Closed ttim closed 5 years ago

ttim commented 5 years ago

Currently, if you have two `hashJoin`s in a row, your job fails with:

could not build flow from assembly: [[_pipe_1-0*IterableSour...][com.twitter.scalding.typed.cascading_backend.CascadingBackend$.com$twitter$scalding$typed$cascading_backend$CascadingBackend$$planHashJoin(CascadingBackend.scala:662)] found duplicate field names in joined tuple stream: ['key', 'value', 'key1', 'value1']['key1', 'value1']]
cascading.flow.planner.PlannerException: could not build flow from assembly: [[_pipe_1-0*IterableSour...][com.twitter.scalding.typed.cascading_backend.CascadingBackend$.com$twitter$scalding$typed$cascading_backend$CascadingBackend$$planHashJoin(CascadingBackend.scala:662)] found duplicate field names in joined tuple stream: ['key', 'value', 'key1', 'value1']['key1', 'value1']]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:578)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:108)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:40)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
    at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:95)
    at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:210)
    at com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner$$anon$2.go$1(AsyncFlowDefRunner.scala:172)
    at com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner$$anon$2.run(AsyncFlowDefRunner.scala:201)
    at java.lang.Thread.run(Thread.java:745)
Caused by: cascading.pipe.OperatorException: [_pipe_1-0*IterableSour...][com.twitter.scalding.typed.cascading_backend.CascadingBackend$.com$twitter$scalding$typed$cascading_backend$CascadingBackend$$planHashJoin(CascadingBackend.scala:662)] found duplicate field names in joined tuple stream: ['key', 'value', 'key1', 'value1']['key1', 'value1']
    at cascading.pipe.Splice.resolveDeclared(Splice.java:1299)
    at cascading.pipe.Splice.outgoingScopeFor(Splice.java:992)
    at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:628)
    at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:610)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:95)
    ... 7 more
Caused by: cascading.tuple.TupleException: field name already exists: key1
    at cascading.tuple.Fields.copyRetain(Fields.java:1397)
    at cascading.tuple.Fields.appendInternal(Fields.java:1266)
    at cascading.tuple.Fields.append(Fields.java:1215)
    at cascading.pipe.Splice.resolveDeclared(Splice.java:1290)
    ... 11 more

In this PR I've added a test case that fails without the change, along with a fix for it. @johnynek suggested that one projection (https://github.com/twitter/scalding/blob/0.17.x/scalding-core/src/main/scala/com/twitter/scalding/typed/HashJoinable.scala#L60) was lost during the introduction of `CascadingBackend`. Adding this projection back fixes the problem.
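
To illustrate why the missing projection matters, here is a minimal toy model in plain Scala. It is not Cascading or Scalding code: `HashJoinFieldsSketch`, `appendFields`, `project`, `joinOnce`, and `twoJoins` are hypothetical names made up for this sketch, and only the field names (`key`, `value`, `key1`, `value1`) and the error text are taken from the trace above. The assumption is that each hash join appends the right-hand fields to the left-hand ones and rejects duplicates, so the output of the first join has to be projected back to `key`, `value` before it can feed a second join.

```scala
// Toy model only: none of these names are Cascading or Scalding APIs.
object HashJoinFieldsSketch {
  type Fields = Vector[String]

  // Field names used on each side of a hash join, as seen in the trace.
  val leftSide: Fields  = Vector("key", "value")
  val rightSide: Fields = Vector("key1", "value1")

  // Append the right-hand fields to the left-hand ones,
  // failing on the first name that already exists on the left.
  def appendFields(left: Fields, right: Fields): Either[String, Fields] =
    right.find(name => left.contains(name)) match {
      case Some(dup) => Left(s"field name already exists: $dup")
      case None      => Right(left ++ right)
    }

  // Keep only the requested fields, in the requested order.
  def project(fields: Fields, keep: Fields): Fields =
    keep.filter(name => fields.contains(name))

  // One join step; optionally project the result back to (key, value).
  def joinOnce(input: Fields, projectBack: Boolean): Either[String, Fields] =
    appendFields(input, rightSide) match {
      case Right(joined) if projectBack => Right(project(joined, leftSide))
      case other                        => other
    }

  // Two join steps in a row, feeding the first result into the second.
  def twoJoins(projectBack: Boolean): Either[String, Fields] =
    joinOnce(leftSide, projectBack) match {
      case Right(afterFirst) => joinOnce(afterFirst, projectBack)
      case failure           => failure
    }
}
```

In this model, `HashJoinFieldsSketch.twoJoins(projectBack = false)` returns `Left("field name already exists: key1")`, because the second join sees `key`, `value`, `key1`, `value1` on its left side. With `projectBack = true`, it returns `Right(Vector("key", "value"))`.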