nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io

Consumer seems to be dropping messages [v2.10.20, v2.10.21] #5930

Open pcsegal opened 1 month ago

pcsegal commented 1 month ago

Observed behavior

This looks very much to be related to this older closed issue: https://github.com/nats-io/nats-server/issues/5705

I have essentially the same setup as the one described in that older issue, but I will describe it again here for convenience. The full example is below under Steps to reproduce, and it has:

Note:

These streams are used as follows:

1) TEST_WORKQUEUE is initially populated with 130000 numbers.
2) Then, 100000 items are fetched from TEST_WORKQUEUE and published to TEST_ACTIVE_WORK_ITEMS. Each item in the active work items stream has an ID (the number taken from the workqueue) and an expiry, which is set to now + 30 seconds.
3) When publishing items to TEST_ACTIVE_WORK_ITEMS, we do it twice in a row; the second publish is meant to replace the previous one, bumping up that item ID's expiry time. Note that TEST_ACTIVE_WORK_ITEMS has a discard policy of DiscardOld to allow this behavior.
4) Once in the active work items stream, a consumer handles item expiration. NakWithDelay is used to schedule expiration. When an item expires, it is published back into TEST_WORKQUEUE and acked by the consumer (see the sketch after this list).
5) We wait while the consumer is doing its work; meanwhile, we print statistics for monitoring the stream and consumer state.
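
For illustration, the expiry handling in step 4 might look roughly like this (a minimal sketch, not taken from the attached script; it assumes the ActiveWorkItem type used later in this thread and the usual imports of encoding/json, fmt, time, and github.com/nats-io/nats.go):

// Sketch of the expiry consumer described in step 4 (illustrative only,
// not the code from the attached reproduction script).
type ActiveWorkItem struct {
    ID     string
    Expiry time.Time
}

func handleActiveWorkItem(js nats.JetStreamContext, msg *nats.Msg) {
    var item ActiveWorkItem
    if err := json.Unmarshal(msg.Data, &item); err != nil {
        msg.Term() // malformed payload, do not redeliver
        return
    }
    if remaining := time.Until(item.Expiry); remaining > 0 {
        // Not expired yet: ask for redelivery once the expiry time is reached.
        msg.NakWithDelay(remaining)
        return
    }
    // Expired: move the item back to the workqueue, then ack it here.
    if _, err := js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%s", item.ID), []byte(item.ID)); err != nil {
        return // not acked, so it will be redelivered after AckWait
    }
    msg.Ack()
}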

So far, this is exactly the same as the example in the older issue linked above. The big difference is that here, once all the items are back in TEST_WORKQUEUE and the consumer's pending ack count goes down to zero, we repeat steps 2-5.

The issue starts happening when steps 2-5 are repeated. That is, the process appears to work fine the first time, but not the second time.

The expected behavior is that, after the consumer's ack pending count goes down to 0, the size of TEST_WORKQUEUE should go back to 130000. So, we should see the following in the logs when the consumer no longer has any pending ack:

TEST_ACTIVE_WORK_ITEMS consumer ack pending 0
TEST_WORKQUEUE stream size 130000

In NATS 2.10.12, this seems to work without any issues.

However, in NATS >= 2.10.20, the consumer seems to drop items, and the final number of items going back to TEST_WORKQUEUE is often less than 130000, similar to the old issue linked above. For example:

TEST_ACTIVE_WORK_ITEMS consumer ack pending 1
TEST_WORKQUEUE stream size 127855

(Note that, in the particular example above, the ack pending count ends up at 1, which seems to be another minor issue, though that one seems rarer.)

Also, when steps 2-5 are repeated, we see a lot of timeout errors from NATS:

error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout

Expected behavior

The expected behavior is that, after the consumer's ack pending count goes down to 0, the size of TEST_WORKQUEUE should go back to 130000. So, we should see the following in the logs when the consumer no longer has any pending ack:

TEST_ACTIVE_WORK_ITEMS consumer ack pending 0
TEST_WORKQUEUE stream size 130000

Server and client version

Server: NATS 2.10.20 / 2.10.21-RC4. Client: NATS go client 1.37.0.

Host environment

Ubuntu 20.04.6 LTS.

Steps to reproduce

The script that reproduces this behavior is attached below. It expects NATS to be at 127.0.0.1:4222, so a local NATS just needs to be spun up with nats-server -js. The script will then delete/recreate the test streams (TEST_WORKQUEUE and TEST_ACTIVE_WORK_ITEMS), and the issue can be verified from the logs as described above.

nats-issue-20240925.zip

Jarema commented 1 month ago

Thanks for the detailed report and reproduction example. We're looking into it.

Jarema commented 1 month ago

After checking and running your example, I found that it publishes messages to the stream in an uncontrolled fashion: yes, it is using js.Publish properly, but it spawns a separate goroutine for every single publish, flooding the server with requests and causing issues. This is probably the source of the timeouts you see.

After adding a semaphore for at most 1k concurrent publishes (to change as little code as possible), the test no longer seems to lose messages, but rather shows some other issue:

TEST_ACTIVE_WORK_ITEMS consumer ack pending 8
TEST_WORKQUEUE stream size 130000
TEST_ACTIVE_WORK_ITEMS consumer ack pending 8
TEST_WORKQUEUE stream size 130000
TEST_ACTIVE_WORK_ITEMS consumer ack pending 8
TEST_WORKQUEUE stream size 130000

pcsegal commented 1 month ago

Thank you for the feedback.

I agree that I should avoid flooding it in a real scenario. To clarify, these issues were found in a load testing scenario.

I'm not using a semaphore, but I am using a WaitGroup to constrain the load a little bit: I'm doing the publishing in batches of 20k at a time.

But, even with that approach, I think there shouldn't be lost messages, even if there are timeouts, unless I'm misunderstanding something (or there is some mistake in my example), because:

Jarema commented 1 month ago

In your example I see 130k goroutines spawned in a loop, each doing a JetStream publish. This can heavily impact the system, and I would assume that, as you're not checking the message acks and errors of js.Publish, some messages might have failed and never made it to the stream, while almost the whole of JetStream is bogged down. If you run this test against the main branch, you will see it fail explicitly (as we added rate limiting to protect the server).

Your assumptions are correct; however, after adding semaphores, I see 130k messages after every test, with some pending acks, which means there is probably something else that is not right.

Hence my request to update the reproduction in a way that does not flood the server so much and check what the results are.

If you want fast publishing, I suggest using js.PublishAsync in a normal loop. That will be much quicker than js.Publish, while respecting server capacity.
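
For reference, a bounded js.PublishAsync loop might look roughly like this (a minimal sketch, not code from the attached reproduction; the connection URL, the 1000 max-pending limit, and the populate loop are illustrative):

// Sketch: async publishing with a cap on in-flight publishes.
package main

import (
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("127.0.0.1:4222")
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // Cap the number of in-flight async publishes so the server is not flooded.
    js, err := nc.JetStream(nats.PublishAsyncMaxPending(1000))
    if err != nil {
        panic(err)
    }

    for i := 0; i < 130000; i++ {
        // PublishAsync only blocks once the max-pending limit is reached.
        if _, err := js.PublishAsync(fmt.Sprintf("TEST_WORKQUEUE.%d", i), []byte(fmt.Sprint(i))); err != nil {
            panic(err)
        }
    }

    // Wait for all outstanding publish acks before relying on the stream contents.
    select {
    case <-js.PublishAsyncComplete():
    case <-time.After(30 * time.Second):
        fmt.Println("timed out waiting for publish acks")
    }
}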

pcsegal commented 1 month ago

Thank you for the feedback.

I have updated the example with some improvements, based on your suggestion, and also to make it easier to run and see the issue.

Here is the new example: nats-issue-20240926.zip

The changes are:

1) When initially populating the workqueue, I'm rate limiting it to up to 1k concurrent goroutines.
2) In processWorkqueueItems (which fetches from the workqueue stream to publish to the active work items stream), I'm rate limiting that to up to 5k concurrent goroutines.
3) I removed the part that resets the streams at the beginning of the example, as it was making it trickier to show the issue. Now, the script only creates the streams and populates TEST_WORKQUEUE if they don't already exist.
4) The script now only runs one round of processWorkqueueItems; it should be manually invoked again if we want to rerun it.

Please use the following procedure to run this new version:

1) First, run it like this, to first delete the streams if they already exist so that it starts from a clean state:

nats stream del TEST_WORKQUEUE -f ; nats stream del TEST_ACTIVE_WORK_ITEMS -f ; go run main.go

The example will initially create the streams, and populate TEST_WORKQUEUE with 130000 items.

TEST_ACTIVE_WORK_ITEMS doesn't exist, creating
TEST_WORKQUEUE doesn't exist, creating                                                        
Populating TEST_WORKQUEUE stream
Finished populating TEST_WORKQUEUE 130000

It will then do the processing. This will normally finish with the correct result:

TEST_ACTIVE_WORK_ITEMS consumer ack pending 0
TEST_WORKQUEUE stream size 130000

2) Now, run it again, but without deleting the streams: go run main.go

Note that, since the streams already exist, it won't populate TEST_WORKQUEUE; it will reuse the items that are already there:

TEST_ACTIVE_WORK_ITEMS already exists                                                                                                                                                        
TEST_WORKQUEUE already exists                                                                 

Then, it does the processing. This time around, you should see timeout errors:

error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout
error acking msg from workqueue nats: timeout

Unfortunately, it takes a while (about three and a half minutes in this test), but eventually you should see something like:

TEST_ACTIVE_WORK_ITEMS consumer ack pending 17
TEST_WORKQUEUE stream size 83246
TEST_ACTIVE_WORK_ITEMS consumer ack pending 17
TEST_WORKQUEUE stream size 83246
TEST_ACTIVE_WORK_ITEMS consumer ack pending 17
TEST_WORKQUEUE stream size 83246

Note that this can be seen much more quickly by changing processWorkqueueItems to pause every 20000 items instead of every 5000 items (line 225).

The output above was obtained with the latest commit on main. It also happens with 2.10.20 / 2.10.21-RC4. If I run with the older 2.10.12, there are no timeouts and no dropped events in the second run.

pcsegal commented 1 month ago

Hi, just to check, were you able to reproduce the issue on your side with this new example?

derekcollison commented 1 month ago

@Jarema any feedback for @pcsegal ?

pcsegal commented 1 month ago

Sorry to ask again, but I would like to know if there has been any update on this issue. Please, could you confirm that the new example reproduces the issue on your end?

Jarema commented 1 month ago

Hey. Sorry for the delay.

We were running your example against different versions of the server, and what we found out is that a change we made to the server in 2.10.16 affected how the server handles backpressure. Basically, from 2.10.16 onwards, the server is more susceptible to clients putting too much load on it.

When running your examples, I found that some errors are ignored (like those from js.Publish, which can be dropped if the server is overloaded), while others are not retried in any way, like the Fetch errors you see. The fetch timeout is happening because the server is overloaded with JetStream requests.

Additionally, using wg.Wait here is suboptimal, as instead of a constant, controlled flow of messages, you get batches of 5k operations, then a pause. Using a proper semaphore, or just a buffered channel, would work much more smoothly.
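
For illustration, a buffered channel used as a semaphore might look roughly like this (a sketch, not code from the attached example; the limit, subject, and function name are illustrative, and it assumes fmt, sync, and github.com/nats-io/nats.go are imported):

// Sketch: bound concurrent js.Publish calls with a buffered channel used as a semaphore.
func publishBounded(js nats.JetStreamContext, n, maxInFlight int) {
    sem := make(chan struct{}, maxInFlight)
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        sem <- struct{}{} // acquire a slot; blocks once maxInFlight publishes are running
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot when this publish finishes
            if _, err := js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%d", i), []byte(fmt.Sprint(i))); err != nil {
                fmt.Println("publish error:", err) // handle or retry as appropriate
            }
        }(i)
    }
    wg.Wait() // only the tail of in-flight publishes remains here
}

This keeps a steady number of publishes in flight instead of alternating between 5k-goroutine bursts and idle waits.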

When using such a semaphore, I find that your example produces situations where the consumer's ack pending count is greater than 0 while the TEST_WORKQUEUE stream is already at 130k, and no redeliveries happen anymore. This should not happen, and we're investigating it.

We do not see the consumer losing messages, but rather js.Publish publishes being dropped due to too high load.

pcsegal commented 1 month ago

Hi, no problem.

Thank you for the feedback. I'm not sure I understand where we are using js.Publish incorrectly, though. In the updated example I gave, we are checking all js.Publish errors and doing something if an error is returned, and, in all cases, it doesn't seem like silent drops should be occurring. For example, in populateWorkqueue, we panic:

            _, err := js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%s", itemID), []byte(itemID))
            if err != nil {
                panic(err)
            }

After populating the workqueue in populateWorkqueue, we print the number of workqueue items, and it always correctly shows 130000.

In startActiveWorkitemsConsumer, in line 154 (right after publishing to TEST_WORKQUEUE), the consumer just skips that item if the error returned from js.Publish is anything other than "maximum messages per subject exceeded"; since we didn't ack the consumer message at that point, skipping it should let the consumer message be redelivered.

Also, in processWorkqueueItems, I think we also handle js.Publish errors correctly: in line 204, if we get an error when trying to publish the item to TEST_ACTIVE_WORK_ITEMS, we just return from the goroutine. As far as I understand, this should not result in any loss: since nothing else was done to the item that was just fetched from the workqueue, I assume it should return to the workqueue eventually after AckWait.

Also, if I understand correctly, Fetch errors should not cause loss in this example, for the same reason explained above: if the fetched message was not acked, then it should return to the stream.

pcsegal commented 1 month ago

Just to clarify, when you say you are seeing js.Publish being dropped due to too high load, do you mean that they can be dropped even if the call to js.Publish doesn't return any error?

Jarema commented 1 month ago

Before we can check for dropped messages in your example, we need to fix the consumer getting stuck with outstanding acks, which we found while running this workload.

Jarema commented 1 month ago

@pcsegal the first fix is landing https://github.com/nats-io/nats-server/pull/6003

We need to retest whether that also fixes any dropped messages you see.

pcsegal commented 1 month ago

Thank you. I did some testing with that branch and it seems to have solved the outstanding acks issue. But I still see drops when running it a second time (similarly to before, I don't see drops when running it the first time from a clean state, but I see them when running it a second time without deleting the streams first).

Jarema commented 1 month ago

I will run some more tests with the pending issue fixed.

Jarema commented 1 month ago

@pcsegal

The errors you see are expected: after all, you're spawning 5k goroutines without any semaphore, where you fetch, publish twice, and ack. The server will choke on such a flood.

Most importantly, I ran the test a dozen times and it always passes for me.

pcsegal commented 1 month ago

Thank you for the feedback.

I made an attempt to modify this example to use a buffered channel and also lower the concurrency from 5000 to 2000: nats-issue-20241016.zip

However, I still see the issue on my side. In this case, I have to run it 4 times before the issue happens: that is, one initial time starting from a clean state, and then 3 additional times without deleting the streams.

If this is expected, then does that mean that, under heavy load, I should expect publishes to fail even if they didn't return an error?

pcsegal commented 1 month ago

I think I found a way to tweak the example so that this drop problem doesn't seem to happen. I changed this portion:

                    _, err = js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%s", workItem.ID), []byte(workItem.ID))
                    switch {
                    case err == nil || strings.Contains(err.Error(), "maximum messages per subject exceeded"):
                        // Either the item has been published to TEST_WORKQUEUE or it already existed there,
                        // so we just need to ack this item.
                        if err = msg.AckSync(); err != nil {
                            return
                        }

to remove the check of whether the error is "maximum messages per subject exceeded", so I changed it to this:

                    _, err = js.Publish(fmt.Sprintf("TEST_WORKQUEUE.%s", workItem.ID), []byte(workItem.ID))
                    switch {
                    case err == nil:
                        if err = msg.AckSync(); err != nil {
                            return
                        }

The goal of that check was to prevent unnecessary retries in cases where the message already exists on TEST_WORKQUEUE for any reason.

I still need to investigate it in more detail.

pcsegal commented 1 month ago

I think I found what seems to be an explanation for the drops we see:

  1. processWorkqueueItems successfully publishes an item to TEST_ACTIVE_WORK_ITEMS and then tries to AckSync it from TEST_WORKQUEUE, but AckSync errors with a timeout. However, it seems that, even though the API call timed out, JetStream is still going to process the AckSync and eventually remove the message from the TEST_WORKQUEUE stream.

  2. About 20-30 seconds later, the active work items consumer tries to publish the same item back to the TEST_WORKQUEUE and gets a "maximum messages per subject exceeded" error. This error means the item already exists on the queue, so the consumer acks the consumer message. However, it seems the issue may be that the item is there only temporarily, and is soon going to be removed by AckSync as described in the previous paragraph. This is apparently why the item is lost.

This happens in the example with a 2000-sized buffered channel.

After adding some logs, this is an example filtered to show what happens with one specific ID (111386, in this case).

 2024-10-16 17:08:27 [processWorkqueueItems] fetched item with ID 111386 from workqueue
2024-10-16 17:08:27 [activeWorkItemsConsumer] received work item {111386 2024-10-16 17:08:57}
2024-10-16 17:08:27 [activeWorkItemsConsumer] rescheduling work item expiry {111386 2024-10-16 17:08:57}
2024-10-16 17:08:27 [activeWorkItemsConsumer] received work item {111386 2024-10-16 17:08:57}
2024-10-16 17:08:27 [activeWorkItemsConsumer] rescheduling work item expiry {111386 2024-10-16 17:08:57}
2024-10-16 17:08:32 [processWorkqueueItems] error acking msg from workqueue {111386 2024-10-16 17:08:57} nats: timeout
2024-10-16 17:08:59 [activeWorkItemsConsumer] received work item {111386 2024-10-16 17:08:57}
2024-10-16 17:08:59 [activeWorkItemsConsumer] error publishing to workqueue: maximum messages per subject exceeded {111386 2024-10-16 17:08:57}

As you can see, we get a timeout error at 17:08:32, and then about 27 seconds later we get a "maximum messages per subject exceeded" error when trying to publish the item back to the stream.

When inspecting the stream with nats stream subjects TEST_WORKQUEUE, the item 111386 is not there.

In the above explanation, I am assuming that, even though AckSync fails with a timeout, the ack (removal from the workqueue stream) still takes effect several seconds later (more than 10 seconds).

If this assumption is correct, then I guess that this is expected behavior from JetStream under load, as you said.

But I would like to confirm: is this behavior expected even with the goroutines limited to 2k by a buffered channel?

Jarema commented 1 month ago

Your assumption is correct!

A few points here:

  1. In the last few releases we optimized the server and cluster to better handle many clients putting on a lot of pressure, rather than favoring a single client. This should not affect most single-client workloads, unless they put on huge pressure, which is usually only relevant in benchmarks that do not reproduce real-world workloads.
  2. Your example is not "just a 2000-slot channel". It's almost like simulating the work of 2000 clients over one connection.

You can complete processWorkqueueItems in less than two seconds(!) with zero goroutines. Look at this example:

// Fetch items from the workqueue and publish them to the active work items stream, removing them from the workqueue.
func processWorkqueueItems(js nats.JetStreamContext) {
    if _, err := js.AddConsumer("TEST_WORKQUEUE", &nats.ConsumerConfig{
        Durable:       "testworkqueueconsumer",
        AckPolicy:     nats.AckExplicitPolicy,
        MaxWaiting:    20000,
        MaxAckPending: -1,
    }); err != nil {
        panic(err)
    }

    workqueueSub, err := js.PullSubscribe("TEST_WORKQUEUE.>", "testworkqueueconsumer", nats.BindStream("TEST_WORKQUEUE"))
    if err != nil {
        panic(err)
    }

    NUM_MSGS := 100000
    FETCH_BATCH_SIZE := 200
    for i := 0; i < NUM_MSGS/FETCH_BATCH_SIZE; i++ {
        msgs, err := workqueueSub.FetchBatch(FETCH_BATCH_SIZE)
        if err != nil {
            fmt.Println("error fetching from workqueue", err)
            return
        }

        for msg := range msgs.Messages() {
            itemID, err := strconv.Atoi(string(msg.Data))
            if err != nil {
                panic(err)
            }
            // Publish item to TEST_ACTIVE_WORK_ITEMS stream with an expiry time.
            workItem := ActiveWorkItem{ID: string(msg.Data), Expiry: time.Now().Add(30 * time.Second)}
            data, err := json.Marshal(workItem)
            if err != nil {
                panic(err)
            }
            ack, err := js.PublishAsync(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", itemID), data)
            if err != nil {
                fmt.Println("error publishing to TEST_ACTIVE_WORK_ITEMS", workItem, err)
                return
            }
            // Update expiry time and republish item to TEST_ACTIVE_WORK_ITEMS stream.
            workItem.Expiry = time.Now().Add(30 * time.Second)
            data, err = json.Marshal(workItem)
            if err != nil {
                panic(err)
            }
            ack2, err := js.PublishAsync(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", itemID), data)
            if err != nil {
                fmt.Println("error publishing to TEST_ACTIVE_WORK_ITEMS", workItem, err)
                return
            }
            ack.Ok()
            ack2.Ok()
            // Ack item to remove it from TEST_WORKQUEUE.
            err = msg.Ack()
            if err != nil {
                fmt.Println("error acking msg from workqueue", err)
                return
            }
        }
    }
}

This is much faster, with a fraction of the I/O used and very little pressure on the server. It can probably be optimized further; I did it in an ugly way just to show you an alternative approach. You can certainly make the code much better.

In most cases when using NATS, there is no need to spawn so many goroutines to be efficient. Quite the opposite. Also, please consider moving to the new JetStream API, which has nice constructs that handle efficient message delivery for pull consumers for you.
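
With the newer github.com/nats-io/nats.go/jetstream package, the fetch-and-ack loop above might look roughly like this (a minimal sketch, not a drop-in replacement; it assumes the stream and consumer names from this issue and imports of context, fmt, nats, and jetstream; the publish step is elided):

// Sketch using the newer jetstream package (illustrative only).
func processWithNewAPI(nc *nats.Conn) error {
    ctx := context.Background()
    js, err := jetstream.New(nc)
    if err != nil {
        return err
    }
    cons, err := js.CreateOrUpdateConsumer(ctx, "TEST_WORKQUEUE", jetstream.ConsumerConfig{
        Durable:   "testworkqueueconsumer",
        AckPolicy: jetstream.AckExplicitPolicy,
    })
    if err != nil {
        return err
    }
    for i := 0; i < 100000/200; i++ {
        batch, err := cons.Fetch(200) // pull up to 200 messages per request
        if err != nil {
            return err
        }
        for msg := range batch.Messages() {
            // ... publish to TEST_ACTIVE_WORK_ITEMS as in the example above ...
            if err := msg.Ack(); err != nil {
                fmt.Println("error acking msg from workqueue", err)
            }
        }
        if err := batch.Error(); err != nil {
            fmt.Println("fetch batch error", err)
        }
    }
    return nil
}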

Let me know if I can help you further!

pcsegal commented 1 month ago

Thank you for the feedback and example.

Just one question about that example: did you mean to receive from the channels returned by Ok()?

<- ack.Ok()
<- ack2.Ok()

If I make that change, the total time increases somewhat (almost 1 minute versus about 5 seconds without it).