microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

Output when SessionTimeoutWindow() ends #139

Open shreyasraghunath opened 3 years ago

shreyasraghunath commented 3 years ago

Hello,

I have been working with Trill for a real-time application where we need to aggregate live streaming data and output a result when a SessionTimeoutWindow closes. Using the syntax:

var query = inputStream
            .SessionTimeoutWindow(TimeSpan.FromSeconds(30).Ticks)
            .GroupAggregate(
            x => x.DeviceGuid,
            x => x.Min(v => DateTime.Parse(v.ReceivedTime)),
            x => x.Max(v => DateTime.Parse(v.ReceivedTime)),
            (key, startTime, endTime) => new { key.Key, startTime, endTime }) 

and registering the input as:

var inputStream = this.queryContainer.RegisterInput(
                this.input,
                DisorderPolicy.Adjust(),
                FlushPolicy.FlushOnPunctuation,
                PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks),
                OnCompletedPolicy.EndOfStream);

Trill outputs intermediate results every time it receives a punctuation. What we need instead is for Trill to return a result only when the session closes. I have tried various ways of registering the input that remove the dependency on punctuations, but in that case Trill doesn't output anything at all. Is there a way to force Trill to output the result of the query only when the SessionTimeoutWindow closes, and not the intermediate states produced on each punctuation? Thanks for the help.

shreyasraghunath commented 3 years ago

@peterfreiling , @badrishc , @cybertyche can you please help us? Thanks in advance!

peterfreiling commented 3 years ago

Is the problem that Trill is outputting too frequently/inputs are being dropped, or that Trill is not outputting frequently enough? Punctuations also move time forward to the punctuation timestamp; if that is not required, you could periodically call Flush on the Process (returned from QueryContainer.Restore). I don't think there is a way to configure Trill to output only when the session window ends, but I don't see why you would need that behavior either.
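To illustrate why punctuations matter here, the following is a toy C# model, not Trill code. The `Advance`, `OnData`, and `OnPunctuation` helpers are hypothetical, and the model assumes a session closes once any input's timestamp reaches the last event time plus the timeout. With only the data events at 0, 2, and 6, no session closes; the result appears once a punctuation (or a later data event) moves time past the session end at 36.

```csharp
using System;
using System.Collections.Generic;

// Toy model (not Trill code) of how time progress closes a session. Assumption:
// a session closes once any input, data or punctuation, has a timestamp at or
// past lastEventTime + SessionTimeout.
const long SessionTimeout = 30;
long? lastEventTime = null;
int eventsInSession = 0;
var closedSessions = new List<(long End, int Events)>();

void Advance(long time)
{
    if (lastEventTime is long last && time >= last + SessionTimeout)
    {
        closedSessions.Add((last + SessionTimeout, eventsInSession)); // session is over
        lastEventTime = null;
        eventsInSession = 0;
    }
}

void OnData(long time)
{
    Advance(time); // a late-enough data event also closes the previous session
    lastEventTime = time;
    eventsInSession++;
}

void OnPunctuation(long time) => Advance(time); // moves time forward, carries no payload

OnData(0);
OnData(2);
OnData(6);
Console.WriteLine($"After data only: {closedSessions.Count} closed session(s)");

OnPunctuation(20); // 20 < 6 + 30, so the session is still open
OnPunctuation(40); // 40 >= 36, so the session that ended at 36 is emitted
Console.WriteLine($"After punctuation at 40: {closedSessions.Count} closed session(s)");
```

Running it reports 0 closed sessions after the data alone and 1 after the punctuation at 40, which mirrors the observation later in this thread that nothing happens without incoming data or punctuations.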

agarwalshashank95 commented 3 years ago

Hi @peterfreiling, I'll rephrase and try to explain our problem. We are sending data from multiple sensors into a service bus, and a MessageConsumer picks up messages from the bus and adds them to our root stream. On this stream we do a GroupBy to separate the data by sensor and apply SessionTimeoutWindow, because we want to know when a sensor has stopped sending data so that we can analyze that session and persist the aggregated data into a DB.

For the first part of the question, we figured out that Trill doesn't automatically generate punctuations if there is no incoming data. To work around that, we are currently pushing in punctuations manually using

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(t =>
            {
                var timeTicker = DateTime.Now.Ticks;
                this.rootStream.OnNext(StreamEvent.CreatePunctuation<DeviceMessage>(timeTicker + 1));
            });

and the actual data is sent via

rootStream.OnNext(StreamEvent.CreateStart<DeviceMessage>(syncTime, data));

and this works perfectly for our case. But we are still having trouble with the second part: running aggregations over a particular session and persisting its data. In a similar question (#112), you mentioned that the output of a SessionTimeout operator would look like

Start 1:10
Start 1:12
Start 1:13

followed by End edges for all of them when the session ends. Instead, we are getting output like

Start 1:10
End 1:12
Start 1:12
End 1:13
Start 1:13
End 1:43

assuming a 30-second timeout. My query is:

var query = inputStream.GroupApply((msg) => msg.DeviceGuid,
                (stream) => stream.SessionTimeoutWindow(TimeSpan.FromMinutes(1).Ticks)
                    .Aggregate(
                        window => window.SessionAggregate(message => message)
                    ),
                (group, value) => new GroupedAggrgate(group.Key, value)
            );

where SessionAggregate is a simple implementation of IAggregate that returns a count, and GroupedAggrgate is a simple class with a string key and a ulong value. The value returned by SessionAggregate is correct: it is the number of messages belonging to that session. But we have three questions:

  1. Is the Start and End edge behavior we are seeing correct for a session window?
  2. Even if the grouped aggregate operator cannot emit an event only when the session expires, can we somehow detect the expiry inside our SessionAggregate implementation? We want to persist something to the DB only when the session is over.
  3. Currently we create the stream event with StreamEvent.CreateStart(data). If we use StreamEvent.CreatePoint(data) instead, nothing but punctuations reaches our final query stream. Is this also expected?

Hopefully this clarifies our problem. Thanks for your help!

agarwalshashank95 commented 3 years ago

Hi @arunkm, did you get a chance to look at this issue?

shreyasraghunath commented 3 years ago

@peterfreiling and @arunkm, please let us know if you need more information! We're blocked and any help would be appreciated. Thanks in advance!

arunkm commented 3 years ago

Hi @shreyasraghunath,

  1. This behavior is correct for an Aggregate. An aggregate has to produce values with mutually exclusive (non-overlapping) lifetimes, so an End edge and a new Start edge are emitted whenever the aggregate state/value changes.

  2. No, the aggregate doesn't have much control over data flow.

  3. The StreamEvent.CreateStart(data) and StreamEvent.CreatePoint(data) input through SessionTimeoutWindow should produce identical results in this case.
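A rough sketch of point 1, written as a toy C# model rather than Trill's implementation: assume the three events at 1:10, 1:12, and 1:13 (written as seconds 10, 12, and 13) have had their end times stretched to the session end at 43. A snapshot count must then report one value per non-overlapping time range, which yields exactly the Start/End pairs reported earlier in the thread. `SnapshotCount` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model (not Trill's implementation) of a snapshot Count over event lifetimes.
// Each tuple is (Start, End) with End exclusive; times are seconds after 1:00.
var sessionEvents = new List<(long Start, long End)>
{
    (10, 43), // event at 1:10, end stretched to the session end (1:13 + 30 s)
    (12, 43), // event at 1:12
    (13, 43), // event at 1:13
};

var results = SnapshotCount(sessionEvents);
foreach (var (start, end, value) in results)
{
    Console.WriteLine($"Start {start}, End {end}: count = {value}");
}

// Every start or end time is a point where the count may change, so the output
// is cut into one interval per pair of consecutive boundaries.
static List<(long Start, long End, int Value)> SnapshotCount(List<(long Start, long End)> events)
{
    var boundaries = events.SelectMany(e => new[] { e.Start, e.End })
                           .Distinct()
                           .OrderBy(t => t)
                           .ToList();
    var output = new List<(long Start, long End, int Value)>();
    for (int i = 0; i + 1 < boundaries.Count; i++)
    {
        long segmentStart = boundaries[i], segmentEnd = boundaries[i + 1];
        // An event contributes on [segmentStart, segmentEnd) if its lifetime covers it.
        int count = events.Count(e => e.Start <= segmentStart && e.End >= segmentEnd);
        if (count > 0)
        {
            output.Add((segmentStart, segmentEnd, count));
        }
    }
    return output;
}
```

The result is three intervals, [10, 12) with count 1, [12, 13) with count 2, and [13, 43) with count 3, i.e. the Start 1:10 / End 1:12 / Start 1:12 / End 1:13 / Start 1:13 / End 1:43 pattern.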

agarwalshashank95 commented 3 years ago

Hi @arunkm

As you mentioned, the Aggregate operator by itself cannot control the flow of data, and the Start and End edges will change because we pass the stream through a GroupAggregate operator.

But the scenario I described above, multiple sensors sending data and wanting a session timeout and some aggregation for each sensor individually, still stands. Do you have any insights into how that could be achieved?

Also, regarding the third point, we are seeing inconsistent behavior between CreatePoint and CreateStart even when all the other code remains the same. Do you think it could be a bug in the Trill library?

wassim-k commented 3 years ago

I've run into the same issue. I expected that, similar to a tumbling window, the session timeout window would produce an output only once the window closes after the specified period elapses, but instead it produces an output for every ingressed event plus the final aggregation at the end of the window.

Is there any way to change it so that it only outputs the aggregation once per session window?

shreyasraghunath commented 3 years ago

Hi @wassim-k, we couldn't get any answer to this question. Trill is a very good product but unfortunately lacks community support, so we decided to go with Kafka Streams instead.

peterfreiling commented 3 years ago

Sorry for the unresponsiveness. There are two of us maintaining this project part-time among our other commitments.

@shreyasraghunath / @agarwalshashank95 / @wassim-k Let me try to answer your questions. SessionTimeoutWindow simply modifies the lifetime of the events so that the event's end time is set to the end time of the session window, and does not group or aggregate them. So for an input sequence of:

StreamEvent.CreatePoint(0, 1),
StreamEvent.CreatePoint(2, 2),
StreamEvent.CreatePoint(6, 3),
StreamEvent.CreatePoint(40, 4),
...

SessionTimeoutWindow(timeout: 30) will produce the following:

StreamEvent.CreateInterval(0, 36, 1),
StreamEvent.CreateInterval(2, 36, 2),
StreamEvent.CreateInterval(6, 36, 3),
...
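The lifetime rewrite can be modeled in a few lines of C#. This is a simplified batch sketch, not Trill's streaming operator: `ApplySessionTimeout` is a hypothetical helper, and it assumes that an event arriving exactly at the session end starts a new session and that no maximum session duration applies.

```csharp
using System;
using System.Collections.Generic;

// Simplified batch model (not Trill's streaming operator) of SessionTimeoutWindow:
// each point keeps its start time, and its end time becomes the end of its session.
// Assumptions: a session ends `timeout` ticks after its last event, an event arriving
// at or after that moment starts a new session, and no maximum duration applies.
var points = new List<(long Time, int Payload)> { (0, 1), (2, 2), (6, 3), (40, 4) };

foreach (var interval in ApplySessionTimeout(points, timeout: 30))
{
    Console.WriteLine($"Interval({interval.Start}, {interval.End}, {interval.Payload})");
}

static List<(long Start, long End, int Payload)> ApplySessionTimeout(
    List<(long Time, int Payload)> input, long timeout)
{
    var result = new List<(long Start, long End, int Payload)>();
    var session = new List<(long Time, int Payload)>();

    // Emit every event of the open session with its end time set to the session end.
    void CloseSession()
    {
        if (session.Count == 0) return;
        long sessionEnd = session[session.Count - 1].Time + timeout;
        foreach (var member in session) result.Add((member.Time, sessionEnd, member.Payload));
        session.Clear();
    }

    foreach (var p in input) // input must be sorted by Time
    {
        if (session.Count > 0 && p.Time >= session[session.Count - 1].Time + timeout)
        {
            CloseSession(); // gap reached the timeout: the previous session is over
        }
        session.Add(p);
    }
    CloseSession(); // assume no further events, so the last session ends on schedule
    return result;
}
```

For the input above it prints Interval(0, 36, 1), Interval(2, 36, 2), Interval(6, 36, 3), and Interval(40, 70, 4); the last line assumes no further events arrive before 70.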

In order to aggregate the events within this session, you could do something like the following:

var outputStream = inputStream
    .SessionTimeoutWindow(timeout: 30) // modify end times of events to window termination
    .PointAtEnd() // modify events to be single points in time at the end of the window
    .Sum(e => e); // aggregate all events in the session

This will produce events like:

StreamEvent.CreatePoint(36, 6),
...

These principles can be applied to a grouped stream via GroupApply or Map/Reduce.
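As a sketch of the grouped case, the following toy C# model (not Trill; `SessionCountsPerDevice` and the device names "A" and "B" are hypothetical) sessionizes each device's messages independently and emits exactly one count per session at its end time, which is what the PointAtEnd approach above should yield per group. The comment at the top shows the rough Trill query shape it is meant to mirror, adapted from the GroupApply query earlier in this thread; that query is untested here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy batch model (not Trill) of a per-device pipeline shaped roughly like:
//   inputStream.GroupApply(
//       msg => msg.DeviceGuid,
//       s => s.SessionTimeoutWindow(TimeSpan.FromMinutes(1).Ticks).PointAtEnd().Count(),
//       (group, count) => new { group.Key, count });
// Each device is sessionized independently, and one result is emitted per session,
// timestamped at the session end. Assumption: a gap >= timeout starts a new session.
var messages = new List<(string Device, long Time)>
{
    ("A", 0), ("B", 5), ("A", 2), ("A", 6), ("A", 40), ("B", 50),
};

foreach (var session in SessionCountsPerDevice(messages, timeout: 30))
{
    Console.WriteLine($"{session.Device}: session ended at {session.End} with {session.Messages} message(s)");
}

static List<(string Device, long End, int Messages)> SessionCountsPerDevice(
    List<(string Device, long Time)> input, long timeout)
{
    var output = new List<(string Device, long End, int Messages)>();
    foreach (var deviceGroup in input.GroupBy(m => m.Device))
    {
        var times = deviceGroup.Select(m => m.Time).OrderBy(t => t).ToList();
        long last = times[0];
        int count = 1;
        for (int i = 1; i < times.Count; i++)
        {
            if (times[i] >= last + timeout)
            {
                // The gap reached the timeout, so the previous session ended at last + timeout.
                output.Add((deviceGroup.Key, last + timeout, count));
                count = 0;
            }
            last = times[i];
            count++;
        }
        output.Add((deviceGroup.Key, last + timeout, count)); // this device's final session
    }
    // Order by end time, the order in which a temporal stream would deliver them.
    return output.OrderBy(o => o.End).ThenBy(o => o.Device).ToList();
}
```

With a 30-tick timeout it reports B ending at 35 with 1 message, A at 36 with 3, A at 70 with 1, and B at 80 with 1: one output per session, with no intermediate results.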

As for the Start vs. Point inconsistencies with SessionTimeoutWindow, these should produce identical results. If you have sample input that produces different results, please provide it so we can investigate.

wassim-k commented 3 years ago

That worked perfectly. Thank you, @peterfreiling. Your help so far has been invaluable for our project.