nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

Fetch on already deleted consumer does not throw an exception #628

Open NonnRa opened 2 months ago

NonnRa commented 2 months ago

Observed behavior

When I create a pull consumer and the consumer is deleted on the server before I call FetchAsync(..), I never receive any messages from that consumer and never get an exception. As a result, I am stuck in an endless loop calling FetchAsync again and again.

Expected behavior

I expected to get an exception, as described in the documentation of the FetchAsync method:

NatsJSException — There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier

Server and client version

Client version: NATS.Net 2.3.3
Server version: 2.10.20

Host environment

No response

Steps to reproduce

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

Console.WriteLine("Start");
string streamName = "TestStream";
List<string> subjects = new List<string> { "test.>" };
string consumerName = "TestConsumer";

var opts = new NatsOpts
{
    Url = "localhost:4222",
    Name = "test-service",
};

Console.WriteLine("Create connection");
var connection = new NatsConnection(opts);

Console.WriteLine("Create stream");
var js = new NatsJSContext(connection);
var stream = await js.CreateStreamAsync(new StreamConfig(streamName, subjects));

Console.WriteLine("Create consumer");
var consumer = await js.CreateOrUpdateConsumerAsync(streamName, new ConsumerConfig
{
    Name = consumerName,
    AckPolicy = ConsumerConfigAckPolicy.None,
    DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
    InactiveThreshold = TimeSpan.FromSeconds(5),
    OptStartSeq = 1,
});

Console.WriteLine("Wait for consumer InactiveThreshold");
await Task.Delay(TimeSpan.FromSeconds(10)); // Wait until the consumer is deleted due to its InactiveThreshold

Console.WriteLine("Fetch on non existent consumer");
var receivedMessages = consumer.FetchAsync<byte[]>(new NatsJSFetchOpts(){ Expires = TimeSpan.FromSeconds(10), MaxMsgs = 100 });
await foreach (var msg in receivedMessages)
{
    Console.WriteLine(msg);
}

On the FetchAsync(..) call I would expect to get an exception, as the consumer is no longer valid.

mtmk commented 2 months ago

Do you get any notifications on opts.NotificationHandler?

NonnRa commented 2 months ago

var receivedMessages = consumer.FetchAsync<byte[]>(new NatsJSFetchOpts(){ Expires = TimeSpan.FromSeconds(10), MaxMsgs = 100, NotificationHandler = PrintNotifications });
await foreach (var msg in receivedMessages)
{
    Console.WriteLine(msg);
}

Task PrintNotifications(INatsJSNotification ns, CancellationToken ct)
{
    Console.WriteLine(ns.Name);
    if(ns is NatsJSProtocolNotification protocolNotification)
    {
        Console.WriteLine($"{protocolNotification.Name}, {protocolNotification.HeaderCode}, {protocolNotification.HeaderMessageText}");
    }
    return Task.CompletedTask;
}

Start
Create connection
Create stream
Create consumer
Wait for consumer InactiveThreshold
Fetch on non existent consumer
Timeout

I adapted my example to print the notifications. The only notification I got is the Timeout at the end.

mtmk commented 2 months ago

Thank you for this report @NonnRa, it's a good question. Unfortunately, the server doesn't return any errors on requests in this case. You have to explicitly check for the consumer using consumer.RefreshAsync():

await consumer.RefreshAsync(); // this will create an additional consumer info API call to JetStream server
var receivedMessages = consumer.FetchAsync<byte[]>(new NatsJSFetchOpts(){ Expires = TimeSpan.FromSeconds(10), MaxMsgs = 100, NotificationHandler = PrintNotifications });
await foreach (var msg in receivedMessages)
{
    Console.WriteLine(msg);
}

... or check for the NatsJSTimeoutNotification in the notification handler and refresh only then, to avoid the additional consumer info call on every fetch:

    if (ns is NatsJSTimeoutNotification)
    {
        Console.WriteLine("Timeout");
        try
        {
            await consumer.RefreshAsync(ct);
        }
        catch (NatsJSApiException e) when (e.Error.Code == 404)
        {
            Console.WriteLine("Consumer not found");
            await myCancellationTokenSource.CancelAsync(); // somehow signal the fetch loop
        }
        catch (Exception e)
        {
            Console.WriteLine("Log other exceptions");
        }
    }
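
For reference, the two snippets above could be wired together roughly as follows. This is only a sketch, not the library's prescribed pattern: it assumes the stream and consumer names from the repro, a server on localhost:4222, and a consumer that still exists when the program starts. The consumerGone token source is a hypothetical stand-in for "somehow signal the fetch loop".

```csharp
using System;
using System.Threading;
using NATS.Client.Core;
using NATS.Client.JetStream;

// Assumption: names and URL are taken from the repro above.
await using var connection = new NatsConnection(new NatsOpts { Url = "localhost:4222" });
var js = new NatsJSContext(connection);
var consumer = await js.GetConsumerAsync("TestStream", "TestConsumer");

// Hypothetical signal: cancelled by the handler once the consumer is known to be gone.
using var consumerGone = new CancellationTokenSource();

var fetchOpts = new NatsJSFetchOpts
{
    Expires = TimeSpan.FromSeconds(10),
    MaxMsgs = 100,
    NotificationHandler = async (notification, ct) =>
    {
        // Only a timed-out fetch triggers the extra consumer info call.
        if (notification is not NatsJSTimeoutNotification)
        {
            return;
        }

        try
        {
            await consumer.RefreshAsync(ct);
        }
        catch (NatsJSApiException e) when (e.Error.Code == 404)
        {
            Console.WriteLine("Consumer not found, stopping fetch loop");
            consumerGone.Cancel();
        }
        catch (Exception e)
        {
            Console.WriteLine($"Refresh failed: {e.Message}");
        }
    },
};

try
{
    // Keep fetching batches until the handler reports that the consumer was deleted.
    while (!consumerGone.IsCancellationRequested)
    {
        await foreach (var msg in consumer.FetchAsync<byte[]>(fetchOpts, cancellationToken: consumerGone.Token))
        {
            Console.WriteLine(msg.Subject);
        }
    }
}
catch (OperationCanceledException)
{
    // The token was cancelled while a fetch was still being enumerated.
}
```

Checking the token in the loop condition and also passing it to FetchAsync covers both cases: the fetch may already have ended by the time the handler cancels, or it may still be running.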

NonnRa commented 2 months ago

Thanks for the response. I will try to build a safety net around it with your snippets. An exception at that point would have helped us a lot in finding an issue in our code, as the delay between the consumer creation and the first fetch was not intended.