dotnetcore / CAP

Distributed transaction solution in micro-service base on eventually consistency, also an eventbus with Outbox pattern
http://cap.dotnetcore.xyz
MIT License
6.61k stars 1.28k forks source link

BUG: NATS throws exceptions when custom consumer configurations are set #1567

Closed henrychris closed 1 month ago

henrychris commented 1 month ago

Configuration

In my Program.cs, I have CAP configured like so:

builder.Services.AddCap(o =>
{
    o.UseEntityFramework<NotificationDataContext>(opt => opt.Schema = "Outbox");
    o.UseNATS(opt =>
    {
        var natsUrl = builder.Configuration.GetValue<string>("NatsSettings:Url")!;

        opt.Servers = natsUrl;
        opt.StreamOptions = strOpts =>
        {
            strOpts.WithRetentionPolicy(NATS.Client.JetStream.RetentionPolicy.Limits);
            strOpts.WithStorageType(NATS.Client.JetStream.StorageType.File);
            strOpts.WithDuplicateWindow(NATS.Client.Internals.Duration.OfMinutes(1));
            strOpts.WithDiscardPolicy(NATS.Client.JetStream.DiscardPolicy.Old);

            strOpts.Build();
        };

        opt.ConsumerOptions = consOpts =>
        {
            consOpts.WithBackoff(
                NATS.Client.Internals.Duration.OfSeconds(5),
                NATS.Client.Internals.Duration.OfSeconds(30),
                NATS.Client.Internals.Duration.OfMinutes(1),
                NATS.Client.Internals.Duration.OfMinutes(2),
                NATS.Client.Internals.Duration.OfMinutes(5),
                NATS.Client.Internals.Duration.OfMinutes(10)
            );

            consOpts.WithAckPolicy(NATS.Client.JetStream.AckPolicy.Explicit);
            consOpts.WithDeliverPolicy(NATS.Client.JetStream.DeliverPolicy.All);
            consOpts.WithReplayPolicy(NATS.Client.JetStream.ReplayPolicy.Instant);
        };
    });
});

I have an interface and class:

IUserService:

public interface IUserService
{
    Task HandleEvent(DateTime time);
}

UserService:

public class UserService: IUserService
{
    [CapSubscribe("test.show.time")]
    public async Task HandleEvent(DateTime time)
    {
        Console.WriteLine("message time is:" + time);
    }
}

These are both registered as scoped services.

services.AddScoped<IUserService, UserService>();

How To Reproduce

  1. Using Docker, start a new nats server: docker run -it -p 4222:4222 -p 8222:8222 -p 8080:8080 --name nats -d nats:2.10.14 -js -m 8222.

  2. Set natsUrl in Program.cs to "nats://localhost:4222".

  3. Start the application.

  4. Close the application.

  5. Start the application again. This exception should appear in the terminal:

Error Log:

NATS.Client.NATSJetStreamClientException: [SUB-90016] Existing consumer cannot be modified. [AckWait]
   at NATS.Client.JetStream.JetStream.CreateSubscription(String userSubscribeSubject, PushSubscribeOptions pushSubscribeOptions, PullSubscribeOptions pullSubscribeOptions, String queueName, EventHandler`1 userHandler, Boolean autoAck, PullMessageManager pmmInstance)
   at NATS.Client.JetStream.JetStream.PushSubscribeAsync(String subject, String queue, EventHandler`1 handler, Boolean autoAck, PushSubscribeOptions options)
   at DotNetCore.CAP.NATS.NATSConsumerClient.Subscribe(IEnumerable`1 topics)

Expected Behaviour

An exception should not be thrown here because the consumer configuration is not changing. If I don't provide a custom consumer configuration, no exception is thrown on application restart.

yang-xiaodong commented 1 month ago

In the NATS Server, the BackOff option overrides the AckWait option, ignoring the AckWait settings. This can cause an inconsistency between the NATS client configuration and the server configuration, leading to exceptions when the application restarts. Since CAP internally provides a default value for AckWait, you need to reset the AckWait option to avoid configuration comparison issues.

consOpts.WithBackoff(
    NATS.Client.Internals.Duration.OfSeconds(5),
    NATS.Client.Internals.Duration.OfSeconds(30),
    NATS.Client.Internals.Duration.OfMinutes(1),
    NATS.Client.Internals.Duration.OfMinutes(2),
    NATS.Client.Internals.Duration.OfMinutes(5),
    NATS.Client.Internals.Duration.OfMinutes(10)
);
consOpts.WithAckWait(null);    // add this line !!!!
henrychris commented 1 month ago

I just tried this out, and it works! Thank you so much! Do I close this myself, or will you do the honours?