nats-io / nats.net.v1

The official C# Client for NATS
Apache License 2.0

NullReferenceException while trying to fetch messages from server #789

Closed: robertmircea closed this issue 1 year ago

robertmircea commented 1 year ago

Description

I encountered a NullReferenceException in NATS.Client.JetStream.PullMessageManager.Manage(Msg msg) while running a NATS subscriber application with the NATS client library (version 1.0.4). The NATS server is cleanly shut down, and the consumer's next attempt to fetch a message from the server fails with this exception.

Environment

NATS Server Version: 2.9.18
NATS Client Library Version: 1.0.4
OS: macOS Ventura 13.4

Steps to Reproduce

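1. Start the NATS server.
2. Run the subscriber program below.
3. While it is consuming messages, cleanly shut down the server.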

using NATS.Client;
using NATS.Client.JetStream;

const string subjectName = "dev.audit.>";
const string streamName = "dev-auditevents";

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the loop can exit and print the summary
    cts.Cancel();
};

var opts = GetNatsOpts();
var connectionFactory = new ConnectionFactory();
using var connection = connectionFactory.CreateConnection(opts);
var jetStreamManagementContext = connection.CreateJetStreamManagementContext();
var jetStreamContext = connection.CreateJetStreamContext();

var cc = ConsumerConfiguration.Builder()
    // The name of the consumer, which the server tracks so consumption can resume where it
    // left off. A consumer is ephemeral by default; setting a durable name makes it durable.
    .WithDurable(Guid.NewGuid().ToString("N"))
    .WithMaxAckPending(200)
    .WithAckWait(3 * 30000) // milliseconds, i.e. 90 s
    .WithMaxDeliver(3)
    .WithReplayPolicy(ReplayPolicy.Instant)
    .WithFilterSubject(subjectName)
    .WithDeliverPolicy(DeliverPolicy.All)
    .Build();
jetStreamManagementContext.AddOrUpdateConsumer(streamName, cc);

Console.WriteLine("Starting subscriber...");
var psob = PullSubscribeOptions.Builder()
    .WithConfiguration(cc);
using var sub = jetStreamContext.PullSubscribe(subjectName, psob.Build());
var consumed = 0;
while (!cts.IsCancellationRequested)
{
    ConsumedPullExpiresIn(sub, ref consumed);
}

Console.WriteLine("Consuming finished. # of consumed messages: {0}", consumed);

void ConsumedPullExpiresIn(IJetStreamPullSubscription jetStreamPullSubscription, ref int i)
{
    // Request a batch of up to 100 messages; the pull expires after 500 ms.
    jetStreamPullSubscription.PullExpiresIn(100, 500);
    try
    {
        for (Msg msg = jetStreamPullSubscription.NextMessage(1050); msg != null; msg = jetStreamPullSubscription.NextMessage(10))
        {
            if (!msg.IsJetStream) continue; // skip anything that is not a JetStream message
            Console.WriteLine("Consumed: " + i);
            Thread.Sleep(50);
            msg.Ack();
            i++;
        }
    }
    catch (NATSTimeoutException)
    {
        // No (more) messages arrived in time; the caller issues the next pull.
    }
}

Options GetNatsOpts()
{
    var opts = ConnectionFactory.GetDefaultOptions();
    opts.AllowReconnect = true;
    opts.User = "user";
    opts.Password = "pass";
    opts.Servers = new[] { "nats://localhost:4224" };

    opts.ServerDiscoveredEventHandler += (_, _) => Console.WriteLine("NATS server discovered");
    opts.ClosedEventHandler +=
        (_, args) => Console.WriteLine("NATS connection closed: " + args.Error);
    opts.DisconnectedEventHandler += (_, args) =>
        Console.WriteLine("NATS connection disconnected: " + args.Error);
    // args.Subscription can be null for connection-level errors, hence the ?.
    opts.AsyncErrorEventHandler += (_, args) =>
        Console.WriteLine("NATS async error: Url={0}, Error={1}, Subject={2}",
            args.Conn.ConnectedUrl, args.Error, args.Subscription?.Subject);

    return opts;
}

Expected Behavior

The subscriber application should handle the server shutdown gracefully, without throwing an exception, and continue execution when the server is back online.

Actual Behavior

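After the NATS server is shut down, the subscriber logs that the connection was disconnected and, after a period of time, closed. The next call to NextMessage then throws an unhandled NullReferenceException from PullMessageManager.Manage (see the output below).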

Output

Starting subscriber...
Consumed: 0
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Consumed: 7
[.........]
Consumed: 22
Consumed: 23
Consumed: 24
Consumed: 25
Consumed: 26
Consumed: 27
Consumed: 28
NATS connection disconnected: 
Consumed: 29
Consumed: 30
Consumed: 31
[.........]
Consumed: 96
Consumed: 97
Consumed: 98
Consumed: 99
Unhandled exception. NATS connection disconnected: System.AggregateException: One or more errors occurred. (Connection refused)
 ---> System.Net.Sockets.SocketException (61): Connection refused
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
   at System.Threading.Tasks.ValueTask.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location ---
   at System.Net.Sockets.TcpClient.CompleteConnectAsync(Task task)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait(TimeSpan timeout)
   at NATS.Client.Connection.TCPConnection.open(Srv s, Int32 timeoutMillis)
   at NATS.Client.Connection.createConn(Srv s, Exception& ex)
NATS connection closed: System.AggregateException: One or more errors occurred. (Connection refused)
 ---> System.Net.Sockets.SocketException (61): Connection refused
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
   at System.Threading.Tasks.ValueTask.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location ---
   at System.Net.Sockets.TcpClient.CompleteConnectAsync(Task task)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait(TimeSpan timeout)
   at NATS.Client.Connection.TCPConnection.open(Srv s, Int32 timeoutMillis)
   at NATS.Client.Connection.createConn(Srv s, Exception& ex)
System.NullReferenceException: Object reference not set to an instance of an object.
   at NATS.Client.JetStream.PullMessageManager.Manage(Msg msg)
   at NATS.Client.JetStream.JetStreamAbstractSyncSubscription.NextMessage(Int32 timeout)
   at Program.<>c__DisplayClass0_0.<<Main>$>g__ConsumedPullExpiresIn|3(IJetStreamPullSubscription jetStreamPullSubscription, Int32& i) in /Users/robert/Sandbox/NatsJetstreamPerfTest/Program.cs:line 97
   at Program.<Main>$(String[] args) in /Users/robert/Sandbox/NatsJetstreamPerfTest/Program.cs:line 39

Additional Information

scottf commented 1 year ago

I think I fixed this in the upcoming, not yet released version, which should be out this week. If you like, I could build a pre-release today for you to try.

robertmircea commented 1 year ago

Yes, I would like to try. Thanks.

scottf commented 1 year ago

Yes, this was addressed here; I had found the same problem in testing:

https://github.com/nats-io/nats.net/pull/782/files#diff-f6cff168ed35aa4596c4f2fadd5ab43b37e07bf089c30f7f6ca03bd5f33d1e0eR112

So the NRE happened because this method used to return null; now it throws an exception instead.

I suppose the only point for discussion is whether NATSTimeoutException is the right exception or whether it should be something else. My thinking was that the connection is gone and no message is coming, so a timeout seems fair.

I'm actually trying to get a release out, so please be patient; it should be out today or tomorrow.
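Because NextMessage now reports a lost connection with the same NATSTimeoutException it uses when a pull simply runs dry, a caller that wants to tell the two cases apart could check the connection state in the catch block. The following is a sketch of one way to do that, not a pattern recommended by the library. It assumes the IConnection.State property and the ConnState enum from NATS.Client. The PullCycle class and the one-second back-off are hypothetical.

```csharp
using System;
using System.Threading;
using NATS.Client;
using NATS.Client.JetStream;

// Hypothetical helper: one pull cycle in the style of ConsumedPullExpiresIn from the repro.
public static class PullCycle
{
    // Returns the number of messages acknowledged during this cycle.
    public static int Run(IConnection connection, IJetStreamPullSubscription sub)
    {
        int acked = 0;
        sub.PullExpiresIn(100, 500); // up to 100 messages, pull expires after 500 ms
        try
        {
            for (Msg msg = sub.NextMessage(1050); msg != null; msg = sub.NextMessage(10))
            {
                if (!msg.IsJetStream) continue;
                msg.Ack();
                acked++;
            }
        }
        catch (NATSTimeoutException)
        {
            // Per this thread, from 1.0.5 on this is thrown both when the pull runs dry
            // and when the connection has gone away, so inspect the connection state.
            if (connection.State != ConnState.CONNECTED)
            {
                Console.WriteLine($"Pull timed out while connection is {connection.State}; backing off");
                Thread.Sleep(1000); // hypothetical back-off while the client reconnects
            }
            // Otherwise the pull expired before the batch was filled, which is normal.
        }
        return acked;
    }
}
```

In the repro, the loop body could become `consumed += PullCycle.Run(connection, sub);`. A connection that has been closed permanently is not handled here. In that case PullExpiresIn would most likely throw rather than time out.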

scottf commented 1 year ago

1.0.5 has been released

robertmircea commented 1 year ago

I've upgraded the solution to the new NuGet package and repeated the scenario: I started consuming and, at some point during consumption, stopped the server. This time I didn't get the NRE, and when I restarted the server the client resumed processing messages. After some time, however, the client started printing the following warnings (I call them warnings because they do not crash the process):

PullStatusWarning, Connection: 243, Subscription: 2, ConsumerName:f79986685fe44655b98bffb08a872eba, Status: Status 408 Request Timeout
PullStatusWarning, Connection: 243, Subscription: 2, ConsumerName:f79986685fe44655b98bffb08a872eba, Status: Status 408 Request Timeout
PullStatusWarning, Connection: 243, Subscription: 2, ConsumerName:f79986685fe44655b98bffb08a872eba, Status: Status 408 Request Timeout

I put the NATS server in debug/verbose mode, and I see that the message above is generated when the client performs jetStreamPullSubscription.NextMessage:

[8700] 2023/06/21 11:37:03.350429 [TRC] [::1]:50748 - cid:243 - <<- [PUB $JS.API.CONSUMER.MSG.NEXT.dev-auditevents.f79986685fe44655b98bffb08a872eba _INBOX.hngpn6EaoOdomEUi-vYfk9.10111 33]
[8700] 2023/06/21 11:37:03.350431 [TRC] [::1]:50748 - cid:243 - <<- MSG_PAYLOAD: ["{\"batch\":100,\"expires\":500000000}"]

Not sure how to interpret this.
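One way to read the trace: the PUB to $JS.API.CONSUMER.MSG.NEXT.dev-auditevents.<consumer> matches the pull request made by PullExpiresIn(100, 500) in the repro. JetStream encodes the request's expires field as a duration in nanoseconds, so 500 ms appears as 500000000. The hypothetical helper below is not part of NATS.Client and is not how the client builds the request internally. It only rebuilds that body to show the conversion.

```csharp
using System;

// Prints the pull request body corresponding to PullExpiresIn(100, 500).
Console.WriteLine(PullRequestMath.Body(batch: 100, expiresMillis: 500));
// prints {"batch":100,"expires":500000000}

// Hypothetical helper, not part of NATS.Client.
static class PullRequestMath
{
    // "expires" is a duration in nanoseconds: 1 ms = 1,000,000 ns, so 500 ms -> 500,000,000 ns.
    public static long ExpiresNanos(int expiresMillis) => expiresMillis * 1_000_000L;

    // Builds the JSON body in the same shape as the MSG_PAYLOAD in the trace.
    public static string Body(int batch, int expiresMillis) =>
        $"{{\"batch\":{batch},\"expires\":{ExpiresNanos(expiresMillis)}}}";
}
```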

scottf commented 1 year ago

Those are just normal warnings for a pull. Consider your pull:

jetStreamPullSubscription.PullExpiresIn(100, 500);

The 408 means that the server could not deliver 100 messages within the 500 ms expiration period, so the pull completed without filling the batch. Since you already get a timeout, I'm considering offering an option on the pull request to turn these warnings off. You can also supply your own PullStatusWarningEventHandler that does not show that particular warning.
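Here is a minimal sketch of the last suggestion. It assumes that in NATS.Client 1.0.x, Options.PullStatusWarningEventHandler is a delegate field of type EventHandler<StatusEventArgs>, and that args.Status exposes the server status with an integer Code. The handler is assigned with = rather than +=, so it replaces the default handler that prints every warning instead of adding to it.

```csharp
using System;
using NATS.Client;

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Url = "nats://localhost:4224";

// Replace the default pull-status warning handler with one that ignores 408.
opts.PullStatusWarningEventHandler = (sender, args) =>
{
    if (args.Status.Code == 408)
        return; // the pull expired before the batch was filled: expected, not a problem
    Console.WriteLine($"PullStatusWarning: {args.Status}");
};
```

The resulting options would then be passed to ConnectionFactory.CreateConnection, as GetNatsOpts() is used in the repro.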