nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Huge performance hit on stream with 3M unique subjects [v2.10.18] #5803

Open 7Kronos opened 1 month ago

7Kronos commented 1 month ago

Observed behavior

We have built a pipeline that, from various information sources, produces an entity whose data is composed on the fly.

Information for Stream PPSTREAM created 2024-08-17 18:27:32

              Subjects: pipeline.product.>
              Replicas: 1
               Storage: File

Options:

             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: true

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: 1
         Maximum Bytes: 200 GiB
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 3,359,000
                 Bytes: 14 GiB
        First Sequence: 114,332 @ 2024-08-18 13:51:10
         Last Sequence: 3,602,895 @ 2024-08-18 19:29:37
      Deleted Messages: 129,564
      Active Consumers: 2
    Number of Subjects: 3,359,000

The messages are stored in a stream with a per-subject message limit of 1, and incoming messages use the subject template product.{sku}.{source}. This allows us to keep in NATS the latest information from each source for a given product.
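
For reference, a minimal sketch of how such a stream and a publication under that subject template might be set up with the nats.js client; the stream name matches the report, but the SKU, source, and payload values are illustrative and the actual setup code is not shown in the issue:

    import { connect, headers, StorageType, RetentionPolicy, DiscardPolicy } from 'nats'

    const nc = await connect({ servers: 'localhost:4222' })
    const jsm = await nc.jetstreamManager()

    // Stream that keeps only the latest message per subject, i.e. per {sku}.{source}.
    await jsm.streams.add({
        name: 'PPSTREAM',
        subjects: ['pipeline.product.>'],
        storage: StorageType.File,
        retention: RetentionPolicy.Limits,
        discard: DiscardPolicy.Old,
        max_msgs_per_subject: 1,      // "Maximum Per Subject: 1"
        max_bytes: 200 * 1024 ** 3    // 200 GiB
    })

    // Publishing under pipeline.product.{sku}.{source} replaces the previous
    // message from the same source for the same SKU.
    const js = nc.jetstream()
    const h = headers()
    h.set('source', 'source-a')
    const payload = new TextEncoder().encode('compressed product data') // illustrative
    await js.publish('pipeline.product.SKU123.source-a', payload, { headers: h })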

The pipeline subscribes to product.> to trigger the composition of a product sheet, which it stores in a KV bucket (a concept similar to an event store). During this phase, the pipeline must collect all the messages concerning a SKU, i.e. product.{sku}.>. To do this, an ephemeral consumer is created to collect these messages and is then deleted:

        let msg: JsMsg | null = null

        while ((msg = await con.next()) !== null) {
            const source = msg.headers?.get('source')

            messages.push({
                source: source ?? 'unknown',
                data: unpack(Bun.gunzipSync(msg.data))
            })

            const info = await con.info()

            if (info.num_pending === 0) {
                break
            }
        }

        con.delete() // unawaited to save RTT

This is executed once for each message in the stream.
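
The issue does not show how the per-SKU ephemeral consumer (con) is created. A plausible sketch, assuming the simplified consumer API of nats.js 2.x; the consumer options and the sku variable are assumptions, not taken from the issue:

    import { connect, AckPolicy, DeliverPolicy, nanos } from 'nats'

    const nc = await connect({ servers: 'localhost:4222' })
    const js = nc.jetstream()
    const jsm = await nc.jetstreamManager()

    const sku = 'SKU123' // hypothetical SKU currently being composed

    // Ephemeral consumer filtered to every source of this one SKU.
    const ci = await jsm.consumers.add('PPSTREAM', {
        filter_subject: `pipeline.product.${sku}.>`,
        deliver_policy: DeliverPolicy.All,
        ack_policy: AckPolicy.None,
        inactive_threshold: nanos(30_000) // 30s safety net in case the delete below is lost
    })

    const con = await js.consumers.get('PPSTREAM', ci.name)
    // ... the while ((msg = await con.next()) !== null) loop shown above runs here ...
    con.delete() // unawaited to save an RTT, as in the issue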

With a base of 50,000 messages, this phase takes an average of 7ms, which is acceptable at this stage.

(timing screenshot)

However, when testing with 3,500,000 messages, this phase takes an average of 420 ms, which represents a significant performance hit.

It is important to note that no publication occurs in parallel. The messages were loaded in advance before the test was triggered.

(timing screenshot)

Disk I/O is pretty low. (disk I/O screenshot)

Expected behavior

We expect a smaller performance hit.

Server and client version

The server is a standalone 2.10.18 instance:

[1] 2024/08/19 08:01:52.280840 [INF] Starting nats-server
[1] 2024/08/19 08:01:52.280935 [INF]   Version:  2.10.18
[1] 2024/08/19 08:01:52.280939 [INF]   Git:      [57d23ac]

JavaScript client 2.28.2:

nats@^2.28.2: 
  version "2.28.2"
  resolved "https://registry.npmjs.org/nats/-/nats-2.28.2.tgz"
  integrity sha512-02cvR8EPach+0BfVaQjPgsbPFn6uMjEQAuvXS2ppg8jiWEm2KYdfmeFmtshiU9b2+kFh3LSEKMEaIfRgk3K8tw==
  dependencies: 
    nkeys.js "1.1.0"

Host environment

Docker version 26.0.2, volume driver: local

Steps to reproduce

No response

MauriceVanVeen commented 1 month ago

Each call to consumer.info() does an API request to fetch the latest info. This has quite a large overhead when done for every message and can be simplified by getting the pending count directly from the message metadata:

if (msg.info.pending === 0) {
    break
}

Could you check with that change?
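
For reference, applied to the loop from the issue this might look as follows; con, messages, unpack, and Bun.gunzipSync are taken from the original snippet:

    let msg: JsMsg | null = null

    while ((msg = await con.next()) !== null) {
        messages.push({
            source: msg.headers?.get('source') ?? 'unknown',
            data: unpack(Bun.gunzipSync(msg.data))
        })

        // The pending count comes from the delivery metadata carried by the
        // message itself, so no extra consumer-info round trip is needed.
        if (msg.info.pending === 0) {
            break
        }
    }

    con.delete()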

7Kronos commented 1 month ago

@MauriceVanVeen

That's a good point; I figured that out while reading the examples earlier and had already tried it.

consumer.info() was adding ~2 ms

7Kronos commented 1 month ago

I made a POC with rudimentary code, replacing the KV bucket and JetStream with MongoDB collections, but with the same code base plus some indexes.

I know the driver is optimized and queues changes, but I set the write concern to 1 so that I get an ack from the MongoDB instance, effectively throttling the client to avoid overwhelming the server.
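
For context, the acknowledged-write setup described above might look roughly like this with the MongoDB Node.js driver; the connection string, collection name, and document shape are illustrative, since the actual POC code is not shown:

    import { MongoClient } from 'mongodb'

    // w: 1 makes every write wait for an acknowledgment from the primary,
    // so the client is effectively throttled instead of just queueing writes.
    const client = new MongoClient('mongodb://localhost:27017', {
        writeConcern: { w: 1 }
    })

    const col = client.db('poc').collection('product_sources')

    // Rough equivalent of the "one message per {sku}.{source}" stream semantics:
    // upsert the latest document for a given SKU/source pair.
    const doc = { sku: 'SKU123', source: 'source-a', data: { /* composed data */ } }
    await col.replaceOne({ sku: doc.sku, source: doc.source }, doc, { upsert: true })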

I got these results: (results screenshot attached)

Tell me if I can help in any way.