nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

NatsConnection.SubscribeAsync<T>() not respecting CancellationToken. Takes 30 seconds to terminate program. #635

Closed knallle closed 1 month ago

knallle commented 1 month ago

Observed behavior

When I cancel my console application (minimal example provided below) with CTRL-C, the program takes exactly 30 seconds to terminate after cancellation is requested.

Debugging with Parallel Stacks in Visual Studio led me to NATS.Client.Core.NatsConnection.SubscribeAsync():

/// <inheritdoc />
public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();

    await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
    await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);

    // We don't cancel the channel reader here because we want to keep reading until the subscription
    // channel writer completes so that messages left in the channel can be consumed before exit the loop.
    await foreach (var msg in sub.Msgs.ReadAllAsync(CancellationToken.None).ConfigureAwait(false))
    {
        yield return msg;
    }
}

The comment states explicitly that the channel reader is not cancelled, and the incoming CancellationToken is indeed not passed to sub.Msgs.ReadAllAsync(); CancellationToken.None is passed instead.
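To illustrate the pattern that code comment describes, here is a minimal sketch using only System.Threading.Channels, not the NATS client itself. It assumes, as the NatsSub constructor call above suggests but this report does not confirm, that the subscription completes its channel writer once cancellation is requested. The channel, the token source, and the message values are stand-ins invented for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

// Stand-in for the subscription's internal message channel (hypothetical).
var channel = Channel.CreateUnbounded<int>();
using var cts = new CancellationTokenSource();

// Three messages are already buffered when cancellation is requested.
for (var i = 0; i < 3; i++)
{
    channel.Writer.TryWrite(i);
}

cts.Cancel();

// Assumption: the subscription reacts to cancellation by completing the writer.
channel.Writer.Complete();

// Reading with CancellationToken.None drains what is left in the channel and
// then exits normally because the writer has completed. Passing cts.Token here
// instead would surface an OperationCanceledException and could abandon
// messages that are still buffered.
var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync(CancellationToken.None))
{
    received.Add(item);
}

Console.WriteLine(string.Join(",", received)); // prints 0,1,2
```

Under that assumption the loop still ends promptly after cancellation; it ends because the writer completes rather than because the reader is cancelled.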

Expected behavior

I expect the program to terminate sooner than 30 seconds after cancellation is called.

If immediate cancellation is impossible, enabling the developer to set the timeout to something less than 30 seconds would be great.

Server and client version

nats-server: v2.10.20
NATS.Net: 2.4.0

Host environment

Steps to reproduce

Minimal working example

Program.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting;

namespace NatsBugMinimalExample
{
    internal class Program
    {
        static async Task Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .ConfigureServices((context, services) =>
                {
                    services.AddSingleton<CancellationTokenSource>();
                    services.AddHostedService<MyService>();
                })
                .ConfigureLogging(logging =>
                {
                    logging.ClearProviders();
                    logging.AddSimpleConsole(options =>
                    {
                        options.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";
                        options.SingleLine = true;
                    });
                })
                .Build();
            await host.RunAsync();
        }
    }
}

MyService.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

namespace NatsBugMinimalExample
{
    internal class MyService : BackgroundService
    {
        private readonly ILogger<MyService> _logger;
        private readonly CancellationTokenSource _cancellationTokenSource;
        public MyService(ILogger<MyService> logger, CancellationTokenSource cancellationTokenSource)
        {
            _logger = logger;
            _cancellationTokenSource = cancellationTokenSource;
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            const string subject = "myservice.number";
            var opts = new NatsOpts { Url = "127.0.0.1:4222" };
            await using var connection = new NatsConnection(opts: opts);
            await connection.ConnectAsync();
            await foreach (var msg in connection.SubscribeAsync<int>(subject).WithCancellation(_cancellationTokenSource.Token))
            {
                await msg.ReplyAsync(_cancellationTokenSource.Token);
            }
        }
    }
}
mtmk commented 1 month ago

Thanks for the report, @knallle. I'm not familiar with CancellationTokenSource being used as a singleton, but if you pass the stoppingToken it works as expected:

await foreach (var msg in connection.SubscribeAsync<int>(subject).WithCancellation(stoppingToken))

Edit: even if I don't pass any token to SubscribeAsync, I see the same behaviour described above.
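For reference, the suggestion above applied to the reporter's MyService might look roughly like the sketch below. It drops the injected CancellationTokenSource and relies only on the stoppingToken that the host passes to ExecuteAsync. The named cancellationToken argument on ReplyAsync is an assumption about the NATS.Net 2.x overloads and should be checked against the installed version; everything else is taken from the example in the report.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using NATS.Client.Core;

namespace NatsBugMinimalExample
{
    internal class MyService : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            const string subject = "myservice.number";
            var opts = new NatsOpts { Url = "127.0.0.1:4222" };
            await using var connection = new NatsConnection(opts: opts);
            await connection.ConnectAsync();

            // stoppingToken is cancelled by the host on shutdown (for example CTRL-C),
            // so the subscription loop can end as soon as shutdown starts.
            await foreach (var msg in connection.SubscribeAsync<int>(subject).WithCancellation(stoppingToken))
            {
                // Assumption: ReplyAsync accepts a named cancellationToken parameter.
                await msg.ReplyAsync(cancellationToken: stoppingToken);
            }
        }
    }
}
```

With this change the services.AddSingleton<CancellationTokenSource>() registration in Program.cs is no longer needed by the service.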

knallle commented 1 month ago

@mtmk Thanks for having a look! The problem was, as you pointed out, my handling of the CancellationTokenSource. I was misled by a Medium post.
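A small sketch of why the singleton never fired: a CancellationTokenSource resolved from the container is independent of the token the host cancels, so nothing cancels it unless application code does so explicitly. The variable names below are hypothetical stand-ins; only BCL types are used. The 30 second delay would be consistent with the generic host's default shutdown timeout (HostOptions.ShutdownTimeout), though this thread does not confirm that.

```csharp
using System;
using System.Threading;

// Stand-in for the source behind the host's stoppingToken (hypothetical name).
using var hostStopping = new CancellationTokenSource();

// Stand-in for the CancellationTokenSource registered as a singleton.
using var singleton = new CancellationTokenSource();

// If a separate source is really needed, linking it to the host's token
// makes it observe host shutdown as well.
using var linked = CancellationTokenSource.CreateLinkedTokenSource(hostStopping.Token);

// Roughly what host shutdown does to the token passed to ExecuteAsync.
hostStopping.Cancel();

Console.WriteLine(hostStopping.Token.IsCancellationRequested); // True
Console.WriteLine(singleton.Token.IsCancellationRequested);    // False: never signalled
Console.WriteLine(linked.Token.IsCancellationRequested);       // True: follows the host token
```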

Thanks for your hard work! 👍