nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

JetStream publish batching #375

Closed (anth-git closed this 4 months ago)

anth-git commented 9 months ago

Observed behavior

Is the .NET client 10x slower than the native one, or am I doing something wrong?

./nats bench bar --js --pub 1 --size 2048 --msgs 100000
Starting JetStream benchmark [subject=bar, multisubject=false, multisubjectmax=100000, js=true, msgs=100,000, msgsize=2.0 KiB, pubs=1, subs=0, stream=benchstream, maxbytes=1.0 GiB, storage=file, syncpub=false, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, replicas=1, purge=false, pubsleep=0s, subsleep=0s, dedup=false, dedupwindow=2m0s]
Pub stats: 108,061 msgs/sec ~ 211.06 MB/sec

./NetClientTest 2048 100000
Produced 100000 messages in 7862 ms; 13k msg/s ~ 25 MB/sec

And the code I'm using (.NET 8):

// NetClientTest

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using System.Diagnostics;

var msgSize = args.Length > 0 ? int.Parse(args[0]) : 2048;
var msgCount = args.Length > 1 ? int.Parse(args[1]) : 100000;

int cnt = 0;
var data = new byte[msgSize].Select(a => (byte)(++cnt)).ToArray();

await using var nats = new NatsConnection(new NatsOpts
{
    Url = "127.0.0.1:4222",
    Name = "NATS-by-Example",
});

var js = new NatsJSContext(nats);

await js.CreateStreamAsync(new StreamConfig("test", ["test.subject"]));
await js.PurgeStreamAsync("test", new StreamPurgeRequest());

Stopwatch sw = new Stopwatch();
sw.Start();

for (int i = 0; i < msgCount; ++i)
{
    var r = await js.PublishAsync<byte[]>(subject: "test.subject", data);
}

sw.Stop();

Console.WriteLine($"Produced {msgCount} messages in {(int)sw.ElapsedMilliseconds} ms; {(msgCount / (sw.Elapsed.TotalSeconds) / 1000.0):F0}k msg/s ~ {(msgCount * msgSize) / (1024 * 1024 * sw.Elapsed.TotalSeconds):F0} MB/sec");
Console.ReadKey();

Expected behavior

It should have similar performance

Server and client version

nats-server: v2.10.10
nats-cli: v0.1.1
NATS.Client.Core: v2.0.3

Host environment

No response

Steps to reproduce

No response

mtmk commented 9 months ago

Thanks for the report. I am seeing similar results. It looks like nats bench sends its requests without waiting for each response before sending the next, whereas the .NET code above awaits every ack one at a time. When I batch the publish tasks I see similar figures, e.g.:

EDIT ⚠️ Don't use this with long-running large batches: it creates a lot of GC pressure and causes operation cancelled exceptions. See also #523.

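// Publish in batches: start `batch` publishes without awaiting, then collect the acks.
// Note: with integer division, any remainder (msgCount % batch) is not published.
const int batch = 10_000;
for (int i = 0; i < msgCount / batch; ++i)
{
    var tasks = new List<Task<PubAckResponse>>(batch);

    for (int j = 0; j < batch; j++)
    {
        // Start the publish; its ack is awaited below.
        Task<PubAckResponse> publishAsync = js.PublishAsync<byte[]>(subject: "test.subject", data).AsTask();
        tasks.Add(publishAsync);
    }

    // Wait for every ack in this batch before starting the next one.
    foreach (var task in tasks)
        await task;
}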
//
// Produced 100000 messages in 1417 ms; 71k msg/s ~ 138 MB/sec
//
// nats bench bar --js --pub 1 --size 2048 --msgs 100000
// Pub stats: 74,439 msgs/sec ~ 145.39 MB/sec
//

Edit: and if we batch all of it we get the same result:

Produced 100000 messages in 1343 ms; 74k msg/s ~ 145 MB/sec

anth-git commented 9 months ago

Yes, I figured it must have something to do with batching. I found the --pubbatch flag in nats bench, and when I set it to 1, performance dropped significantly and was similar to the results obtained with the .NET client:

./nats bench bar --js --pub 1 --size 2048 --msgs 100000 --pubbatch 1
Pub stats: 16,539 msgs/sec ~ 32.30 MB/sec
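
As a rough back-of-the-envelope check on the figures above (assuming, as a simplification, that a publisher awaiting every ack spends essentially all of its time on one publish/ack round trip per message), the per-message cost and the resulting rate work out as follows:

```latex
% .NET client, one awaited publish at a time (100000 messages in 7862 ms)
t_{\text{.NET}} = \frac{7862\ \text{ms}}{100\,000} \approx 78.6\ \mu\text{s}
\quad\Longrightarrow\quad
\frac{1}{t_{\text{.NET}}} \approx 12\,700\ \text{msg/s}

% nats bench with --pubbatch 1 (16,539 msgs/sec)
t_{\text{bench}} = \frac{1}{16\,539\ \text{msg/s}} \approx 60.5\ \mu\text{s}

% With up to N publishes outstanding, the round trip alone would allow at most
\text{rate}_{\max} \approx \frac{N}{t}
\qquad\text{e.g. } N = 100:\ \frac{100}{60.5\ \mu\text{s}} \approx 1.65 \times 10^{6}\ \text{msg/s}
```

Under that assumption, the observed 108,061 msgs/sec with pubbatch=100 is well below the round-trip bound, which would suggest that once publishes are pipelined something other than the ack latency becomes the limit.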

Anyway, shouldn't batching be implemented in the client, similar to how it's done in the Kafka client (batch.size, linger.ms)?

mtmk commented 9 months ago

> Anyway, shouldn't batching be implemented in the client, similar to how it's done in the Kafka client (batch.size, linger.ms)?

We should be able to implement that but I'm not sure what the API would look like in terms of collecting ACKs.

to11mtm commented 4 months ago

> Anyway, shouldn't batching be implemented in the client, similar to how it's done in the Kafka client (batch.size, linger.ms)?

> We should be able to implement that but I'm not sure what the API would look like in terms of collecting ACKs.

FWIW, in the past when I've had to do something similar, it's been something like:

[buffer aggregating to a byte or count limit] -> [stage taking what got buffered and setting up a 'lookup' from each thing to a TCS or poolable IValueTaskSource-type thing] -> [writing the stuff] -> [setting the TCS or IVTS]

When I do it in Akka Streams (think something between channels and enumerables, but with an awesome DSL), it's usually done as two stages (BatchWeighted and SelectAsync). Of course we can't take Akka Streams as a dependency. ;)

Those -can- be represented as channel stages, which can make things a -little- easier. However, I would worry about how well it would play with Unity/AOT-type scenarios once netstandard merges (the best simple practice for such a pipeline is to Task.Run each stage, and I'm not sure whether that works in Unity).
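
The stages described above could be sketched with System.Threading.Channels roughly as follows. This is only an illustration under stated assumptions: `BatchingWriter<T>`, `WriteAsync`, `CompleteAsync` and the `writeBatch` delegate are hypothetical names and not part of NATS.Client, the limit is a simple item count (no byte limit or linger time), and a plain `TaskCompletionSource` is used rather than a pooled `IValueTaskSource`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration only; not part of NATS.Client.
public sealed class BatchingWriter<T>
{
    private readonly Channel<(T Item, TaskCompletionSource<bool> Done)> _queue =
        Channel.CreateBounded<(T Item, TaskCompletionSource<bool> Done)>(1024);
    private readonly int _maxBatch;
    private readonly Func<IReadOnlyList<T>, Task> _writeBatch;
    private readonly Task _pump;

    public BatchingWriter(int maxBatch, Func<IReadOnlyList<T>, Task> writeBatch)
    {
        _maxBatch = maxBatch;
        _writeBatch = writeBatch;
        // The batching/writing stage runs on its own task.
        _pump = Task.Run(() => PumpAsync());
    }

    // Buffer stage: pair the item with a TCS and wait until its batch was written.
    public async Task WriteAsync(T item)
    {
        var done = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        await _queue.Writer.WriteAsync((item, done));
        await done.Task;
    }

    // Stop accepting items and wait for the pump to drain what is buffered.
    public Task CompleteAsync()
    {
        _queue.Writer.Complete();
        return _pump;
    }

    private async Task PumpAsync()
    {
        var reader = _queue.Reader;
        var items = new List<T>(_maxBatch);
        var pending = new List<TaskCompletionSource<bool>>(_maxBatch);

        while (await reader.WaitToReadAsync())
        {
            // Take whatever is already buffered, up to the count limit.
            while (items.Count < _maxBatch && reader.TryRead(out var entry))
            {
                items.Add(entry.Item);
                pending.Add(entry.Done);
            }

            try
            {
                // Write stage. The list is reused, so the callee must not keep it.
                await _writeBatch(items);
                foreach (var tcs in pending) tcs.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // A failed write fails every caller whose item was in the batch.
                foreach (var tcs in pending) tcs.TrySetException(ex);
            }

            items.Clear();
            pending.Clear();
        }
    }
}
```

Callers would await `WriteAsync(item)` from as many producers as they like and call `CompleteAsync()` on shutdown. Batches form only from what is already queued when the pump wakes up, so a lone writer still gets a batch of one rather than waiting for a timer.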


I'll admit I've not looked at how JS publish works. However, based on what I know about the rest of the pipeline, this may give some ideas to at least think about.

(I have gists of a few cases, although they are more of a SQL/read type of thing...)

mtmk commented 4 months ago

@to11mtm so JS publish is basically a publish with a reply-to subject (inbox), which is what collects the ACKs. The current implementation achieves this by creating a temporary subscription on the mux inbox. We could make that more efficient by creating a single subscription and processing 'batches' concurrently (bounded by semaphores, to avoid flooding the server with PUBs that are still waiting for ACKs). Flow-wise we need a task collecting ACKs, a loop on the current task to publish messages, and another task to check ACKs for errors (or maybe we only need to track errors so the application can act on them).
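
The semaphore part of that flow could look something like the sketch below. It is a simplification under stated assumptions, not the client's implementation: `WindowedPublisher` and its parameters are hypothetical names, the `publish` delegate stands in for a call such as `js.PublishAsync(...).AsTask()` that completes when the ack arrives, and the single shared ack subscription mentioned above is not modelled. Only failures are tracked, so the application can act on them afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only; not part of NATS.Client.
public static class WindowedPublisher
{
    // Publishes `count` messages while keeping at most `maxInFlight`
    // publishes waiting for their ack. Returns the failures that occurred.
    public static async Task<ConcurrentQueue<(int Index, Exception Error)>> PublishAsync(
        int count, int maxInFlight, Func<int, Task> publish)
    {
        var errors = new ConcurrentQueue<(int Index, Exception Error)>();
        using var window = new SemaphoreSlim(maxInFlight, maxInFlight);

        // Publish loop: blocks only when the window is full.
        for (var i = 0; i < count; i++)
        {
            await window.WaitAsync();
            _ = TrackAsync(i);
        }

        // Drain: once every slot can be re-acquired, all acks have been handled.
        for (var i = 0; i < maxInFlight; i++)
            await window.WaitAsync();

        return errors;

        // Ack collection: record a failure, then free the slot either way.
        async Task TrackAsync(int index)
        {
            try
            {
                await publish(index);
            }
            catch (Exception ex)
            {
                errors.Enqueue((index, ex));
            }
            finally
            {
                window.Release();
            }
        }
    }
}
```

Compared with collecting 10,000 tasks in a list and awaiting them afterwards, a window like this caps the number of outstanding tasks at `maxInFlight`, which may ease the GC pressure noted earlier in the thread. The right window size is something to measure rather than assume.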

mtmk commented 4 months ago

@anth-git fyi we have a PR to help with batching: