TU-Berlin-DIMA / scotty-window-processor

This repository provides Scotty, a framework for efficient window aggregation in out-of-order stream processing.
https://tu-berlin-dima.github.io/scotty-window-processor/
Apache License 2.0

[Bug] Out-of-order record handling #44

Closed: bvonheid closed this issue 2 years ago

bvonheid commented 3 years ago

Environment:

SlidingWindow with the following configuration: SlidingWindow(WindowMeasure.Time, 5000, 1000)
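
For context, this is roughly how such a window is registered on the connector's window operator. This is only an illustrative sketch: windowOperator stands in for an already constructed KeyedScottyWindowOperator, and it assumes the arguments follow SlidingWindow(measure, size, slide), i.e. a 5000 ms window that slides every 1000 ms.

// Sketch: register a sliding event-time window of size 5000 ms,
// sliding every 1000 ms, on a hypothetical windowOperator instance.
windowOperator.addWindow(new SlidingWindow(WindowMeasure.Time, 5000, 1000));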

Exception

When an out-of-order record whose timestamp is smaller than currentWatermark - maxLateness - maxFinalWindowSize is handled, the following exception is thrown:

java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 6
        at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
        at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
        at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
        at java.base/java.util.Objects.checkIndex(Objects.java:372)
        at java.base/java.util.ArrayList.get(ArrayList.java:458)
        at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
        at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
        at de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
        at de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
        at de.tub.dima.scotty.kafkastreamsconnector.KeyedScottyWindowOperator.process(KeyedScottyWindowOperator.java:47)
        at de.tub.dima.scotty.kafkastreamsconnector.KeyedScottyWindowTransformer.transform(KeyedScottyWindowTransformer.java:26)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
        at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:64)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

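To make the threshold concrete, here is a purely illustrative calculation; none of these values are taken from the failing run:

long currentWatermark   = 20000; // assumed watermark position
long maxLateness        = 1000;  // assumed allowed lateness
long maxFinalWindowSize = 5000;  // the 5000 ms window size configured above
// Records older than this cutoff precede the oldest retained slice:
long cutoff = currentWatermark - maxLateness - maxFinalWindowSize; // = 14000
// For any record with ts < 14000, findSliceIndexByTimestamp(ts) returns -1.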

In the SliceManager class:

int indexOfSlice = this.aggregationStore.findSliceIndexByTimestamp(ts);
this.aggregationStore.insertValueToSlice(indexOfSlice, element, ts);

findSliceIndexByTimestamp returns -1, which is assigned to indexOfSlice, and the subsequent call to insertValueToSlice then throws the exception shown above.

In this case no value needs to be added to any slice: the element's timestamp is below currentWatermark - maxLateness - maxFinalWindowSize, so it is too late to affect any retained window and should simply be discarded.
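
One way to guard against this in SliceManager.processElement (a sketch of a possible fix, not a tested patch) is to check the returned index before inserting:

int indexOfSlice = this.aggregationStore.findSliceIndexByTimestamp(ts);
if (indexOfSlice < 0) {
    // The timestamp precedes the oldest retained slice, i.e. the record
    // is outside the allowed lateness; drop it instead of inserting.
    return;
}
this.aggregationStore.insertValueToSlice(indexOfSlice, element, ts);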

[Screenshot attached: scotty_excpetion]
julianev commented 3 years ago

Hi @bvonheid, thanks for opening this issue! Yes, an out-of-order record does not need to be added to a slice when its timestamp falls outside the allowed lateness. Best, Juliane