microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

[QUESTION] Output count = 0 when the last event leaves snapshot window #147

Closed: wassim-k closed this issue 3 years ago

wassim-k commented 3 years ago

Hi, I have the following code, and I couldn't figure out how to get Trill to output a count of 0 when the last event leaves the snapshot window. I tried OutputDefaultWhenEmpty, without success.

It seems that the aggregate's Deaccumulate is not being invoked. Is there any way to accomplish this?

using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;

public record Payload(string Name, string GroupId, int Delay);

public void Run()
{
    var array = new Payload[]
    {
        new Payload(Name: "A", GroupId: "GA", Delay: 500),
        new Payload(Name: "B", GroupId: "GA", Delay: 500),
        new Payload(Name: "C", GroupId: "GA", Delay: 500)
    };

    var ingress = array
        .ToObservable()
        .Select(b => Observable.Timer(TimeSpan.FromMilliseconds(b.Delay)).Select(_ => b))
        .Concat()
        .Timestamp()
        .Select(e => StreamEvent.CreatePoint(e.Timestamp.Ticks, e.Value))
        .ToStreamable(
            DisorderPolicy.Throw(),
            FlushPolicy.FlushOnPunctuation,
            PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromMilliseconds(200).Ticks),
            OnCompletedPolicy.EndOfStream);

    ingress
        .AlterEventDuration(TimeSpan.FromSeconds(3).Ticks)
        .Aggregate(w => w.Count().OutputDefaultWhenEmpty())
        .AlterEventDuration(1)
        .ToStreamEventObservable()
        .Where(e => e.IsData)
        .Subscribe(e => Console.WriteLine(e));
}

Output

[Interval: ..., 1]
[Interval: ..., 2]
[Interval: ..., 3]
[Interval: ..., 2]
[Interval: ..., 1]
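To see why the output stops at 1 instead of dropping to 0, here is a simplified model of snapshot semantics in plain C#. It has no Trill dependency and is not Trill's implementation. It assumes the three points arrive at exactly 500, 1000 and 1500 ms (relative times, as the Timer/Concat chain would ideally produce) and gives each a 3000 ms lifetime, as AlterEventDuration does. Every start or end edge opens a new snapshot, and a snapshot with no live events produces no row at all, so nothing is emitted once the last event expires.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotModel
{
    // Returns (start, end, count) for every snapshot that contains at least one live event.
    // Each event is live over the half-open interval [start, start + lifetime).
    public static List<(long Start, long End, int Count)> Snapshots(long[] starts, long lifetime)
    {
        // Every start edge and end edge is a point where the count can change.
        var edges = starts.Concat(starts.Select(s => s + lifetime))
                          .Distinct()
                          .OrderBy(t => t)
                          .ToList();

        var result = new List<(long Start, long End, int Count)>();
        for (int i = 0; i + 1 < edges.Count; i++)
        {
            long t0 = edges[i], t1 = edges[i + 1];
            int live = starts.Count(s => s <= t0 && t0 < s + lifetime);
            // An empty snapshot yields no row: there is nothing to aggregate.
            if (live > 0) result.Add((t0, t1, live));
        }
        return result;
    }

    public static void Main()
    {
        long[] startsMs = { 500, 1000, 1500 }; // assumed arrival times in ms
        foreach (var (start, end, count) in Snapshots(startsMs, 3000))
            Console.WriteLine($"[{start}, {end}) -> {count}");
    }
}
```

Running it prints the snapshot counts 1, 2, 3, 2, 1, with the last snapshot ending at 4500 ms and nothing after it, which matches the shape of the output above.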

peterfreiling commented 3 years ago

Generally, Trill will not output anything when there are no events. However, the previous result has an expiration time, so the absence of a following event is meaningful as well: once the last result expires with nothing after it, the count is effectively 0. You can also ingress dummy events (one per window) to force an output. Further, Deaccumulate is called on end edges, but intervals are handled with Difference. I'd recommend using a snapshot window macro (e.g., TumblingWindowLifetime).
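One way to read the dummy-event suggestion, sketched as a plain C# model rather than Trill code: add one dummy event per tumbling window so that no window is empty, count, then subtract the dummy. The window size (1000 ms), the horizon (4000 ms) and the subtract-one step are illustrative assumptions; in Trill the windowing itself would come from a macro such as TumblingWindowLifetime.

```csharp
using System;
using System.Linq;

public static class DummyEventModel
{
    // Counts real events per tumbling window [w*windowSize, (w+1)*windowSize)
    // for every window that starts before `horizon`.
    // One dummy event is injected at the start of each window so an otherwise
    // empty window still produces a row; the dummy is subtracted afterwards.
    public static long[] CountsPerWindow(long[] eventTimes, long windowSize, long horizon)
    {
        int windows = (int)((horizon + windowSize - 1) / windowSize);
        var dummies = Enumerable.Range(0, windows).Select(w => (long)w * windowSize);
        var all = eventTimes.Concat(dummies);
        return Enumerable.Range(0, windows)
            .Select(w => all.LongCount(t => t / windowSize == w) - 1)
            .ToArray();
    }

    public static void Main()
    {
        long[] times = { 500, 1000, 1500 }; // assumed arrival times in ms
        Console.WriteLine(string.Join(", ", CountsPerWindow(times, 1000, 4000)));
    }
}
```

For these events it prints `1, 2, 0, 0`: the two trailing windows contain only their dummy, so they report 0 instead of staying silent.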

wassim-k commented 3 years ago

For future reference, I was able to solve this using the following:

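stream
    .Count()
    .Multicast(count => count                // share one Count() result between both branches
        .Union(count
            .Select(i => 0ul)                // every count interval now carries 0
            .Stitch()                        // merge abutting intervals with equal payloads
            .PointAtEnd()));                 // emit a point where the merged interval ends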
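The workaround relies on three operators. Select(i => 0ul) turns every count interval into an interval carrying 0; because consecutive count intervals abut, Stitch merges them into one interval covering the whole period during which some event was live; PointAtEnd then emits a single point where that merged interval ends, which is exactly when the last event leaves the window, and Union adds it to the original counts. The plain C# model below mimics that pipeline. It is not Trill's implementation: the Interval record and the helper names are hypothetical, and the input intervals are taken from the output in the question, assuming 500 ms spacing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StitchModel
{
    // Hypothetical stand-in for an interval event: Value is valid over [Start, End).
    public record Interval(long Start, long End, ulong Value);

    // Mirrors Stitch(): merge events with equal payloads whose intervals touch end-to-start.
    public static List<Interval> Stitch(IEnumerable<Interval> events)
    {
        var merged = new List<Interval>();
        foreach (var e in events.OrderBy(ev => ev.Start))
        {
            if (merged.Count > 0)
            {
                var last = merged[merged.Count - 1];
                if (last.Value == e.Value && last.End == e.Start)
                {
                    merged[merged.Count - 1] = last with { End = e.End };
                    continue;
                }
            }
            merged.Add(e);
        }
        return merged;
    }

    // Count stream Union (Select(0) -> Stitch -> PointAtEnd), as (time, value) pairs.
    public static List<(long Time, ulong Value)> WithTrailingZero(List<Interval> counts)
    {
        // One zero-valued point at the end of each merged (busy) period.
        var zeros = Stitch(counts.Select(c => c with { Value = 0UL }))
            .Select(z => (Time: z.End, Value: 0UL));
        // The original counts, keyed by their start time, plus the zero points.
        return counts.Select(c => (Time: c.Start, Value: c.Value))
                     .Concat(zeros)
                     .OrderBy(p => p.Time)
                     .ToList();
    }

    public static void Main()
    {
        // Count intervals matching the question's output (assumed times in ms).
        var counts = new List<Interval>
        {
            new Interval(500, 1000, 1), new Interval(1000, 1500, 2),
            new Interval(1500, 3500, 3), new Interval(3500, 4000, 2),
            new Interval(4000, 4500, 1),
        };
        foreach (var (time, value) in WithTrailingZero(counts))
            Console.WriteLine($"{time}: {value}");
    }
}
```

With these five intervals it prints the counts at 500 through 4000 followed by `4500: 0`. If the input ever has a gap with no live events, the count intervals on either side no longer abut, so Stitch yields one merged interval per busy period and a 0 is emitted at the end of each.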