nats-io / nats.net.v2

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net.v2/
Apache License 2.0

Jetstream throws debug messages after resuming from Hibernation. #544

Open alberk8 opened 4 days ago

alberk8 commented 4 days ago

Observed behavior

After the machine resumes from hibernation (Windows 10) while a JetStream consumer is running, I sometimes get the following debug messages. I cannot reproduce this reliably. The code that got triggered is here.

The NATS server and the Windows 10 client run on different machines on the same LAN.

[09:49:10.298  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:49:40.298  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:50:10.301  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:50:40.300  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:51:10.312  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns 

Expected behavior

Continue to work.

Server and client version

NATS server v2.10.16, NATS client v2.2.3

Host environment

NATS server running on Ubuntu Linux 20.04, up 24 hours a day.

Steps to reproduce

No response

mtmk commented 4 days ago

So when it comes back, the consume method isn't yielding any more messages?

Not sure how to replicate this. I can maybe suspend the process and see what happens.

alberk8 commented 3 days ago

> So when it comes back, the consume method isn't yielding any more messages?
>
> Not sure how to replicate this. I can maybe suspend the process and see what happens.

Yes, no more messages.

Sample Code

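 using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Logging;
 using NATS.Client.Core;
 using NATS.Client.JetStream;
 using NATS.Client.JetStream.Models;

 // DataPayload is the application's own message type (not shown in this sample).
 public class NatsHostedConsume : IHostedService, IAsyncDisposable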
 {
     private CancellationTokenSource? _cts;
     private Task? _task;
     private NatsConnection? _natsConnection;
     private NatsJSContext? _natsJSContext;
     private INatsJSStream? _natsJSStream;

     private readonly IHostApplicationLifetime _lifetime;
     private readonly INatsConnection _inatsConnection;
     private readonly ILogger<NatsHostedConsume> _logger;

     public NatsHostedConsume
     (
          IHostApplicationLifetime lifetime,
          INatsConnection inatsConnection,
          ILogger<NatsHostedConsume> logger
     ) 
     {
         _lifetime = lifetime;
         _inatsConnection = inatsConnection;
         _logger = logger;
     }

     public async ValueTask DisposeAsync()
     {
         _cts?.Cancel();
         if (_task is not null)
         {
             await _task;
         }
     }

     public async Task StartAsync(CancellationToken cancellationToken)
     {
         _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

         _natsConnection = new NatsConnection(_inatsConnection.Opts);
         _natsJSContext = new NatsJSContext(_natsConnection);

         Console.WriteLine("Starting Hosted");

         await SetupStream(_natsJSContext);

         // Run the consume loop in the background so StartAsync can return.
         _task = Task.Run(() => ProcessDataAsync(_cts.Token));

         Console.WriteLine("Task Status: " + _task.Status);
     }

     public async Task StopAsync(CancellationToken cancellationToken)
     {
         _cts?.Cancel();
         if (_task is not null)
         {
             Console.WriteLine("Task Status: " + _task.Status);
             await _task;
         }
     }

     private async Task SetupStream(NatsJSContext natsJSContext)
     {
         var streamConfig = new StreamConfig("TestStream", new[] { "TestSubject" })
         {
             MaxAge = TimeSpan.FromDays(7),
             Compression = StreamConfigCompression.S2
         };

         try
         {
             _natsJSStream = await natsJSContext.UpdateStreamAsync(streamConfig);
         }
         catch (NatsJSApiException e) when (e.Error.Code is 404)
         {
             _natsJSStream = await natsJSContext.CreateStreamAsync(streamConfig);
         }
         catch (Exception ex)
         {
             _logger.LogDebug(ex.Message);
             throw;
         }
     }

     private async Task ProcessDataAsync(CancellationToken cancellationToken)
     {
         var consumerConfig = new ConsumerConfig("DataConsumer")
         {

             FilterSubject = "TestSubject",
             DeliverPolicy = ConsumerConfigDeliverPolicy.All,
             AckPolicy = ConsumerConfigAckPolicy.Explicit,
             AckWait = TimeSpan.FromSeconds(10),
         };

         var consumer = await _natsJSContext!.CreateOrUpdateConsumerAsync("TestStream", consumerConfig);

         while (!cancellationToken.IsCancellationRequested)
         {
             await consumer.RefreshAsync(cancellationToken);

             await foreach (NatsJSMsg<DataPayload> msg in consumer.ConsumeAsync<DataPayload>().WithCancellation(cancellationToken))
             {
                 if (msg.Data is null)
                 {
                     await msg.AckAsync();
                     continue;
                 }

                 try
                 {
                     // Do Processing
                     _logger.LogDebug("Process Data");

                     // Ack only after processing succeeds, so a message is never both nak'ed and ack'ed.
                     await msg.AckAsync();
                 }
                 catch (NatsNoRespondersException ex)
                 {
                     _logger.LogDebug($"Nats Error: {ex.Message}");
                     await msg.NakAsync();
                 }
                 catch (Exception ex)
                 {
                     _logger.LogDebug($"Exception Error: {ex.Message}");
                     await msg.NakAsync();
                 }
             }
             try
             {
                 await Task.Delay(1_000, cancellationToken);
             }
             catch (TaskCanceledException) { }
         }
     }
 }

mtmk commented 3 days ago

Are you seeing any exceptions? Could you wrap ProcessDataAsync() in a try/catch and log any exceptions that occur?
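One way to do this without touching the consume loop itself is a small wrapper around the background task. This is only a sketch: `ConsumeLoopGuard` and `RunLoggedAsync` are hypothetical names, not part of NATS.Net, and the `logError` callback stands in for whatever logger the application uses.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of NATS.Net): runs a background loop and
// reports any exception that would otherwise stay hidden inside the Task.
public static class ConsumeLoopGuard
{
    public static async Task RunLoggedAsync(
        Func<CancellationToken, Task> body,
        Action<Exception> logError,
        CancellationToken cancellationToken)
    {
        try
        {
            await body(cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected during shutdown: the token was cancelled, nothing to report.
        }
        catch (Exception ex)
        {
            // Anything else ended the loop unexpectedly: log it, then rethrow
            // so the task still completes as faulted.
            logError(ex);
            throw;
        }
    }
}
```

In the sample above it could be used as `_task = Task.Run(() => ConsumeLoopGuard.RunLoggedAsync(ProcessDataAsync, ex => _logger.LogError(ex, "ProcessDataAsync failed"), _cts.Token));`.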

alberk8 commented 3 days ago

No exceptions. I am adding a try/catch in ProcessDataAsync and will monitor and report back.
One other thing I noticed is that the log lines I reported above appear only in the console and are not written to the log file, while other debug logs from NATS are written to the file. I am using Serilog for logging.

mtmk commented 3 days ago

> No exceptions. I am adding a try/catch in ProcessDataAsync and will monitor and report back.

👍

> One other thing I noticed is that the log lines I reported above appear only in the console and are not written to the log file, while other debug logs from NATS are written to the file. I am using Serilog for logging.

Depends on how you're injecting NatsConnection. Have a look at the NATS.Extensions.Microsoft.DependencyInjection package.
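Since the sample creates its own NatsConnection from the injected connection's options, one thing to rule out is which logger factory that second connection ends up with. The sketch below passes the host's ILoggerFactory explicitly through NatsOpts.LoggerFactory. `ConsumerConnectionFactory` is a hypothetical class for illustration, and whether this changes where the heartbeat messages are written has not been verified.

```csharp
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;

// Hypothetical helper: builds the consumer's own connection with an explicit
// logger factory instead of relying on whatever the copied options contain.
public sealed class ConsumerConnectionFactory
{
    private readonly INatsConnection _injected;
    private readonly ILoggerFactory _loggerFactory;

    public ConsumerConnectionFactory(INatsConnection injected, ILoggerFactory loggerFactory)
    {
        _injected = injected;
        _loggerFactory = loggerFactory;
    }

    public NatsJSContext CreateContext()
    {
        // NatsOpts is a record, so `with` copies it and overrides only LoggerFactory.
        var opts = _injected.Opts with { LoggerFactory = _loggerFactory };

        // The caller owns this connection and should dispose it on shutdown.
        var connection = new NatsConnection(opts);
        return new NatsJSContext(connection);
    }
}
```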

alberk8 commented 3 days ago

I am already using that NuGet package.

mtmk commented 3 days ago

Is it maybe the order of your build statements, i.e. you add NATS first and then configure logging? Would that make a difference?