TU-Berlin-DIMA / scotty-window-processor

This repository provides Scotty, a framework for efficient window aggregations for out-of-order Stream Processing.
https://tu-berlin-dima.github.io/scotty-window-processor/
Apache License 2.0

No data for singleton aggregations? #61

Open jelos98 opened 1 year ago

jelos98 commented 1 year ago

What is the expected behavior when only a single input occurs within a window? We're trying to run a simple sum aggregation over events that have a long tail of infrequent events (and a smaller number of high-frequency events). We boiled it down to a test that runs in a test harness. The aggregation function simply sums the counts and takes the max of the timestamps; it is omitted here for simplicity, because the function itself doesn't appear to matter for this.

        ScottyWindowOperator<String, UserEventCounter, UserEventCounter> keyedScottyWindowOperator = ...
        UserEventCounter element1 = new UserEventCounter();
        element1.userKey = "user1";
        element1.timestamp = 1;
        element1.count = 4;

        UserEventCounter element2 = new UserEventCounter();
        element2.userKey = "user1";
        element2.timestamp = 16;
        element2.count = 5;

        UserEventCounter element3 = new UserEventCounter();
        element3.userKey = "user1";
        element3.timestamp = 33;
        element3.count = 6;

        testHarness.processElement(element1, 1);
        testHarness.processWatermark(11);
        testHarness.processElement(element2, 16);
        testHarness.processWatermark(25);
        testHarness.processWatermark(30);
        testHarness.processElement(element3, 33);
        testHarness.processWatermark(35);
        testHarness.processWatermark(40);
        testHarness.processWatermark(45);
        testHarness.processWatermark(50);
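The aggregation function itself is omitted from the test. For anyone reproducing the issue, here is a minimal sketch of the combine logic described in the question (sum the counts, keep the max timestamp). The `UserEventCounter` stand-in, its field types, and the `SumUserEventCounterSketch` class are hypothetical; the sketch deliberately does not implement Scotty's own aggregate-function interface, so the logic can be checked in isolation:

```java
// Hypothetical stand-in for the issue's UserEventCounter; the field names follow
// the test above, but the field types (long) are assumptions.
class UserEventCounter {
    String userKey;
    long timestamp;
    long count;
}

public class SumUserEventCounterSketch {

    // Hypothetical helper that builds an event the same way the test does.
    static UserEventCounter event(String userKey, long timestamp, long count) {
        UserEventCounter e = new UserEventCounter();
        e.userKey = userKey;
        e.timestamp = timestamp;
        e.count = count;
        return e;
    }

    // Combines two partial aggregates: counts are summed, the latest timestamp is kept.
    static UserEventCounter combine(UserEventCounter a, UserEventCounter b) {
        UserEventCounter out = new UserEventCounter();
        out.userKey = a.userKey;
        out.timestamp = Math.max(a.timestamp, b.timestamp);
        out.count = a.count + b.count;
        return out;
    }

    public static void main(String[] args) {
        UserEventCounter merged = combine(event("user1", 1, 4), event("user1", 16, 5));
        System.out.println(merged.userKey + " ts=" + merged.timestamp + " count=" + merged.count);
    }
}
```

Running `main` prints `user1 ts=16 count=9`; element1 and element2 are merged here only to exercise the function, not because they share a window.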

We modified ScottyWindowOperator to add some debugging prints (the value in processElement, and the AggregateWindow in processWatermark before it is filtered on aggregation.hasValue()), and we see the following sequence:

processElement|K     user1 W@             1| = UserEventCounter(userKey=user1, timestamp=1, count=4)
processElement|K     user1 W@            16| = UserEventCounter(userKey=user1, timestamp=16, count=5)

processWatermark|user1 @           11| = WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)])

processElement|K     user1 W@            33| = UserEventCounter(userKey=user1, timestamp=33, count=6)

processWatermark|user1 @           25| = WindowResult(Time,15-25,[ScottySumUserEventCounter->])
processWatermark|user1 @           25| = WindowResult(Time,10-20,[ScottySumUserEventCounter->])
processWatermark|user1 @           25| = WindowResult(Time,5-15,[ScottySumUserEventCounter->])
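The window bounds in the log (0-10, 5-15, 10-20, 15-25) suggest sliding windows of size 10 with a slide of 5; that configuration is an assumption, since the operator setup is elided in the test. A minimal sketch of which windows each test element should land in follows. The `ExpectedSlidingWindows` class is hypothetical (not part of Scotty), and it assumes half-open windows [start, end) that start at non-negative multiples of the slide:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Scotty): computes the sliding windows an event
// timestamp falls into and the expected per-window sum of counts.
public class ExpectedSlidingWindows {

    // Returns the start of every window [start, start + size) that contains ts,
    // assuming windows begin at non-negative multiples of slide.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size && start >= 0; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Sums counts per window, keyed by window start in ascending order.
    static Map<Long, Long> expectedSums(long[] timestamps, long[] counts, long size, long slide) {
        Map<Long, Long> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            for (long start : windowStarts(timestamps[i], size, slide)) {
                sums.merge(start, counts[i], Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        long size = 10;
        long slide = 5;
        // Timestamps and counts of element1..element3 from the test above.
        Map<Long, Long> sums = expectedSums(new long[] {1, 16, 33}, new long[] {4, 5, 6}, size, slide);
        for (Map.Entry<Long, Long> e : sums.entrySet()) {
            System.out.println(e.getKey() + "-" + (e.getKey() + size) + " sum=" + e.getValue());
        }
    }
}
```

Under these assumptions `main` prints `0-10 sum=4`, `10-20 sum=5`, `15-25 sum=5`, `25-35 sum=6` and `30-40 sum=6`. So the 10-20 and 15-25 windows, which the log shows being emitted at watermark 25, would be expected to carry count=5, while 5-15 is legitimately empty; the element at timestamp 33 would be expected in 25-35 and 30-40 by the final watermark of 50.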

Observations:

Does Scotty, as implemented, only work if you have a consistent (non-sporadic) stream of events? Is there any workaround if you have keys that only receive sporadic traffic like this?

powibol commented 11 months ago

Hi @jelos98, thank you for your request and detailed explanation in the ticket. We will look into the issue and get back to you as soon as possible.