GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Odd behaviour of SlidingWindows when used with TestPipeline #487

Closed dkruchinin closed 6 years ago

dkruchinin commented 7 years ago

I've got a simple test that demonstrates odd behaviour of SlidingWindows when used with TestPipeline. A handful of strings is fed to the input, each string is mapped to a (string, 1) pair, the pairs are assigned to sliding windows, a per-key sum is applied to count the duplicates, and finally the output of the aggregation is logged together with its window. With a sliding window of 10 minutes duration and a 5 minute period, the pipeline should produce two intersecting windows: [-5, 5] and [0, 10]...

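import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlidingWindowTest {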
    private static PipelineOptions options = PipelineOptionsFactory.create();
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTest.class);

    private static class IdentityDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>>
        implements DoFn.RequiresWindowAccess {

        @Override
        public void processElement(ProcessContext processContext) throws Exception {
            KV<String, Integer> item = processContext.element();
            LOG.info("~~~~~~~~~~> {} => {}", item.getKey(), item.getValue());
            LOG.info("~~~~~~~~~~~ {}", processContext.window());
            processContext.output(item);
        }
    }

    private static class OutputWithCurrentTsFn extends DoFn<String, String> {

        @Override
        public void processElement(ProcessContext processContext) throws Exception {
            processContext.outputWithTimestamp(processContext.element(), Instant.now());
        }
    }

    @Test
    public void whatsWrongWithSlidingWindow() {
        Pipeline p = TestPipeline.create(options);

        p.apply(Create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))
            //.apply(ParDo.of(new OutputWithCurrentTsFn()))
            .apply(MapElements.via((String item) -> KV.of(item, 1))
                       .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
            .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.standardMinutes(10))
                                                        .every(Duration.standardMinutes(5))))
            .apply(Sum.integersPerKey())
            .apply(ParDo.of(new IdentityDoFn()));

        p.run();
    }
}

but that's what I get instead:

...
17:52:00.246 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> cab => 1
17:52:00.246 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
17:52:00.246 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for class java.lang.String found by factory
17:52:00.246 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for class java.lang.Integer found by factory
17:52:00.246 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> cab => 1
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> abc => 2
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> abc => 2
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> abc => 2
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
17:52:00.247 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
17:52:00.248 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
17:52:00.248 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
17:52:00.248 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
17:52:00.248 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
17:52:00.249 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> cab => 1
17:52:00.249 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
...

As you can see, there are three windows instead of two, and all three strings end up in the "extra" window, which doesn't intersect in any way with the other two.
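The expectation of two windows per element follows from duration / period = 10 / 5 = 2. The following is a minimal sketch of that arithmetic in plain Java, not the SDK's implementation: the class and method names are hypothetical, timestamps are in minutes, and windows are treated as half-open intervals [start, start + size) that begin at multiples of the period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the Dataflow SDK.
class ExpectedSlidingWindows {
    // Returns the start of every window [start, start + size) that contains ts,
    // assuming windows begin at multiples of period.
    static List<Long> windowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before ts; floorDiv rounds toward negative infinity.
        long lastStart = Math.floorDiv(ts, period) * period;
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A timestamp at minute 2 falls into [0, 10) and [-5, 5).
        System.out.println(windowStarts(2, 10, 5));   // prints [0, -5]
        // A negative timestamp still falls into exactly two windows.
        System.out.println(windowStarts(-3, 10, 5));  // prints [-5, -10]
    }
}
```

Under these assumptions every timestamp, positive or negative, belongs to exactly two windows, which is the behaviour the test expects.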

Now the interesting part: if you uncomment the .apply(ParDo.of(new OutputWithCurrentTsFn())) line and run the test again, it works as expected, producing only two intersecting sliding windows:

...
18:01:30.904 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
18:01:30.904 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [2016-11-17T17:55:00.000Z..2016-11-17T18:05:00.000Z)
18:01:30.905 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for class java.lang.String found by factory
18:01:30.905 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for class java.lang.Integer found by factory
18:01:30.905 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
18:01:30.905 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
18:01:30.905 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [2016-11-17T18:00:00.000Z..2016-11-17T18:10:00.000Z)
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> abc => 2
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [2016-11-17T18:00:00.000Z..2016-11-17T18:10:00.000Z)
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> abc => 2
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [2016-11-17T17:55:00.000Z..2016-11-17T18:05:00.000Z)
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> cab => 1
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [2016-11-17T17:55:00.000Z..2016-11-17T18:05:00.000Z)
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~> cab => 1
18:01:30.906 [main] INFO  c.q.m.m.core.mdef.SlidingWindowTest - ~~~~~~~~~~~ [2016-11-17T18:00:00.000Z..2016-11-17T18:10:00.000Z)
...

SDK version: 1.8.0

P.S.: based on http://stackoverflow.com/questions/40527677/odd-behaviour-of-slidingwindows-when-used-with-testpipeline/40537541
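One possible explanation for the difference between the two runs, not confirmed against the SDK source: the window bounds in the first log lie in the year -290308, which suggests that elements from Create carry a minimum timestamp far before the epoch, while Instant.now() yields a positive one. If the start of the latest window is computed with Java's % operator, which truncates toward zero, a negative timestamp is rounded up rather than down, and the assignment loop emits one extra window that does not even contain the element. The sketch below reproduces that arithmetic in plain Java. The class and method names are hypothetical, the loop shape is an assumption, and the minimum timestamp is assumed to be Long.MIN_VALUE microseconds converted to milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo, for illustration only; not the SDK's actual code.
class SlidingWindowRemainderDemo {
    static final long SIZE_MS = 10 * 60 * 1000L;   // 10 minute window
    static final long PERIOD_MS = 5 * 60 * 1000L;  // 5 minute period

    // Assumed buggy variant: '%' truncates toward zero, so for a negative ts
    // the computed "last start" lies after ts instead of at or before it.
    static List<Long> startsWithTruncatingRemainder(long ts) {
        return collect(ts, ts - (ts % PERIOD_MS));
    }

    // Floor-based variant: floorMod is never negative, so the last start is <= ts.
    static List<Long> startsWithFloorMod(long ts) {
        return collect(ts, ts - Math.floorMod(ts, PERIOD_MS));
    }

    // Walks backwards from lastStart while the window could still cover ts.
    private static List<Long> collect(long ts, long lastStart) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastStart; start > ts - SIZE_MS; start -= PERIOD_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minTs = Long.MIN_VALUE / 1000;  // assumed minimum timestamp, in milliseconds
        long nowTs = 1479405690904L;         // 2016-11-17T18:01:30.904Z, as in the second log

        System.out.println(startsWithTruncatingRemainder(minTs).size()); // prints 3
        System.out.println(startsWithFloorMod(minTs).size());            // prints 2
        System.out.println(startsWithTruncatingRemainder(nowTs).size()); // prints 2
        System.out.println(startsWithFloorMod(nowTs).size());            // prints 2
    }
}
```

With the assumed minimum timestamp (-290308-12-21T19:59:05.225Z), the truncating variant yields window starts at 20:00, 19:55 and 19:50, matching the three windows in the first log; the 20:00 window starts after the element's timestamp. For positive timestamps both variants agree, which would be consistent with the second run producing only two windows.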

kennknowles commented 7 years ago

Thanks for the detailed report! Taking a close look.

kennknowles commented 6 years ago

At this point, the SDK in question is deprecated. Can you reproduce it with the latest Beam SDK? If so, comment here and/or in a Beam JIRA. I'm going to close this one, though.