microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

Output on aggregations and partition reset #152

Open octmar opened 3 years ago


Hello, I need help with another scenario involving partitioned streams. I have some devices that raise events into a partitioned ingress stream, and I have defined an aggregate output query (a sum). My issue is that output is generated only from the second pushed event onward, even though that output clearly corresponds to the previous input. Example:

- Input A {t1, value = 2}: no output
- Input B {t2, value = 2}: Sum output {value = 2, t = t1 in my case (the timestamp depends on how the event times are manipulated)}

Code:

    using System;
    using System.Reactive.Linq;
    using Microsoft.StreamProcessing;

    public sealed class Program
    {
        // Raised by the host to simulate devices pushing events.
        private static event Action<IndexChangeEvent> PushIndexChangeEvent;

        public struct IndexChangeEvent
        {
            public string IndicatorId; // used as the partition key
            public string IndexId;
            public DateTime Date;      // used as the event start time
            public int Value;
        }

        // Prints one output event; T is an anonymous type with Name and Value properties.
        private static void Process<T>(long s, long e, string key, T data)
        {
            var indexName = data.GetType().GetProperty("Name").GetValue(data).ToString();
            var indexValue = data.GetType().GetProperty("Value").GetValue(data).ToString();
            Console.WriteLine($"Key: {key} Start: {new DateTime(s).ToString("dd/MM/yyyy HH:mm:ss.fffffffZ")} End: {new DateTime(e).ToString("dd/MM/yyyy HH:mm:ss.fffffffZ")} Name: {indexName} Value: {indexValue}");
        }

        public static void Main(string[] args)
        {
            Config.ClearColumnsOnReturn = true;
            var observable = Observable.FromEvent<IndexChangeEvent>(
                onNext => PushIndexChangeEvent += onNext,
                onNext => PushIndexChangeEvent -= onNext);

            var container = new QueryContainer();

            // Each event becomes a point event in the partition keyed by IndicatorId.
            var ingressStream = container.RegisterInput(
                streamEvents: observable
                    .Synchronize()
                    .Select(e => PartitionedStreamEvent.CreatePoint(e.IndicatorId, e.Date.Ticks, e)),
                flushPolicy: PartitionedFlushPolicy.None,
                onCompletedPolicy: OnCompletedPolicy.None,
                disorderPolicy: DisorderPolicy.Drop());

            var ingressStreamMulti = ingressStream.Multicast(2);

            // Query 1: a plain projection; emits one output per input.
            var queryInput = ingressStreamMulti[0]
                .Select(e => new { Name = $"{e.IndexId}.{e.Value}", Value = e.Date.ToString("dd/MM/yyyy HH:mm:ss.fffffffZ") });

            var egressStreamInput = container
                .RegisterOutput(queryInput, identifier: "queryInput")
                .ForEachAsync(e => Process(e.StartTime, e.EndTime, e.PartitionKey, e.Payload));

            // Query 2: running sum per partition (events extended to infinity, summed,
            // then clipped back to a 1-tick duration); first output appears only after the second input.
            var queryQtyProduced = ingressStreamMulti[1]
                .AlterEventDuration(PartitionedStreamEvent.InfinitySyncTime)
                .Sum(e => e.Value)
                .Select(e => new { Name = "QtyProduced", Value = e.ToString() })
                .AlterEventDuration(TimeSpan.FromTicks(1).Ticks);

            var egressStreamQtyProduced = container
                .RegisterOutput(queryQtyProduced, identifier: "queryQtyProduced")
                .ForEachAsync(e => Process(e.StartTime, e.EndTime, e.PartitionKey, e.Payload));

            var pipe = container.Restore();

            // Push three events, one second apart, all in partition "1", flushing after each.
            var data = DateTime.Now;
            var eve11 = new IndexChangeEvent { IndicatorId = "1", IndexId = "1", Date = data, Value = 2 };
            PushIndexChangeEvent?.Invoke(eve11);
            pipe.Flush();
            var eve12 = new IndexChangeEvent { IndicatorId = "1", IndexId = "1", Date = data.AddSeconds(1), Value = 2 };
            PushIndexChangeEvent?.Invoke(eve12);
            pipe.Flush();
            var eve13 = new IndexChangeEvent { IndicatorId = "1", IndexId = "1", Date = data.AddSeconds(2), Value = 2 };
            PushIndexChangeEvent?.Invoke(eve13);
            pipe.Flush();

            Console.ReadLine();
        }
    }

As this code shows, the first query produces output for every input, while the second produces output only from the second input event onward. In another piece of code, where I don't use partitioned streams, the same query produces output for every input.

I also have another question about partitioned streams: is it possible to reset all accumulations for a specific partition? In some scenarios I want to reset the data for one partition without reinitializing the stream, so that the other partitions are not affected. Thank you!
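To pin down the behavior being asked for, here is a minimal plain-C# model of the desired semantics, with no Trill involved: a per-partition running sum that emits a result on every event (including the first), plus a `Reset(key)` that clears one partition's accumulation while leaving the others untouched. The class `PartitionedRunningSum` and its members are hypothetical illustrations, not Trill APIs; the sketch only describes the expected outputs, not how Trill would produce them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the desired semantics; NOT a Trill API.
public sealed class PartitionedRunningSum
{
    // Accumulated sum per partition key (e.g., IndicatorId).
    private readonly Dictionary<string, int> sums = new Dictionary<string, int>();

    // Adds a value to the partition's sum and returns the new total immediately,
    // i.e., one output per input, including the very first event.
    public int Add(string key, int value)
    {
        sums.TryGetValue(key, out var current); // current is 0 if the key is absent
        var updated = current + value;
        sums[key] = updated;
        return updated;
    }

    // Clears the accumulation for one partition only; other partitions keep their state.
    public void Reset(string key) => sums.Remove(key);

    // Current sum for a partition (0 if it has no events since creation or the last reset).
    public int Get(string key) => sums.TryGetValue(key, out var s) ? s : 0;
}

public static class Demo
{
    public static void Main()
    {
        var agg = new PartitionedRunningSum();
        Console.WriteLine(agg.Add("1", 2)); // 2 (output on the first event)
        Console.WriteLine(agg.Add("1", 2)); // 4
        Console.WriteLine(agg.Add("2", 5)); // 5 (separate partition)
        agg.Reset("1");                     // reset partition "1" only
        Console.WriteLine(agg.Add("1", 2)); // 2
        Console.WriteLine(agg.Get("2"));    // 5 (unaffected by the reset)
    }
}
```

With the three events from the repro (value 2 each, partition "1"), this model would emit 2, 4 and 6, one result per input.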