Closed rysavy-ondrej closed 6 months ago
The principle is demonstrated in the example below. Note that the scheduler's initial time needs to be set before the pipeline is executed; otherwise, it generates empty windows from the zero time up to the first flow timestamp:
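using System;
using System.Reactive;              // Timestamped<T>
using System.Reactive.Concurrency;  // HistoricalScheduler
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;             // Thread.Sleep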
var random = new Random();
string GetRandomIp()
{
    // The upper bound of Random.Next is exclusive, so 256 allows octets up to 255
    return $"{random.Next(0, 256)}.{random.Next(0, 256)}.{random.Next(0, 256)}.{random.Next(0, 256)}";
}
public record IpFlow(int Id, string src, string dst, int srcpt, int dstpt);
var flowId = 0;
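Timestamped<IpFlow> GetNextFlow()
{
    // Stamp each generated flow with the current wall-clock time
    var now = DateTimeOffset.Now;
    // Random source port in 1024..65535 and destination port in 10..511
    var flow = new IpFlow(++flowId, GetRandomIp(), GetRandomIp(), random.Next(1024, 65536), random.Next(10, 512));
    return new Timestamped<IpFlow>(flow, now);
}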
// Create a virtual time scheduler
var virtualScheduler = new HistoricalScheduler();
virtualScheduler.AdvanceTo(DateTimeOffset.Now);
// Create a subject to act as our event source
var subject = new Subject<Timestamped<IpFlow>>();
// Define the window duration in virtual time units
var windowDuration = TimeSpan.FromSeconds(3);
var windowShift = TimeSpan.FromSeconds(1);
// Apply the Window operator
var windowedObservable = subject
    .Window(windowDuration, windowShift, virtualScheduler)
    .SelectMany(window => window.ToList(), (window, items) => items);
// Subscribe to the windowed observable
windowedObservable.Subscribe(window =>
{
    Console.WriteLine($"New Window: {string.Join(", ", window)}");
});
Console.WriteLine(virtualScheduler.Now);
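for (int i = 0; i < 100; i++)
{
    var flow = GetNextFlow();
    // Move the virtual clock to the flow's timestamp before publishing the flow
    virtualScheduler.AdvanceTo(flow.Timestamp);
    subject.OnNext(flow);
    // Simulate an irregular arrival rate
    Thread.Sleep(TimeSpan.FromMilliseconds(random.Next(100, 1000)));
}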
subject.OnCompleted();
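To make the note about the initial time more concrete, here is a minimal sketch that is not taken from the commit. It assumes a single event that arrives 9 virtual seconds after a hypothetical start time and tumbling 2-second windows; the helper `WindowSizes`, the timestamps, and the window length are illustrative assumptions only. It compares a scheduler whose clock starts before the first event with one whose clock starts at the first event.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper: returns the number of items in each window when a single
// event is published at `firstEvent` and the virtual clock starts at `initialClock`.
List<int> WindowSizes(DateTimeOffset initialClock, DateTimeOffset firstEvent)
{
    var scheduler = new HistoricalScheduler(initialClock);
    var subject = new Subject<int>();
    var sizes = new List<int>();

    subject
        .Window(TimeSpan.FromSeconds(2), scheduler) // tumbling 2-second windows
        .SelectMany(w => w.Count())                 // one count per closed window
        .Subscribe(sizes.Add);

    scheduler.AdvanceTo(firstEvent); // closes every window that ended before the event
    subject.OnNext(1);
    subject.OnCompleted();           // closes the last, still open window
    return sizes;
}

var first = new DateTimeOffset(2023, 1, 1, 12, 0, 9, TimeSpan.Zero);

// Clock starts 9 seconds before the first event: four empty windows come first
var lagging = WindowSizes(first.AddSeconds(-9), first);
// Clock starts at the first event: only the window holding the event
var aligned = WindowSizes(first, first);

Console.WriteLine(string.Join(", ", lagging)); // 0, 0, 0, 0, 1
Console.WriteLine(string.Join(", ", aligned)); // 1
```

With a default-constructed scheduler, the gap between the clock and the first flow is far larger than 9 seconds, so the number of empty windows grows accordingly.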
Implemented and committed in https://github.com/rysavy-ondrej/ethanol/commit/cb63a656b319c47b8e8e467bea4e731c03938007.
Use virtual time for handling timestamps in flows.
Reactive Extensions (Rx) lets us use a virtual-time scheduler to deal with virtual time. In our case, the virtual time can be derived directly from the flows' timestamps. With this scheduler, we can use the built-in delay, window, and other time-related operators, which should perform better and be safer than handling the timestamps by hand.
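As a hedged illustration of deriving virtual time from the flows' own timestamps, the following sketch replays a few recorded timestamps through the built-in Buffer operator without any real waiting. The recorded values, the use of plain int ids in place of flow records, and the 2-second buffer length are assumptions made for the example and are not part of the ethanol code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical recorded flows (id plus timestamp), ordered by time
var t0 = new DateTimeOffset(2023, 1, 1, 12, 0, 0, TimeSpan.Zero);
var recorded = new[]
{
    new Timestamped<int>(1, t0.AddSeconds(0.5)),
    new Timestamped<int>(2, t0.AddSeconds(1.5)),
    new Timestamped<int>(3, t0.AddSeconds(2.5)),
    new Timestamped<int>(4, t0.AddSeconds(4.5)),
};

// Start the virtual clock at the beginning of the recording
var scheduler = new HistoricalScheduler(t0);
var subject = new Subject<Timestamped<int>>();
var counts = new List<int>();

// Count the flows in each 2-second buffer, measured in virtual time
subject
    .Buffer(TimeSpan.FromSeconds(2), scheduler)
    .Subscribe(batch => counts.Add(batch.Count));

foreach (var flow in recorded)
{
    // The flow's timestamp drives the clock; AdvanceTo throws if time goes backwards
    scheduler.AdvanceTo(flow.Timestamp);
    subject.OnNext(flow);
}
subject.OnCompleted(); // flushes the last, partially filled buffer

Console.WriteLine(string.Join(", ", counts)); // 2, 1, 1
```

Because the clock only moves when a flow is published, the replay finishes immediately regardless of the time span covered by the recording. Input that is not sorted by timestamp would need to be ordered first.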
See the following for more information:
https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242963(v=vs.103)
https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242967(v=vs.103)