nats-io / nats.net.v1

The official C# Client for NATS
Apache License 2.0
646 stars 154 forks source link

NatsJSConsume timeout when using Ordered Consumer (big gap between messages) #869

Closed fernandozago closed 6 months ago

fernandozago commented 6 months ago

Observed behavior

When using OrderedConsumer, it warns a timeout from NATS.Client.JetStream.Internal.NatsJSConsume, when the GAP between two messages is too big, then throw exception:

info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created FgHNf6y8xaikZ3sijNdaKV with sequence 0
$KV.testdata.1: 29/02/2024 11:29:46
$KV.testdata.2: 29/02/2024 11:29:47
$KV.testdata.3: 29/02/2024 11:29:48
$KV.testdata.4: 29/02/2024 11:29:53
$KV.testdata.5: 29/02/2024 11:30:13
warn: NATS.Client.JetStream.Internal.NatsJSConsume[2003]
      Idle heartbeat timeout
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2019]
      idle-heartbeat timed out. Retrying...
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2018]
      Consumer loop exited
fail: NATS-by-Example[0]
      Consumer Failed!
      NATS.Client.JetStream.NatsJSApiException: consumer delivery policy is deliver by start sequence, but optional start time is also set
         at NATS.Client.JetStream.Internal.NatsJSResponse`1.EnsureSuccess()
         at NATS.Client.JetStream.NatsJSContext.JSRequestResponseAsync[TRequest,TResponse](String subject, TRequest request, CancellationToken cancellationToken)
         at NATS.Client.JetStream.NatsJSOrderedConsumer.RecreateConsumer(String consumer, UInt64 seq, CancellationToken cancellationToken)
         at NATS.Client.JetStream.NatsJSOrderedConsumer.ConsumeAsync[T](INatsDeserialize`1 serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)+MoveNext()
         at NATS.Client.JetStream.NatsJSOrderedConsumer.ConsumeAsync[T](INatsDeserialize`1 serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)+MoveNext()
         at NATS.Client.JetStream.NatsJSOrderedConsumer.ConsumeAsync[T](INatsDeserialize`1 serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at Program.<Main>$(String[] args) in F:\Desenvolvimento\WolverineTests\NatsBug\Program.cs:line 68
         at Program.<Main>$(String[] args) in F:\Desenvolvimento\WolverineTests\NatsBug\Program.cs:line 68
new NatsJSOrderedConsumerOpts()
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,

    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartTime,
    OptStartTime = DateTime.Today
}

Expected behavior

Should replay as is, as configured ConsumerConfigReplayPolicy.Original

Server and client version

Server Version = 2.10.11 Client Version = 2.1.0

Host environment

Not Relevant

Steps to reproduce

using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.KeyValueStore;

#region Setup
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

var opts = new NatsOpts
{
    Url = "192.168.2.10:4222",
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

INatsKVStore testData = null!;
try
{
    testData = await kv.GetStoreAsync("testdata");
}
catch (Exception ex)
{
    testData = await kv.CreateStoreAsync(new NatsKVConfig("testdata"));
    logger.LogInformation("Preparing dataset...");
    int key = 1;
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(1));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(1));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(5));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromSeconds(20));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));

    await Task.Delay(TimeSpan.FromMinutes(1.1));
    logger.LogInformation("Sent: {seq}",
        await testData.PutAsync((key++).ToString(), DateTime.Now));
} 
#endregion

var consumer = await js.CreateOrderedConsumerAsync("KV_testdata", new NatsJSOrderedConsumerOpts()
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,

    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartTime,
    OptStartTime = DateTime.Today
});

try
{
    await foreach (var item in consumer.ConsumeAsync<DateTime>().ConfigureAwait(false))
    {
        Console.WriteLine($"{item.Subject}: {item.Data}");
    }
}
catch (Exception ex)
{
    logger.LogError(ex, "Consumer Failed!");
}
fernandozago commented 6 months ago

Works as expected when I use this config: *Even though it warns a timeout and re-create the consumer from the sequence

new NatsJSOrderedConsumerOpts()
{
    ReplayPolicy = ConsumerConfigReplayPolicy.Original,

    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
    OptStartSeq = 1
}
info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created W6SdcyNQZz5x1J6gieYgVL with sequence 0
$KV.testdata.1: 29/02/2024 11:29:46
$KV.testdata.2: 29/02/2024 11:29:47
$KV.testdata.3: 29/02/2024 11:29:48
$KV.testdata.4: 29/02/2024 11:29:53
$KV.testdata.5: 29/02/2024 11:30:13
warn: NATS.Client.JetStream.Internal.NatsJSConsume[2003]
      Idle heartbeat timeout
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2019]
      idle-heartbeat timed out. Retrying...
warn: NATS.Client.JetStream.NatsJSOrderedConsumer[2018]
      Consumer loop exited
info: NATS.Client.JetStream.NatsJSOrderedConsumer[2010]
      Created Ub5ek7phy3IrFEoKE5gVCk with sequence 5
$KV.testdata.6: 29/02/2024 11:31:13
fernandozago commented 6 months ago

Sorry, wrong repo!