Open alberk8 opened 4 months ago
So when it comes back, the consume method isn't yielding anymore messages?
not sure how to replicate this. I can maybe suspend the process and see what happens.
So when it comes back, the consume method isn't yielding anymore messages?
not sure how to replicate this. I can maybe suspend the process and see what happens.
yes, no more message.
Sample Code
public class NatsHostedConsume : IHostedService, IAsyncDisposable
{
private CancellationTokenSource? _cts;
private Task? _task;
private NatsConnection? _natsConnection;
private NatsJSContext? _natsJSContext;
private INatsJSStream? _natsJSStream;
private readonly IHostApplicationLifetime _lifetime;
private readonly INatsConnection _inatsConnection;
private readonly ILogger<NatsHostedConsume> _logger;
public NatsHostedConsume
(
IHostApplicationLifetime lifetime,
INatsConnection inatsConnection,
ILogger<NatsHostedConsume> logger
)
{
_lifetime = lifetime;
_inatsConnection = inatsConnection;
_logger = logger;
}
public async ValueTask DisposeAsync()
{
_cts?.Cancel();
if (_task is not null)
{
await _task;
}
return;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_natsConnection = new NatsConnection(_inatsConnection.Opts);
_natsJSContext = new NatsJSContext(_natsConnection);
Console.WriteLine("Starting Hosted");
await SetupStream(_natsJSContext);
_task = Task.Run(async () => await ProcessDataAsync(_cts.Token));
Console.WriteLine("Task Status: " + _task!.Status);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_cts?.Cancel();
if (_task is not null)
{
Console.WriteLine("Task Status: " + _task!.Status);
await _task;
}
return;
}
private async Task SetupStream(NatsJSContext natsJSContext)
{
var streamConfig = new StreamConfig("TestStream", new[] { "TestSubject" })
{
MaxAge = TimeSpan.FromDays(7),
Compression = StreamConfigCompression.S2
};
try
{
_natsJSStream = await natsJSContext.UpdateStreamAsync(streamConfig);
}
catch (NatsJSApiException e) when (e.Error.Code is 404)
{
_natsJSStream = await natsJSContext.CreateStreamAsync(streamConfig);
}
catch (Exception ex)
{
_logger.LogDebug(ex.Message);
throw;
}
}
private async Task ProcessDataAsync(CancellationToken cancellationToken)
{
var consumerConfig = new ConsumerConfig("DataConsumer")
{
FilterSubject = "TestSubject",
DeliverPolicy = ConsumerConfigDeliverPolicy.All,
AckPolicy = ConsumerConfigAckPolicy.Explicit,
AckWait = TimeSpan.FromSeconds(10),
};
var consumer = await _natsJSContext!.CreateOrUpdateConsumerAsync("TestStream", consumerConfig);
while (!cancellationToken.IsCancellationRequested)
{
await consumer.RefreshAsync(cancellationToken);
await foreach (NatsJSMsg<DataPayload> msg in consumer.ConsumeAsync<DataPayload>().WithCancellation(cancellationToken))
{
if (msg.Data is null)
{
await msg.AckAsync();
continue;
}
try
{
// Do Processing
_logger.LogDebug("Process Data");
}
catch (NatsNoRespondersException ex)
{
_logger.LogDebug($"Nats Error: {ex.Message}");
await msg.NakAsync();
}
catch (Exception ex)
{
_logger.LogDebug($"Exception Error: {ex.Message}");
await msg.NakAsync();
}
await msg.AckAsync();
}
try
{
await Task.Delay(1_000, cancellationToken);
}
catch (TaskCanceledException) { }
}
}
}
are you seeing any exceptions? Could you wrap ProcessDataAsync()
in a try catch and log if there are any exceptions?
No exception. I am putting in try catch on in ProcessDataAsync
and will monitor and report back.
One other thing I notice is that the Log that I have reported above is only in the Console and not log to file, other LogDebug from Nats are logged to file. I am using Serilog for logging.
No exception. I am putting in try catch on in
ProcessDataAsync
and will monitor and report back.
👍
One other thing I notice is that the Log that I have reported above is only in the Console and not log to file, other LogDebug from Nats are logged to file. I am using Serilog for logging.
Depends on how you're injecting NatsConnection. Have a look at the NATS.Extensions.Microsoft.DependencyInjection package.
I am using that nuget package already.
is it maybe the order of your build statements i.e. you add nats first then configure logging? would that make a difference?
The the try and catch on the foreach, still getting the same error. I think it is due to state where disconnect and connect to Nats server when it comes out of hibernation that is causing the problem. This also happens to request/reply where it will say no responder.
coincidently there is a bug fix PR in review which might be the reason for this:
I hope the fix I mentioned above fixes this issue. we just released it in v2.3.2: https://github.com/nats-io/nats.net.v2/releases/tag/v2.3.2
Updated the Nuget and running Test, Will report back the result later.
Observed behavior
After getting out of hibernation (Windows 10) where I have a Jetstream Consumer, sometimes I get the following error. It cannot be reliably replicated. The code that got triggered here.
The Nats server and the Windows 10 client are running on different machine in a LAN network.
Expected behavior
Continue to work.
Server and client version
Nats Server v2.10.16 Nats Client v2.2.3
Host environment
Nats Server Running on Ubuntu Linux 20.04 and running 24hrs.
Steps to reproduce
No response