nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

Ordered consumer performance #391

Open mtmk opened 9 months ago

mtmk commented 9 months ago

Ordered consumer performance is our gold standard benchmark for consumers. Test and improve ordered consumer performance. This may tie in with planned efforts to improve receive buffer performance.

Prep

> nats stream create x
> nats bench x --js --stream x --pub 1 --purge --msgs 10000000
> nats stream ls

Program.cs

using System.Diagnostics;
using NATS.Client.Core;
using NATS.Client.JetStream;

Console.WriteLine("start");

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var consumer = await js.CreateOrderedConsumerAsync("x");

var count = 0;
double size = 0;
var stopwatch = Stopwatch.StartNew();
await foreach (var msg in consumer.ConsumeAsync<NatsMemoryOwner<byte>>(opts: new NatsJSConsumeOpts
               {
                   MaxMsgs = 5_000,
               }))
{
    using var memory = msg.Data;
    size += memory.Length;
    if (++count == 10_000_000)
        break;
}
stopwatch.Stop();

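// Elapsed.TotalSeconds is the full duration; Elapsed.Seconds is only the whole-seconds component.
Console.WriteLine($"{count/stopwatch.Elapsed.TotalSeconds:n0} msgs/sec" +
                  $" ~ {size/stopwatch.Elapsed.TotalSeconds/(1024.0*1024.0):n2} MB/sec");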

Console.WriteLine("bye");

Compare to nats bench

> nats bench x --js --stream x --sub 1 --msgs 10000000
14:32:58 JetStream ephemeral ordered push consumer mode, subscribers will not acknowledge the consumption of messages
14:32:58 Starting JetStream benchmark [subject=x,  multisubject=false, multisubjectmax=100000, js=true, msgs=10,000,000, msgsize=128 B, pubs=0, subs=1, stream=x, maxbytes=1.0 GiB, syncpub=false, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=false, dedupwindow=2m0s]
14:32:58 Starting subscriber, expecting 10,000,000 messages
Finished     11s [========================================================================] 100%

Sub stats: 846,452 msgs/sec ~ 103.33 MB/sec
> dotnet run -c release
start
701,490 msgs/sec ~ 87.19 MB/sec
bye
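For reference, the MB/sec figures follow directly from msgs/sec, assuming every payload has the 128 B size reported in the bench log (msgsize=128 B) and that MB means 1024 × 1024 bytes, as in Program.cs:

```latex
\text{MB/sec} = \frac{\text{msgs/sec} \times 128}{1024 \times 1024}

\text{nats bench: } \frac{846{,}452 \times 128}{1{,}048{,}576} \approx 103.33
\qquad
\text{.NET client: } \frac{701{,}490 \times 128}{1{,}048{,}576} \approx 85.63
```

The printed 87.19 MB/sec is consistent with dividing the 1,280,000,000 payload bytes by stopwatch.Elapsed.Seconds (the whole-seconds component, 14) rather than by the full elapsed time (10,000,000 / 701,490 ≈ 14.26 s): 1,280,000,000 / 14 / 1,048,576 ≈ 87.19.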

to11mtm commented 9 months ago

How much of the ordered consumer path still goes through BoundedChannel?

This is worth noting in general, because although the ordered consumer is the 'standard' benchmark, performance is a cross-cutting concern: BoundedChannel is noticeably worse for reads (reads take a lock on the parent channel, which also impacts writes) and slightly worse for writes (an effect amplified by that larger lock being shared between reads and writes).

We may want to find (or write) a Channel implementation that combines the properties we need from UnboundedChannel (minimal locking and minimal resize-copying) and BoundedChannel (bounded capacity), giving the best balance of performance and ease of use.
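A minimal sketch of one way to combine those properties, offered as a hypothetical illustration rather than how nats.net works, and not benchmarked: items live in a channel from Channel.CreateUnbounded (whose reads dequeue from a concurrent queue instead of taking the channel-wide lock BoundedChannel uses), while a SemaphoreSlim imposes the capacity so writers wait when the buffer is full. The WriteAsync helper and the capacity of 4 are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: unbounded channel for storage, SemaphoreSlim for the bound.
const int Capacity = 4;
var slots = new SemaphoreSlim(Capacity, Capacity);
var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
    SingleReader = true, // CreateUnbounded returns a single-consumer implementation for this
    SingleWriter = true,
});
var inFlight = 0;    // slots currently held by written-but-unread items
var maxInFlight = 0; // highest value observed by the (single) writer

// Writer: wait (asynchronously) for a free slot, then enqueue without blocking.
async Task WriteAsync(int value)
{
    await slots.WaitAsync();
    maxInFlight = Math.Max(maxInFlight, Interlocked.Increment(ref inFlight));
    if (!channel.Writer.TryWrite(value))
    {
        Interlocked.Decrement(ref inFlight);
        slots.Release(); // channel already completed: hand the slot back
        throw new ChannelClosedException();
    }
}

var producer = Task.Run(async () =>
{
    for (var i = 0; i < 100; i++)
        await WriteAsync(i);
    channel.Writer.Complete();
});

// Reader: drain what is buffered; every item removed frees one writer slot.
var received = new List<int>();
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        Interlocked.Decrement(ref inFlight);
        slots.Release();
        received.Add(item);
    }
}
await producer;

Console.WriteLine($"received {received.Count}, max buffered {maxInFlight}");
```

The semaphore adds its own per-item synchronization, so whether this beats BoundedChannel in practice would need to be measured with the benchmark above.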

Zetanova commented 3 months ago

In the ordered consumer, I use the fetch operation to push the messages into a channel.

Channels can (and should) be created in one of two configurations: fan-in or fan-out.

// Fan-out: a single writer feeds multiple readers
Channel<T> _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(BufferSize)
{
    AllowSynchronousContinuations = false,
    SingleWriter = true,
    SingleReader = false,
    FullMode = BoundedChannelFullMode.Wait
});

// Fan-in: multiple writers feed a single reader
Channel<T> _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(BufferSize)
{
    AllowSynchronousContinuations = false,
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
});
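As a usage sketch of the fan-out configuration, three readers compete for items from one writer. The value 32 for BufferSize is a hypothetical choice, since the comment leaves it unspecified. Each item is delivered to exactly one reader, so fan-out here means distributing work, not broadcasting it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

const int BufferSize = 32; // hypothetical buffer size
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(BufferSize)
{
    AllowSynchronousContinuations = false,
    SingleWriter = true,
    SingleReader = false,
    FullMode = BoundedChannelFullMode.Wait,
});

// Three competing readers; each records which items it handled.
var seen = new ConcurrentBag<(int Worker, int Item)>();
var workers = Enumerable.Range(0, 3).Select(id => Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        seen.Add((id, item));
})).ToArray();

// The single writer blocks whenever BufferSize items are waiting.
for (var i = 0; i < 1_000; i++)
    await channel.Writer.WriteAsync(i);
channel.Writer.Complete();
await Task.WhenAll(workers);

var items = seen.Select(s => s.Item).OrderBy(x => x).ToList();
Console.WriteLine($"{items.Count} items handled by {seen.Select(s => s.Worker).Distinct().Count()} workers");
```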

I think your test is degraded by the defaults, SingleWriter = false and SingleReader = false.

The combination SingleWriter=true and SingleReader=true makes little sense in general, because it would make the channel largely redundant. It can, however, be used to buffer stream processing between two threads, with backpressure and failure support.
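One caveat for the flag discussion: in the System.Threading.Channels implementation that ships with .NET, Channel.CreateBounded returns the same BoundedChannel<T> whatever SingleReader and SingleWriter are set to; only Channel.CreateUnbounded switches to a specialized single-consumer implementation when SingleReader = true. With that in mind, here is a minimal sketch of the single-writer/single-reader buffering case, using a hypothetical producer that fails partway through: FullMode = Wait provides backpressure, and Writer.Complete(exception) carries the failure to the reader after the buffered items have been drained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 8)
{
    SingleWriter = true,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait,
    AllowSynchronousContinuations = false,
});

// Producer thread: waits whenever the consumer falls 8 items behind.
var producer = Task.Run(async () =>
{
    try
    {
        for (var i = 0; i < 50; i++)
        {
            if (i == 40)
                throw new InvalidOperationException("simulated upstream failure");
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete();
    }
    catch (Exception ex)
    {
        channel.Writer.Complete(ex); // surfaces on the reader once the buffer is empty
    }
});

// Consumer thread: processes items in order, then observes the failure.
var processed = new List<int>();
Exception? failure = null;
try
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        processed.Add(item * 2);
}
catch (InvalidOperationException ex) // also covers ChannelClosedException
{
    failure = ex;
}
await producer;

Console.WriteLine($"processed {processed.Count} items, failure: {failure?.Message}");
```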

AllowSynchronousContinuations could normally be set to true to allow task inlining on the thread pool (TP), but the current nats.net architecture does not support this: it generates timeouts in some internal logic. (This was for the v1 client.)

Zetanova commented 3 months ago

The channel is created with SingleWriter=true, SingleReader=false see

But the IAsyncEnumerator of consumer.ConsumeAsync assumes sequential processing.

A channel with SingleWriter=true and SingleReader=true would be ideal here.

mtmk commented 3 months ago

Thanks @Zetanova, sounds good. What @to11mtm suggested about unbounded channels also makes sense, if we can offer it as an option and use NATS flow control messages to control backpressure (which is what we should do anyway). Seeing some performance tests would be awesome too. Feel free to open a PR if you want to have a go at it; we can hammer it out there.

Zetanova commented 3 months ago

I haven't looked at the v2 code much, but it most likely suffers, like v1, from too much buffering and too many threads.

The channel reader is best used in a tight loop. It then offers the ability to give control back when the buffer has been flushed, and backpressure can be signaled through the writer. With IAsyncEnumerable, this can only be achieved by wrapping the messages in a wrapper/data type that also carries control messages.
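A minimal sketch of such a tight reader loop, assuming plain System.Threading.Channels rather than nats.net internals: the inner TryRead loop drains whatever is currently buffered, and the point where it runs dry is where the consumer can hand control back (here it only records the batch size; in practice it might flush output or acknowledge). The 16-item capacity and the string messages are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(16)
{
    SingleWriter = true,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait, // writer waits: backpressure
});

var producer = Task.Run(async () =>
{
    for (var i = 0; i < 100; i++)
        await channel.Writer.WriteAsync($"msg-{i}");
    channel.Writer.Complete();
});

var batches = new List<int>(); // messages handled per drain cycle
var total = 0;
var batch = new List<string>();
while (await channel.Reader.WaitToReadAsync())
{
    // Tight loop: take everything available without awaiting per message.
    while (channel.Reader.TryRead(out var msg))
        batch.Add(msg);

    // Buffer drained: a natural point to give control back (flush, ack, ...).
    batches.Add(batch.Count);
    total += batch.Count;
    batch.Clear();
}
await producer;

Console.WriteLine($"{total} messages in {batches.Count} drain cycles");
```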

When I switch to v2, I can open a PR.