nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
15.49k stars 1.38k forks source link

NATS JetStream >= 2.10.17 slows down under load in a specific situation #5702

Closed pcsegal closed 1 month ago

pcsegal commented 1 month ago

Observed behavior

In recent NATS JetStream versions (2.10.17, 2.10.18 and 2.11.0-dev), when I publish messages at a relatively high rate into a stream with WorkQueue retention policy, the stream operations start to slow down and time out after a while. When I test the same example against NATS 2.10.12, it runs to completion with no errors.

Expected behavior

I would expect the performance to be the same that I observe on NATS 2.10.12.

Server and client version

I tested the following server versions of NATS: 2.10.12, 2.10.17, 2.10.18 and the development version (2.11.0-dev) in the main branch.

In version 2.10.12, I observed normal performance. In all the other versions, I observed the performance degradation.

Go client (github.com/nats-io/nats.go): v1.36.0.

Host environment

OS: Ubuntu 22.04.1 LTS on WSL2 (Windows 10) CPU: Intel(R) Core(TM) i3-3220 CPU @ 3.30GHz RAM: 8.00 GB

Steps to reproduce

  1. Run nats-server -js with NATS server version 2.10.18.
  2. Execute the script whose listing is given below. You should see several lines of the type error fetching from workqueue nats: timeout and error acking msg from workqueue nats: timeout.
  3. Run nats-server -js with NATS server version 2.10.12.
  4. Execute the script again against the older NATS version. You should see the following output when the script is done executing:

    Finished populating work queue 100000
    Final stream size 100000

The script below creates a WorkQueue stream with 100000 items and a durable consumer that runs for a limited number of iterations where it continually 1) fetches the next item from the work queue, 2) acks that item and then 3) publishes that item back to the work queue. When all the processing is done, it prints the number of items in the stream, which should be 100000.

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    var nc *nats.Conn
    var js nats.JetStreamContext
    nc, err := nats.Connect("127.0.0.1")
    if err != nil {
        panic(err)
    }
    defer nc.Close()
    if js, err = nc.JetStream(); err != nil {
        panic(err)
    }

    // Delete and recreate test streams / buckets.

    _ = js.DeleteStream("TEST_WORKQUEUE")

    _, err = js.AddStream(&nats.StreamConfig{
        Name:                 "TEST_WORKQUEUE",
        Discard:              nats.DiscardNew,
        MaxMsgsPerSubject:    1,
        DiscardNewPerSubject: true,
        Retention:            nats.WorkQueuePolicy,
        Subjects:             []string{"TEST_WORKQUEUE.>"},
    })
    if err != nil {
        panic(err)
    }

    // Populate work queue

    NUM_MSGS := 100000

    var wg sync.WaitGroup

    for i := 0; i < NUM_MSGS; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            itemID := fmt.Sprintf("%d", i)
            js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%s", itemID), []byte(itemID))
        }()
    }
    wg.Wait()

    streamInfo, err := js.StreamInfo("TEST_WORKQUEUE")
    if err != nil {
        panic(err)
    }

    fmt.Println("Finished populating work queue", streamInfo.State.Msgs)

    // Create consumer that runs a loop where it
    // 1) fetches the next item from the work queue,
    // 2) acks that item,
    // 3) and then publishes that item back to the work queue.
    // The outer loop of the consumer runs for 7 iterations, each one processing a batch of 20000 items.

    if _, err = js.AddConsumer("TEST_WORKQUEUE", &nats.ConsumerConfig{
        Durable:       "testworkqueueconsumer",
        AckPolicy:     nats.AckExplicitPolicy,
        MaxWaiting:    20000,
        MaxAckPending: -1,
    }); err != nil {
        panic(err)
    }

    workqueueSub, err := js.PullSubscribe("TEST_WORKQUEUE.>", "testworkqueueconsumer", nats.BindStream("TEST_WORKQUEUE"))
    if err != nil {
        panic(err)
    }

    for j := 0; j < 7; j++ {
        NUM_ITEMS := 20000
        for i := 0; i < NUM_ITEMS; i++ {
            wg.Add(1)
            i := i
            go func() {
                defer wg.Done()
                msgs, err := workqueueSub.Fetch(1)
                if err != nil {
                    fmt.Println("error fetching from workqueue", err)
                    return
                }
                for {
                    err = msgs[0].AckSync()
                    if err != nil {
                        fmt.Println("error acking msg from workqueue", err)
                        continue
                    }
                    break
                }
                itemID, err := strconv.Atoi(string(msgs[0].Data))
                if err != nil {
                    panic(err)
                }
                for {
                    _, err = js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%d", itemID), msgs[0].Data)
                    if err != nil {
                        fmt.Println("error publishing msg back to workqueue", err)
                        continue
                    }
                    return
                }
            }()
            if i%1000 == 0 {
                time.Sleep(100 * time.Millisecond)
            }
        }
        wg.Wait()
    }
    wg.Wait()

    fmt.Println("Final stream size", streamInfo.State.Msgs)

}
kozlovic commented 1 month ago

@derekcollison I had a look at this and to me, it seems that we are doing way too many compacts. I tracked it down to a fix that was made here. Before this fix, the new buffer would shrink quite a bit because we would omit the tombstones, but it is no longer the case.

If I understand correctly, what happens is that when the first sequence of a block that is not empty is removed (due to an ack) then we write a tombstone at the end of the block. But what seem to be happening is that after a while with the provided test above, we end-up doing a compact for every single message.

What I have noticed is that we exclude the accounting when writing a tombstone, as shown here, but if I do a:

} else {
   mb.rbytes += rl
}

Then it looks like the compact is no longer happening and the test complete. Of course, I don't know if that is correct and if this would not introduce new issues.

Note: running the provided test, something that is really strange, is that even with the "fix" that eliminates the too frequent compacts, we get this error message from the client:

error publishing msg back to workqueue nats: maximum messages per subject exceeded

when pointing to main, but not from 2.10.12, so there must be something else. What is strange is that the test program takes a message from the work queue (there are 100,000 distinct subjects), ack the message (using the client "AckSync()" method, and only if successful sends to that same subject. So there should never be this message since the message should have been removed, then put back. So the error condition should not happen.

kozlovic commented 1 month ago

Oh, regarding the last comment, I see that the app is setting:

        Discard:              nats.DiscardNew,
        MaxMsgsPerSubject:    1,
        DiscardNewPerSubject: true,

which hits this line. Not sure if this means that this is a config error. Checking...

kozlovic commented 1 month ago

@derekcollison It is actually a problem. The server sends the reply back to the client before actually removing the message. This call here is the one sending the ack back, while this call down here is the one that is going to remove it from the state. This is why sometimes we get the new version of the message arrives before the previous has been removed.

kozlovic commented 1 month ago

@derekcollison @wallyqs The change in behavior (sending the ack prematurely) was introduced in this PR: https://github.com/nats-io/nats-server/pull/5419. (I find it hard to figure out what versions are impacted by a given PR/commit since we changed the way we do releases).

derekcollison commented 1 month ago

Thanks will take a look tomorrow.

derekcollison commented 1 month ago

The ack send is for single server mode or non-clustered asset correct?

kozlovic commented 1 month ago

@derekcollison Correct. This test (and investigation) was using a single NATS Server.

kozlovic commented 1 month ago

@derekcollison The PR closing this issue resolved only the compact issue, not the premature ACK sent by the server, correct?

derekcollison commented 1 month ago

Correct. This should remain open.

kozlovic commented 1 month ago

Re-opening waiting for the premature ack fix.

pcsegal commented 1 month ago

Thank you, the slow down itself seems fixed. Just checking if there is any estimate for the remainder of this issue (premature ack).

derekcollison commented 1 month ago

This only affects single server AFAIK, so on our list but not a top priority atm.