nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

NatsJSConsume timed out when using Ordered Consumer (big gap between messages) #420

Closed: fernandozago closed this issue 6 months ago

fernandozago commented 6 months ago

Observed behavior

When using an ordered consumer and the gap between two messages is too large, NATS.Client.JetStream.Internal.NatsJSConsume logs an idle heartbeat timeout warning, and the consumer then fails with an exception:

info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created FgHNf6y8xaikZ3sijNdaKV with sequence 0
$KV.testdata.1: 29/02/2024 11:29:46
$KV.testdata.2: 29/02/2024 11:29:47
$KV.testdata.3: 29/02/2024 11:29:48
$KV.testdata.4: 29/02/2024 11:29:53
$KV.testdata.5: 29/02/2024 11:30:13
warn: NATS.Client.JetStream.Internal.NatsJSConsume[2003]
      Idle heartbeat timeout
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2019]
      idle-heartbeat timed out. Retrying...
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2018]
      Consumer loop exited
fail: NATS-by-Example[0]
      Consumer Failed!
      NATS.Client.JetStream.NatsJSApiException: consumer delivery policy is deliver by start sequence, but optional start time is also set
         at NATS.Client.JetStream.Internal.NatsJSResponse`1.EnsureSuccess()
         at NATS.Client.JetStream.NatsJSContext.JSRequestResponseAsync[TRequest,TResponse](String subject, TRequest request, CancellationToken cancellationToken)
         at NATS.Client.JetStream.NatsJSOrderedConsumer.RecreateConsumer(String consumer, UInt64 seq, CancellationToken cancellationToken)
         at NATS.Client.JetStream.NatsJSOrderedConsumer.ConsumeAsync[T](INatsDeserialize`1 serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)+MoveNext()
         at NATS.Client.JetStream.NatsJSOrderedConsumer.ConsumeAsync[T](INatsDeserialize`1 serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)+MoveNext()
         at NATS.Client.JetStream.NatsJSOrderedConsumer.ConsumeAsync[T](INatsDeserialize`1 serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at Program.<Main>$(String[] args) in F:\Desenvolvimento\WolverineTests\NatsBug\Program.cs:line 68
         at Program.<Main>$(String[] args) in F:\Desenvolvimento\WolverineTests\NatsBug\Program.cs:line 68

Ordered consumer options used:

new NatsJSOrderedConsumerOpts()
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,

    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartTime,
    OptStartTime = DateTime.Today
}

Expected behavior

The consumer should replay the messages with their original timing, as configured with ConsumerConfigReplayPolicy.Original, including the long gaps.
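For reference, the gaps that ReplayPolicy.Original is expected to reproduce can be read straight off the timestamps in the logs. The minimal C# sketch below computes them. The 30-second `idleWindow` is a hypothetical value chosen only for illustration; it is not the client's actual idle-heartbeat setting. In the logs, the 20-second gap was replayed without a warning, and the heartbeat timeout appeared during the following, longer gap.

```csharp
using System;
using System.Globalization;

// Values stored by the repro's PutAsync calls, as printed in this thread's logs
// ($KV.testdata.1 to .5 in the first run, .6 after the consumer was recreated).
string[] stamps = { "11:29:46", "11:29:47", "11:29:48", "11:29:53", "11:30:13", "11:31:13" };

// Hypothetical idle window, for illustration only (not the client's default).
var idleWindow = TimeSpan.FromSeconds(30);

TimeSpan[] times = Array.ConvertAll(
    stamps, s => TimeSpan.ParseExact(s, @"hh\:mm\:ss", CultureInfo.InvariantCulture));

for (int i = 1; i < times.Length; i++)
{
    // With ReplayPolicy.Original the server spaces deliveries by the original
    // inter-message gap, so no message arrives for this long.
    TimeSpan gap = times[i] - times[i - 1];
    string flag = gap > idleWindow ? "  <- longer than idleWindow" : "";
    Console.WriteLine($"testdata.{i} -> testdata.{i + 1}: {gap.TotalSeconds:0}s{flag}");
}
```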

Server and client version

Server version: 2.10.11
Client version: 2.1.2

Host environment

Not Relevant

Steps to reproduce

using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.KeyValueStore;

#region Setup
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

var opts = new NatsOpts
{
    Url = "192.168.2.10:4222",
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

INatsKVStore testData = null!;
try
{
    testData = await kv.GetStoreAsync("testdata");
}
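catch (Exception)
{
    // Assume the bucket does not exist yet: create it and seed six entries
    // separated by growing gaps (1s, 1s, 5s, 20s, 66s).
    testData = await kv.CreateStoreAsync(new NatsKVConfig("testdata"));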
    logger.LogInformation("Preparing dataset...");
    int key = 1;
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(1));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(1));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(5));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(20));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromMinutes(1.1));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));
} 
#endregion

var consumer = await js.CreateOrderedConsumerAsync("KV_testdata", new NatsJSOrderedConsumerOpts()
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,

    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartTime,
    OptStartTime = DateTime.Today
});

try
{
    await foreach (var item in consumer.ConsumeAsync<DateTime>())
    {
        Console.WriteLine($"{item.Subject}: {item.Data}");
    }
}
catch (Exception ex)
{
    logger.LogError(ex, "Consumer Failed!");
}
fernandozago commented 6 months ago

It works almost as expected with this config, although it still logs the timeout and recreates the consumer from the sequence where it stopped, losing the expected wait for that gap:

new NatsJSOrderedConsumerOpts()
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,

    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
    OptStartSeq = 1
}
info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created W6SdcyNQZz5x1J6gieYgVL with sequence 0
$KV.testdata.1: 29/02/2024 11:29:46
$KV.testdata.2: 29/02/2024 11:29:47
$KV.testdata.3: 29/02/2024 11:29:48
$KV.testdata.4: 29/02/2024 11:29:53
$KV.testdata.5: 29/02/2024 11:30:13
warn: NATS.Client.JetStream.Internal.NatsJSConsume[2003]
      Idle heartbeat timeout
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2019]
      idle-heartbeat timed out. Retrying...
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2018]
      Consumer loop exited
info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created Ub5ek7phy3IrFEoKE5gVCk with sequence 5
$KV.testdata.6: 29/02/2024 11:31:13
fernandozago commented 6 months ago

Temporary solution, in NatsJSOrderedConsumer.RecreateConsumer:

private async Task<NatsJSConsumer> RecreateConsumer(string consumer, ulong seq, CancellationToken cancellationToken)
...
consumerOpts = _opts with
{
    OptStartSeq = seq + 1,
    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
+    OptStartTime = default,
};
...

This allows the ordered consumer to be recreated properly:

info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created b9li230OABvZd9eSfDbALd with sequence 0
$KV.testdata.1: 29/02/2024 12:33:10
$KV.testdata.2: 29/02/2024 12:33:11
$KV.testdata.3: 29/02/2024 12:33:12
$KV.testdata.4: 29/02/2024 12:33:17
$KV.testdata.5: 29/02/2024 12:33:37
warn: NATS.Client.JetStream.Internal.NatsJSConsume[2003]
      Idle heartbeat timeout
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2019]
      idle-heartbeat timed out. Retrying...
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2018]
      Consumer loop exited
info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created FDGAkx2QSlvjRYij08cfOM with sequence 5
$KV.testdata.6: 29/02/2024 12:34:43

But I don't think that is the final solution.

mtmk commented 6 months ago

Thanks @fernandozago, yes, you fixed the first issue 💯 We should clear the start time after the first request.

Still looking into the second one.
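The first issue comes down to how C# `with` expressions work: `_opts with { ... }` copies every member that is not explicitly overridden. An `OptStartTime` set for `DeliverPolicy = ByStartTime` is therefore carried into the recreated consumer's start-sequence request, which the server rejects. Below is a minimal, self-contained sketch under stated assumptions. An anonymous type (which supports `with` since C# 10) stands in for `NatsJSOrderedConsumerOpts`. Plain strings stand in for the real `ConsumerConfigDeliverPolicy` enum. `Validate` is a hypothetical function that mimics only the one server rule quoted in the error message.

```csharp
using System;

// Assumption: stand-in for NatsJSOrderedConsumerOpts. `with` on this anonymous
// type copies every property that is not overridden, just like on a record.
var original = new
{
    DeliverPolicy = "ByStartTime",
    OptStartSeq = 0UL,
    OptStartTime = (DateTimeOffset?)new DateTimeOffset(DateTime.Today),
};

ulong lastSeq = 5; // last stream sequence delivered before the heartbeat timeout

// Recreation as in client 2.1.2: OptStartTime is silently carried over.
var buggy = original with
{
    OptStartSeq = lastSeq + 1,
    DeliverPolicy = "ByStartSequence",
};

// Recreation with the temporary fix: OptStartTime is cleared.
var fixedOpts = original with
{
    OptStartSeq = lastSeq + 1,
    DeliverPolicy = "ByStartSequence",
    OptStartTime = default,
};

Console.WriteLine($"buggy: {Validate(buggy.DeliverPolicy, buggy.OptStartTime)}");
Console.WriteLine($"fixed: {Validate(fixedOpts.DeliverPolicy, fixedOpts.OptStartTime)}");

// Hypothetical mimic of the single server-side rule quoted in the exception.
static string Validate(string deliverPolicy, DateTimeOffset? optStartTime) =>
    deliverPolicy == "ByStartSequence" && optStartTime is not null
        ? "consumer delivery policy is deliver by start sequence, but optional start time is also set"
        : "ok";
```

Clearing the start time is only safe after the first request, because the initial `ByStartTime` request still needs it.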

fernandozago commented 6 months ago

Hey @mtmk, no problem. Thank you for the quick response.

mtmk commented 6 months ago

It doesn't look like replay-policy=original with ordered consumers is going to preserve the longer gaps with the correct pause, since ordered consumers are ephemeral and can be reset at any time. You can create a temporary durable consumer to achieve the same goal:

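// `stream` is the JetStream stream to replay (e.g. obtained via js.GetStreamAsync).
var consumerName = $"tmp_{Random.Shared.Next(1_000_000)}";

// Unlike the ordered consumer, this durable consumer is not deleted and
// recreated by the client, so the server keeps its replay position and pacing.
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig(consumerName)
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,
});

try
{
    await foreach (var msg in consumer.ConsumeAsync<int>())
    {
        await msg.AckAsync();
        Console.WriteLine($"{DateTime.Now:HH:mm:ss} data: {msg.Data}");

        // Stop once the consumer has caught up with the end of the stream.
        if (msg.Metadata?.NumPending == 0)
        {
            break;
        }
    }
}
finally
{
    // Remove the temporary consumer even if consumption fails.
    await stream.DeleteConsumerAsync(consumerName);
}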
fernandozago commented 6 months ago

Yes... It works this way.

It would be better if we could use an ephemeral consumer, though.

Thanks @mtmk !