TU-Berlin-DIMA / scotty-window-processor

This repository provides Scotty, a framework for efficient window aggregations for out-of-order Stream Processing.
https://tu-berlin-dima.github.io/scotty-window-processor/
Apache License 2.0

java.lang.ArrayIndexOutOfBoundsException #46

Open eastcirclek opened 2 years ago

eastcirclek commented 2 years ago

Hi,

We're running into the following exception while trying to adopt Scotty on Flink 1.14.2, the latest release:

switched from RUNNING to FAILED with failure cause: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:424)
    at java.util.ArrayList.get(ArrayList.java:437)
    at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
    at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
    at de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
    at de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
    at de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator.processElement(KeyedScottyWindowOperator.java:62)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)

We create a window operator as follows:

import de.tub.dima.scotty.core.windowType.{SlidingWindow, WindowMeasure}
import de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator
import de.tub.dima.scotty.flinkconnector._

// Type parameters: key type, input type, output type
val windowOp =
  new KeyedScottyWindowOperator[(java.lang.String, Long), NaviGpsProcessable, NaviTrafficUserResult](
    new NaviTrafficUserAggregationScotty())
// Sliding time window: size 600_000, slide 60_000
windowOp.addWindow(new SlidingWindow(WindowMeasure.Time, 600_000, 60_000))

val userAggStream = stream
  .keyBy(el => (el.id, el.trafficId))
  .process(windowOp)
  .map(_.getAggValues.get(0))

Can I get any advice on this?

Best,

Dongwon

eastcirclek commented 2 years ago

FYI, the pipeline seems to work fine for the first 10 to 20 minutes.

julianev commented 2 years ago

Hi @eastcirclek, sorry for the late reply. We somehow overlooked this issue between Christmas and New Year.

I believe this error is caused by an out-of-order tuple whose timestamp lies outside the allowed lateness and which therefore cannot be added to any slice. In issue #44 we described that such a tuple does not need to be added to a slice, and we linked a fix in PR #45, which we plan to merge. Please let us know whether this solves the problem!
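
To make the suspected failure mode concrete, here is a minimal Java sketch. It is not Scotty's actual code and not the change in PR #45. The class `SliceLookupSketch`, its `Slice` type, and all method names are hypothetical, and the "return -1 when no slice matches" convention is only an assumption inferred from the `-1` in the exception message. It shows how a tuple older than every retained slice can lead to an index of -1, and how a guard could skip such a tuple instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Scotty.
final class SliceLookupSketch {

    // A slice covering the half-open time range [start, end) with a running sum.
    static final class Slice {
        final long start;
        final long end;
        long sum;

        Slice(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Returns the index of the slice covering ts, or -1 if no slice covers it
    // (assumed convention, for example when the tuple is too late).
    static int findSliceIndex(List<Slice> slices, long ts) {
        for (int i = slices.size() - 1; i >= 0; i--) {
            Slice s = slices.get(i);
            if (s.start <= ts && ts < s.end) {
                return i;
            }
        }
        return -1;
    }

    // Unguarded insert: passes a possible -1 straight to List.get.
    static void insertUnguarded(List<Slice> slices, long ts, long value) {
        slices.get(findSliceIndex(slices, ts)).sum += value;
    }

    // Guarded insert: skips a tuple that belongs to no slice.
    // Returns true if the value was added, false if it was dropped.
    static boolean insertGuarded(List<Slice> slices, long ts, long value) {
        int index = findSliceIndex(slices, ts);
        if (index < 0) {
            return false;
        }
        slices.get(index).sum += value;
        return true;
    }

    public static void main(String[] args) {
        List<Slice> slices = new ArrayList<>();
        slices.add(new Slice(600_000L, 660_000L));
        slices.add(new Slice(660_000L, 720_000L));

        System.out.println(insertGuarded(slices, 650_000L, 1L)); // true
        System.out.println(insertGuarded(slices, 10_000L, 1L));  // false: too late for any slice

        try {
            insertUnguarded(slices, 10_000L, 1L);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unguarded insert failed: " + e);
        }
    }
}
```

On Java 8, which the `Thread.java:748` frame in the trace suggests, `ArrayList.get(-1)` surfaces as `ArrayIndexOutOfBoundsException: -1`. Newer Java versions report an `IndexOutOfBoundsException` instead, so the sketch catches the common supertype.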

Best, Juliane