nats-io / nats.net.v2

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net.v2/
Apache License 2.0

Update consume backpressure handling #508

Open mtmk opened 1 month ago

mtmk commented 1 month ago

The capacity of the internal message channel is now limited to maxMsgs, so the client no longer pulls excessive messages from the server. The capacity is also capped at 1024 to avoid large object heap allocations.
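To illustrate the rule described above, here is a minimal sketch using System.Threading.Channels. It is not the library's actual code: the helper name BoundedCapacity, the lower bound of 1, and the string payload type are assumptions made for the example.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical helper (not part of NATS.Client): the buffer holds at most
// maxMsgs items and never more than hardLimit (1024) items.
// Clamping to a minimum of 1 is an assumption made for this sketch.
static int BoundedCapacity(int maxMsgs, int hardLimit = 1024) =>
    Math.Clamp(maxMsgs, 1, hardLimit);

var capacity = BoundedCapacity(5);
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
{
    // Writers have to wait (or fail on TryWrite) once the buffer is full.
    FullMode = BoundedChannelFullMode.Wait,
});

// Fill the buffer up to its capacity.
for (var i = 0; i < capacity; i++)
    channel.Writer.TryWrite($"msg-{i}");

// The buffer is full, so one more message is rejected instead of queued.
var accepted = channel.Writer.TryWrite("one-too-many");

Console.WriteLine($"capacity={capacity}, accepted={accepted}");
Console.WriteLine($"large request is capped at {BoundedCapacity(5000)}");
```

With a bounded buffer like this, a slow consumer stops the buffer from growing, which is the backpressure effect the change aims for.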

Program.cs

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var stream = await js.CreateStreamAsync(new StreamConfig("teststream", ["test-subject"]));
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("testcons"));

var count = 0;

// MaxMsgs = 5 is the limit this change is meant to enforce.
await foreach (var msg in consumer.ConsumeAsync<string>(opts: new NatsJSConsumeOpts { MaxMsgs = 5 }))
{
    await Task.Delay(3000); // simulate slow processing
    await msg.AckAsync();
    if (count++ == 3)
        break; // application exits after acknowledging 4 messages
}

Publish messages:

> nats pub test-subject '{{.Count}}' --count 50

Before: the client pulls all 50 messages, far exceeding maxMsgs (46 ack pending, 0 unprocessed).

> nats consumer report teststream
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                             Consumer report for teststream with 1 consumers                             │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
│ testcons │ Pull │ Explicit   │ 30.00s   │ 46          │ 0           │ 0           │ 4         │         │
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯

After: consumption is limited to maxMsgs plus a few more messages (one or two pull requests' worth) due to the pre-fetch optimization (10 ack pending, 36 unprocessed).

> nats consumer report teststream
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                             Consumer report for teststream with 1 consumers                             │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
│ testcons │ Pull │ Explicit   │ 30.00s   │ 10          │ 0           │ 36 / 72%    │ 4         │         │
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯

caleblloyd commented 1 month ago

What happens if they use MaxBytes instead? Won't the channel capacity be 1024 again, so that the client keeps pulling messages until the channel is full?

Should we introduce an intermediate channel that consumes the full pull, then calls for the next pull after it is halfway consumed?
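One possible reading of this suggestion is sketched below. It is only an illustration: the server is simulated with an in-memory queue, and the names maxMsgs, threshold, pending and Pull are hypothetical, not taken from the client. The sketch counts messages; a MaxBytes variant would presumably count bytes instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;

const int maxMsgs = 4;              // hypothetical batch size
const int threshold = maxMsgs / 2;  // top up once half the batch is consumed

var server = new Queue<int>(Enumerable.Range(1, 10)); // simulated stream of 10 messages
var buffer = Channel.CreateBounded<int>(maxMsgs);     // intermediate channel
var pending = 0;                                      // pulled but not yet consumed
var pulls = new List<int>();                          // size of each non-empty pull

// Simulated pull request: moves up to n messages from the "server" into the buffer.
void Pull(int n)
{
    var take = Math.Min(n, server.Count);
    for (var i = 0; i < take; i++)
        buffer.Writer.TryWrite(server.Dequeue()); // always fits: pending + take <= maxMsgs
    pending += take;
    if (take > 0)
        pulls.Add(take);
}

Pull(maxMsgs); // initial pull fills the buffer

var consumed = new List<int>();
while (buffer.Reader.TryRead(out var msg))
{
    consumed.Add(msg);
    pending--;

    // Once half the batch is consumed, request only what fits in the buffer.
    if (pending <= threshold)
        Pull(maxMsgs - pending);
}

Console.WriteLine($"consumed: {consumed.Count}");          // consumed: 10
Console.WriteLine($"pulls: {string.Join(",", pulls)}");    // pulls: 4,2,2,2
```

In this sketch the number of messages held by the client never exceeds maxMsgs, regardless of how many messages are waiting on the server.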