nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0
262 stars 54 forks source link

Producing to JetStreams with high level of batching causes errors #523

Closed AntonSmolkov closed 4 months ago

AntonSmolkov commented 4 months ago

Observed behavior

Continuing the discussion https://github.com/nats-io/nats-server/discussions/5551

Producing with high level of batching causes errors and cancellation of producing clients.

Client logs ``` [19-06-2024 11:07:00.632 Warning (NATS.Client.Core.Internal.InboxSubBuilder)] Unregistered message inbox received for _INBOX.aBMZ5Rir6zAqsWM6wbFVAv.KaQUl3GtSDGskW9CnOIaHx [19-06-2024 11:07:00.632 Warning (NATS.Client.Core.Internal.InboxSubBuilder)] Unregistered message inbox received for _INBOX.aBMZ5Rir6zAqsWM6wbFVAv.KaQUl3GtSDGskW9CnOIaMo [19-06-2024 11:07:00.632 Warning (NATS.Client.Core.Internal.InboxSubBuilder)] Unregistered message inbox received for _INBOX.aBMZ5Rir6zAqsWM6wbFVAv.KaQUl3GtSDGskW9CnOIaRf [19-06-2024 11:07:00.632 Warning (NATS.Client.Core.Internal.InboxSubBuilder)] Unregistered message inbox received for _INBOX.aBMZ5Rir6zAqsWM6wbFVAv.KaQUl3GtSDGskW9CnOIaWW [19-06-2024 11:07:00.632 Warning (NATS.Client.Core.Internal.InboxSubBuilder)] Unregistered message inbox received for _INBOX.aBMZ5Rir6zAqsWM6wbFVAv.KaQUl3GtSDGskW9CnOIabN [19-06-2024 11:07:00.685 Error (Zyfra.Udl.Tools.NatsPerfTester.Producer.ProducerWorker)] Producer 82. An error has occured while generating and publishing message System.OperationCanceledException: The operation was canceled. at NATS.Client.Core.Commands.CommandWriter.PublishStateMachineAsync(Boolean lockHeld, String subject, String replyTo, NatsPooledBufferWriter`1 headersBuffer, NatsPooledBufferWriter`1 payloadBuffer, CancellationToken cancellationToken) at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token) at NATS.Client.Core.NatsConnection.RequestSubAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken) at NATS.Client.JetStream.NatsJSContext.PublishAsync[T](String subject, T data, INatsSerialize`1 serializer, NatsJSPubOpts opts, NatsHeaders headers, CancellationToken cancellationToken) at Zyfra.Udl.Tools.NatsPerfTester.Producer.Publishers.JsNatsPublisher.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsJSPubOpts opts, CancellationToken cancellationToken) in /project/Zyfra.Udl.Tools.NatsPerfTester/Producer/Publishers/JsNatsPublisher.cs:line 16 at Zyfra.Udl.Tools.NatsPerfTester.Producer.ProducerWorker.GenerateAndPublishMessagesAsync(Int32 producerIndex, CancellationToken cancellationToken) in /project/Zyfra.Udl.Tools.NatsPerfTester/Producer/ProducerWorker.cs:line 166 [19-06-2024 11:07:01.027 Error (Zyfra.Udl.Tools.NatsPerfTester.Producer.ProducerWorker)] Producer 1. An error has occured while generating and publishing message System.OperationCanceledException: The operation was canceled. at NATS.Client.Core.Commands.CommandWriter.PublishStateMachineAsync(Boolean lockHeld, String subject, String replyTo, NatsPooledBufferWriter`1 headersBuffer, NatsPooledBufferWriter`1 payloadBuffer, CancellationToken cancellationToken) at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token) at NATS.Client.Core.NatsConnection.RequestSubAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken) at NATS.Client.JetStream.NatsJSContext.PublishAsync[T](String subject, T data, INatsSerialize`1 serializer, NatsJSPubOpts opts, NatsHeaders headers, CancellationToken cancellationToken) at Zyfra.Udl.Tools.NatsPerfTester.Producer.Publishers.JsNatsPublisher.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsJSPubOpts opts, CancellationToken cancellationToken) in /project/Zyfra.Udl.Tools.NatsPerfTester/Producer/Publishers/JsNatsPublisher.cs:line 16 at Zyfra.Udl.Tools.NatsPerfTester.Producer.ProducerWorker.GenerateAndPublishMessagesAsync(Int32 producerIndex, CancellationToken cancellationToken) in /project/Zyfra.Udl.Tools.NatsPerfTester/Producer/ProducerWorker.cs:line 166 ```
Brokers logs ``` [6] 2024/06/19 11:05:42.817547 [INF] JetStream cluster new stream leader for 'MAIN > udl-js-partition-121' [6] 2024/06/19 11:05:43.041990 [INF] JetStream cluster new stream leader for 'MAIN > udl-js-partition-102' [6] 2024/06/19 11:05:43.621177 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-56' has NO quorum, stalled [6] 2024/06/19 11:05:44.595923 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-33' has NO quorum, stalled [6] 2024/06/19 11:05:44.596833 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-116' has NO quorum, stalled [6] 2024/06/19 11:05:45.463697 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-3' has NO quorum, stalled [6] 2024/06/19 11:05:46.366880 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-88' has NO quorum, stalled [6] 2024/06/19 11:05:47.632334 [INF] JetStream cluster new stream leader for 'MAIN > udl-js-partition-83' [6] 2024/06/19 11:05:47.834469 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-9' has NO quorum, stalled [6] 2024/06/19 11:05:48.956580 [INF] JetStream cluster new stream leader for 'MAIN > udl-js-partition-55' [6] 2024/06/19 11:05:49.175085 [WRN] JetStream cluster stream 'MAIN > udl-js-partition-8' has NO quorum, stalled ```

Repro code and instructions are here. https://github.com/AntonSmolkov/NATS-JetStream-Perf/tree/main

Repro settings for the app:

        - name: NATS_PRODUCERS_PARALLEL 
          value: "1280" #less might be enought
        - name: NATS_PRODUCERS_BATCH_SIZE
          value: "1000"
        - name: NATS_JETSTREAMS_PARTITIONS_COUNT
          value: "128"
        - name: NATS_JETSTREAMS_CREATE
          value: "true"
        - name: NATS_JETSTREAMS_COMPRESSION_ENABLED
          value: "false"
        - name: NATS_JETSTREAMS_USE_IN_MEMORY_STORAGE
          value: "true"

Without batching (NATS_PRODUCERS_BATCH_SIZE=1) sample code works fine. Also, no cancelation is fired from the sample code and sample apps own cancellation token exception is suppressed

Expected behavior

No errors when using batching

Server and client version

Client library - 2.2.3 Server - 2.10.16

Host environment

Envinronment:

Also TLS is used

Broker configuration can be reproduced with helm values from this repo - https://github.com/AntonSmolkov/NATS-JetStream-Perf/tree/main

Steps to reproduce

  1. Configure multi-broker NATS setup
  2. Run code from sample repository (1k batches, 1280 parallel producers, each with its own connection)

(Probably should be reproducible with smaller setup and less parallel producers)

mtmk commented 4 months ago

thanks for the report @AntonSmolkov, sorry for the late reply. it looks like you forgot to await at the end of ExecuteAsync(). something like await Task.WhenAll(tasks) should solve the issue. Good project btw, please keep us posted 💯

AntonSmolkov commented 4 months ago

@mtmk you mean this line?

ExecuteAsync() is a method of dotnet background service, so it shouldn’t make any difference if it awaits spawned tasks or not. They will work in the background in any case.

Moreover, with NATS_PRODUCERS_BATCH_SIZE=1 the issue doesn’t reproduce

mtmk commented 4 months ago

OK, true. I added them to my copy anyway, at least for clean shutdown. But I'm seeing the issue either way. problem seems to be my suggestion of (ab)using the tasks trying to batch the JetStream publish. works fine with core and as you said with batch=1 which is using js.publish as intended.

mtmk commented 4 months ago

@AntonSmolkov fyi we have a PR to help with batching:

mtmk commented 4 months ago

@AntonSmolkov fyi concurrent JetStream publishing is now released in 2.3.0.

var batch = 100;
var futures = new NatsJSPublishConcurrentFuture[batch];

for (var i = 0; i < batch; i++)
{
    futures[i] = await _js.PublishConcurrentAsync("foo", i);
}

for (var i = 0; i < batch; i++)
{
    await using var future = futures[i];
    var ack = await future.GetResponseAsync();
    ack.EnsureSuccess();
}