rheem-ecosystem / rheem

Rheem - a cross-platform data processing system
https://rheem-ecosystem.github.io

Handle doubly connected operators correctly in the stage assignment #44

Open sekruse opened 7 years ago

sekruse commented 7 years ago

The following code

    @Test
    public void testTightBroadcast() {
        RheemContext rheemContext = new RheemContext().with(Java.basicPlugin());
        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(rheemContext);

        LoadCollectionDataQuantaBuilder<Integer> inputDataQuanta =
                javaPlanBuilder.loadCollection(Arrays.asList(1, 2, 3));
        Collection<Integer> result = inputDataQuanta
                .map(x -> x + 1)
                .withBroadcast(inputDataQuanta, "inputDataQuanta")
                .collect();

        Assert.assertEquals(RheemCollections.asSet(2, 3, 4), RheemCollections.asSet(result));
    }

produces this error

    Caused by: java.lang.AssertionError
    at org.qcri.rheem.core.optimizer.enumeration.StageAssignmentTraversal.<init>(StageAssignmentTraversal.java:92)
    at org.qcri.rheem.core.optimizer.enumeration.StageAssignmentTraversal.assignStages(StageAssignmentTraversal.java:112)
    at org.qcri.rheem.core.plan.executionplan.ExecutionPlan.createFrom(ExecutionPlan.java:222)
    at org.qcri.rheem.core.api.Job.createInitialExecutionPlan(Job.java:382)
    at org.qcri.rheem.core.api.Job.doExecute(Job.java:247)
    ... 37 more

Apparently, the problem arises because inputDataQuanta and the map operator are connected twice: once via the regular data flow and once via the broadcast. If one inserts an identity map(x -> x) before the map call or before the broadcast, the example works fine.

The testTightBroadcast test above reproduces the bug, which should be fixed.
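To make the double connection concrete, here is a toy model of the plan's edges. It is not Rheem code: the class `PlanEdges`, the `Edge` type, and the node names are hypothetical and only illustrate the structure described above. It shows that the source and the map are linked by two edges in the failing plan, and that an interposed identity step leaves at most one edge per producer/consumer pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these types are hypothetical and not part of Rheem.
class PlanEdges {

    /** A directed edge from a producer to a consumer; kind is "data" or "broadcast". */
    static final class Edge {
        final String producer;
        final String consumer;
        final String kind;

        Edge(String producer, String consumer, String kind) {
            this.producer = producer;
            this.consumer = consumer;
            this.kind = kind;
        }
    }

    /** Returns the "producer->consumer" pairs that are linked by more than one edge. */
    static List<String> doublyConnected(List<Edge> edges) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Edge e : edges) {
            counts.merge(e.producer + "->" + e.consumer, 1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** The failing plan: the source feeds the map as regular input and as broadcast. */
    static List<Edge> failingPlan() {
        return Arrays.asList(
                new Edge("source", "map", "data"),
                new Edge("source", "map", "broadcast"));
    }

    /** The workaround: an identity step sits between the source and the map's regular input. */
    static List<Edge> workaroundPlan() {
        return Arrays.asList(
                new Edge("source", "identity", "data"),
                new Edge("identity", "map", "data"),
                new Edge("source", "map", "broadcast"));
    }
}
```

Placing the identity step on the broadcast edge instead of the regular data edge removes the parallel edges in the same way.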

sekruse commented 7 years ago

It seems that the ExecutionTaskFlow is already assembled incorrectly, as the following log message suggests:

    [WARN] org.qcri.rheem.core.optimizer.enumeration.ExecutionTaskFlow - T[JavaMap[1+1->1, id=415b0b49]] has missing input channels among [null, CollectionChannel[T[JavaCollectionSource[0->1, id=4f49f6af]]->[T[JavaMap[1+1->1, id=415b0b49]]]]].

In this instance, only the broadcast input is registered correctly; the regular map input is missing (the null entry).

sekruse commented 7 years ago

I started working on this issue in branch rheem-44.

I pinpointed that both the optimizer and the executor assume that each input channel is fed into each operator only once. The code above breaks this assumption. I added changes to make the optimizer aware that an input channel may be consumed twice. However, the test above still fails during execution (in the maintenance of the execution lineage). I am stopping work on this for now for two reasons:

  1. I suspect there are many more potential problems with consuming a channel twice. If we do not fix all of them, we might run into bugs later that are even harder to spot and reproduce (e.g., in the re-optimization).
  2. The issue is too specific and the workaround too easy to justify a lot of effort.
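The assumption that each input channel is fed into an operator only once can be illustrated with a toy model. This is not the Rheem optimizer or executor code: `InputWiring`, `wireOneSlotPerChannel`, and `wireAllSlots` are hypothetical names. The first variant records a single input slot per channel, so a channel consumed both as regular input (slot 0) and as broadcast (slot 1) fills only one slot, which resembles the `[null, CollectionChannel[...]]` state in the log message. The second variant remembers every slot that a channel feeds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are hypothetical and not part of Rheem.
class InputWiring {

    /**
     * Wires channels into a consumer's input slots while assuming that each
     * channel feeds the consumer at most once. channelPerSlot[i] names the
     * channel that should feed slot i.
     */
    static String[] wireOneSlotPerChannel(String[] channelPerSlot) {
        // A later slot for the same channel overwrites the earlier one.
        Map<String, Integer> slotOfChannel = new HashMap<>();
        for (int slot = 0; slot < channelPerSlot.length; slot++) {
            slotOfChannel.put(channelPerSlot[slot], slot);
        }
        String[] inputs = new String[channelPerSlot.length];
        for (Map.Entry<String, Integer> entry : slotOfChannel.entrySet()) {
            inputs[entry.getValue()] = entry.getKey();
        }
        return inputs;
    }

    /** Same wiring, but every slot that a channel feeds is remembered. */
    static String[] wireAllSlots(String[] channelPerSlot) {
        Map<String, List<Integer>> slotsOfChannel = new HashMap<>();
        for (int slot = 0; slot < channelPerSlot.length; slot++) {
            slotsOfChannel
                    .computeIfAbsent(channelPerSlot[slot], c -> new ArrayList<>())
                    .add(slot);
        }
        String[] inputs = new String[channelPerSlot.length];
        for (Map.Entry<String, List<Integer>> entry : slotsOfChannel.entrySet()) {
            for (int slot : entry.getValue()) {
                inputs[slot] = entry.getKey();
            }
        }
        return inputs;
    }
}
```

Calling both methods with the same channel name in slot 0 and slot 1 shows the difference: the first leaves slot 0 empty, the second fills both slots. Any real fix would need the same change of bookkeeping wherever a channel is mapped to a single consumer input, which is why partial fixes risk leaving hidden bugs.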

So, please feel free to pick up this issue if you feel like it. :wink: