TU-Berlin-DIMA / scotty-window-processor

This repository provides Scotty, a framework for efficient window aggregations in out-of-order stream processing.
https://tu-berlin-dima.github.io/scotty-window-processor/
Apache License 2.0

[KafkaStreams] Fix bug where the same key is returned for different window results #40

Closed: bvonheid closed this 3 years ago

bvonheid commented 3 years ago

The processWatermark(..) function is called inside the process(..) function and uses the key passed to process(..) when writing results back to Kafka. However, processWatermark(..) can trigger windows belonging to other keys, and those windows also produce results during the same call. As a result, the same key may be attached to different window results. Using the key that belongs to the SlicingWindowOperator that produced each result gives the desired behavior.
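The following is a minimal, self-contained sketch of the bug and the fix, not the actual Scotty or Kafka Streams code. It assumes a single tumbling window [0, windowEnd) and one operator per key that remembers its own key. `KeyedWindowProcessor`, `KeyedOperator`, `Output`, and `emitted()` are hypothetical names chosen for illustration. The `useOperatorKey` flag switches between labelling every result with the incoming record's key (the buggy behavior) and labelling it with the key of the operator that produced it (the fix).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the bug described in #40. These names are
// illustrative only; they are not Scotty's or Kafka Streams' API.
final class KeyedWindowProcessor {

    // A record written downstream: the output key plus a textual window result.
    static final class Output {
        final String key;
        final String result;
        Output(String key, String result) { this.key = key; this.result = result; }
        @Override public String toString() { return key + ": " + result; }
    }

    // Stands in for one per-key SlicingWindowOperator; it remembers its own key.
    static final class KeyedOperator {
        final String key;
        long count = 0;
        KeyedOperator(String key) { this.key = key; }
    }

    private final Map<String, KeyedOperator> operators = new LinkedHashMap<>();
    private final List<Output> emitted = new ArrayList<>();
    private final boolean useOperatorKey; // false = buggy behavior, true = fix
    private final long windowEnd;         // single tumbling window [0, windowEnd)
    private boolean fired = false;

    KeyedWindowProcessor(boolean useOperatorKey, long windowEnd) {
        this.useOperatorKey = useOperatorKey;
        this.windowEnd = windowEnd;
    }

    // Analogous to process(key, value): add the element, then advance the watermark.
    void process(String key, long timestamp) {
        KeyedOperator op = operators.computeIfAbsent(key, KeyedOperator::new);
        if (timestamp < windowEnd) {
            op.count++;
        }
        processWatermark(key, timestamp);
    }

    // Fires the window of *every* keyed operator, not only the one for recordKey.
    private void processWatermark(String recordKey, long watermark) {
        if (fired || watermark < windowEnd) {
            return;
        }
        fired = true;
        for (KeyedOperator op : operators.values()) {
            // Bug: reusing recordKey labels every result with the current record's key.
            // Fix: use the key of the operator that actually produced the result.
            String outKey = useOperatorKey ? op.key : recordKey;
            emitted.add(new Output(outKey, "count=" + op.count));
        }
    }

    List<Output> emitted() {
        return Collections.unmodifiableList(emitted);
    }
}
```

Feeding records for s_5 and s_9 and then a record for s_9 that passes the window end fires both windows inside a single process(..) call. With `useOperatorKey = false`, both results are keyed s_9. With `useOperatorKey = true`, they are keyed s_5 and s_9, which matches the wrong and desired behaviors shown below.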

Wrong behavior: the key s_9 is used for both windows, which cover the same time range, even though their keys should be different.

WindowResult(Time,1625148255000-1625148285000,[StatsWindowFunction->Stats{count=25, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}])
[theodolite-uc3-application-0.0.1-9df841c6-cecc-4e92-982a-590125145e72-StreamThread-1] INFO theodolite.uc3.streamprocessing.TopologyBuilder - s_9: Stats{count=25, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}
WindowResult(Time,1625148255000-1625148285000,[StatsWindowFunction->Stats{count=40, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}])
[theodolite-uc3-application-0.0.1-9df841c6-cecc-4e92-982a-590125145e72-StreamThread-1] INFO theodolite.uc3.streamprocessing.TopologyBuilder - s_9: Stats{count=40, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}

Desired behavior: s_5 and s_9 are now used as the keys for the two windows.

WindowResult(Time,1625148255000-1625148285000,[StatsWindowFunction->Stats{count=25, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}])
[theodolite-uc3-application-0.0.1-9df841c6-cecc-4e92-982a-590125145e72-StreamThread-1] INFO theodolite.uc3.streamprocessing.TopologyBuilder - s_5: Stats{count=25, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}
WindowResult(Time,1625148255000-1625148285000,[StatsWindowFunction->Stats{count=40, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}])
[theodolite-uc3-application-0.0.1-9df841c6-cecc-4e92-982a-590125145e72-StreamThread-1] INFO theodolite.uc3.streamprocessing.TopologyBuilder - s_9: Stats{count=40, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}