microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

NullReferenceException when using checkpointing #159

Closed wassim-k closed 3 years ago

wassim-k commented 3 years ago

Hi,

I'm a long-time user of Trill and I love it, so thanks for the amazing work. I recently needed to implement checkpointing to achieve high availability in our application, but whenever I run it I get a NullReferenceException. Here's a minimal reproduction of the issue:

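using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.StreamProcessing;

public static class Program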
{
    public static async Task Main()
    {
        Config.ForceRowBasedExecution = true;

        var container = new QueryContainer();

        var punctuationInterval = TimeSpan.FromMilliseconds(100);

        var input = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(i => new { Index = i });
        var ingress = container.RegisterAtemporalInput(input, TimelinePolicy.WallClock(punctuationInterval));

        container.RegisterOutput(ingress)
            .Where(e => e.IsData)
            .Select(e => e.Payload)
            .Subscribe(payload => Console.WriteLine(payload.Index));

        var process = container.Restore();

        await Observable.Interval(TimeSpan.FromSeconds(2))
            .ForEachAsync(_ =>
            {
                using var stream = new MemoryStream();
                process.Checkpoint(stream);
                Console.WriteLine("CHECKPOINT");
            });
    }
}

Once it runs, it always fails within 5 to 10 seconds. It looks like some sort of race condition, because it fails at different times and on different lines of code. Please advise on what I might be doing wrong. Thanks.

peterfreiling commented 3 years ago

Hi @wassim-k, Trill does not support concurrent operations on the same query container. In this repro, it looks like you have both Checkpoint and Processing operations scheduled concurrently. Please add a synchronization mechanism so only one operation can be executed at any given point.

wassim-k commented 3 years ago

Thanks @peterfreiling for the quick response. I have rewritten the above code with synchronization, for the benefit of others:

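using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.StreamProcessing;

public class Payload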
{
    public long Index { get; set; }
}

public static class Program
{
    private static readonly object _sync = new object();

    public static async Task Main()
    {
        Config.ForceRowBasedExecution = true;

        var container = new QueryContainer();

        var punctuationInterval = TimeSpan.FromMilliseconds(100);

        var input = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(i => new Payload { Index = i });

        var inputSync = Observable.Merge(
            input
                .Synchronize(_sync)
                .Select(b => StreamEvent.CreateStart(DateTimeOffset.UtcNow.Ticks, b)),
            Observable
                .Interval(punctuationInterval)
                .Synchronize(_sync)
                .Select(_ => StreamEvent.CreatePunctuation<Payload>(DateTimeOffset.UtcNow.Ticks)));

        var ingress = container.RegisterInput(inputSync);

        container.RegisterOutput(ingress)
            .Where(e => e.IsData)
            .Select(e => e.Payload)
            .Subscribe(payload => Console.WriteLine(payload.Index));

        var process = container.Restore();

        await Observable.Interval(TimeSpan.FromSeconds(2))
            .Synchronize(_sync)
            .ForEachAsync(_ =>
            {
                using var stream = new MemoryStream();
                process.Checkpoint(stream);
                Console.WriteLine("CHECKPOINT");
            });
    }
}
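
For anyone who wants to see the shared-gate idea on its own, here is a minimal sketch that uses no Trill or Rx at all. `SingleThreadedEngine` and `GateDemo` are hypothetical names invented for this illustration; the engine only stands in for a component that must not be called concurrently, and it says nothing about how Trill is implemented internally. The point is just that every caller takes the same lock object before touching the engine, which is what passing one `_sync` gate to every `Synchronize` call is meant to achieve in the code above.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a component that is not safe to call
// from two threads at once. It records whether two calls ever overlapped.
public sealed class SingleThreadedEngine
{
    private int _active; // number of callers currently inside the engine

    public int Processed { get; private set; }
    public int Checkpoints { get; private set; }
    public bool OverlapDetected { get; private set; }

    public void Process() { Enter(); Processed++; Leave(); }

    public void Checkpoint() { Enter(); Checkpoints++; Leave(); }

    private void Enter()
    {
        // If another caller is already inside, the calls overlapped.
        if (Interlocked.Increment(ref _active) != 1) OverlapDetected = true;
        Thread.SpinWait(1000); // widen the window in which an overlap could occur
    }

    private void Leave() => Interlocked.Decrement(ref _active);
}

public static class GateDemo
{
    public static SingleThreadedEngine Run(int events, int checkpoints)
    {
        var gate = new object(); // the single lock shared by every caller
        var engine = new SingleThreadedEngine();

        // One task plays the role of event ingress...
        var producer = Task.Run(() =>
        {
            for (var i = 0; i < events; i++)
                lock (gate) engine.Process();
        });

        // ...and another plays the role of the periodic checkpoint.
        var checkpointer = Task.Run(() =>
        {
            for (var i = 0; i < checkpoints; i++)
                lock (gate) engine.Checkpoint();
        });

        Task.WaitAll(producer, checkpointer);
        return engine;
    }
}
```

Calling `GateDemo.Run(1000, 100)` and then checking `OverlapDetected` should show that the two tasks never entered the engine at the same time. Removing either `lock (gate)` makes overlaps possible, which is the same class of problem as the original repro.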