Closed mipnw closed 2 years ago
@mipnw Unfortunately, this is because you are acknowledging the message as soon as you enter your callback that may take a long time to complete.
Think about it: yes, you specify MaxInflight(1), but as soon as the message is ack'ed, then the server can send the next message.
To simplify things: suppose that there are 5 messages in the channel and a single queue subscriber. The app receives message 1 and acks it right away, which means that the server sends message 2 to the client library.
Your app is still processing message 1, and will do so for, say, 20 minutes. Meanwhile, message 2 has been sitting in the client library queue, waiting for the user callback (your app) to finish. Since you use the default AckWait of 30 seconds, the server will redeliver message 2 about 40 times (20 min (1200 sec) / AckWait (30 sec)).
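To make the arithmetic above concrete, here is a minimal sketch. The function name `redeliveries` is made up for illustration, and it assumes the server redelivers exactly once per elapsed AckWait, which is only an approximation of the real timing:

```go
package main

import (
	"fmt"
	"time"
)

// redeliveries estimates how many times the server resends a message that
// stays unacknowledged for `waiting`, assuming one redelivery per elapsed
// AckWait interval (an approximation, not the server's exact behavior).
func redeliveries(waiting, ackWait time.Duration) int {
	return int(waiting / ackWait)
}

func main() {
	// 20 minutes of processing with the default 30 second AckWait.
	fmt.Println(redeliveries(20*time.Minute, 30*time.Second)) // 40
}
```

The same estimate shows why a longer AckWait helps: once AckWait exceeds the time a message stays unacknowledged, the result drops to 0.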
When your application is finally done with message 1, message 2 is presented to the callback and ack'ed right away. Then the server sends message 3 and, once again, the app spends several minutes processing message 2. The server will once again redeliver message 3 until it gets an ack for it, which won't happen until you are done processing message 2.
Note that those redelivered messages that have been accumulating in the client library are not suppressed by the library once your application has finally gotten to the point of ack'ing the message. This is why in the example above you would see message 2 presented to your callback many times in rapid succession. If your callback detects that it has a duplicate, it should still acknowledge the message.
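As a rough illustration of "detect the duplicate but still acknowledge it", here is a self-contained sketch. The `delivery` and `dedupHandler` types are hypothetical stand-ins, not the client library's types, and the in-memory `seen` map is an assumption (a real application would need something bounded or persistent):

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for a received message.
type delivery struct {
	Sequence    uint64
	Redelivered bool
}

// dedupHandler processes each sequence at most once but acknowledges
// every delivery, including duplicates.
type dedupHandler struct {
	seen      map[uint64]bool
	processed int // number of messages actually handled
	acked     int // number of acknowledgements sent
}

func (h *dedupHandler) handle(d delivery) {
	if h.seen[d.Sequence] {
		// Duplicate: skip the work, but still ack so the server stops resending.
		h.acked++
		return
	}
	h.seen[d.Sequence] = true
	h.processed++ // the long-running work would happen here
	h.acked++
}

func main() {
	h := &dedupHandler{seen: map[uint64]bool{}}
	h.handle(delivery{Sequence: 2})
	h.handle(delivery{Sequence: 2, Redelivered: true})
	h.handle(delivery{Sequence: 2, Redelivered: true})
	fmt.Println(h.processed, h.acked) // 1 3
}
```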
I would expect any message would be delivered exactly once. What is going on?
I hope the above clarifies it. But keep in mind that you cannot get exactly once (even in more typical situations where you would process the message, then ack it, even with maxInflight(1)). NATS is at-least-once. There are several things at play here: AckWait that may cause redeliveries, and the fact that there is always the possibility that the Ack is lost, or the server crashes just before processing the Ack. Moreover, the Ack is a one-way protocol, that is, there is no confirmation on the client side that the Ack has been processed.
So for your current situation, I would recommend that instead of Ack'ing the message at the top of the callback you simply make sure that you use an AckWait() (in addition to MaxInflight(1)) that covers more than what a callback can last. Even then, I want to make you aware that this is not an absolute resolution, because there will be cases where the server may not honor MaxInflight() for queues, and that is when a queue member has a message and closes before it is acknowledged. The message needs to be reassigned to an existing member, even if that means going over that member's MaxInflight.
Finally, note that NATS Streaming has been deprecated (see the notice that was posted a while back: https://github.com/nats-io/nats-streaming-server#warning--deprecation-notice-warning), so if this is a new development, I would recommend that you look into NATS JetStream instead, which has features (such as NAck, InProgress() (to delay redelivery of a given message), etc.) that NATS Streaming does not have.
Hi @kozlovic,
Thank you for your answer.
Yes I am aware of NATS JetStream and this is not new development so that new solution is not an option. I am also aware that NATS guarantees at-least once, not exactly once.
Under "easy" conditions, where a NATS server did not crash, where a NATS subscriber of a queue group did not disconnect, the subscriber receives 970 deliveries, instead of approximately one delivery. Approximately one would be 1 most of the time, sometimes 2, not 970 every time. So this isn't about guarantees, it's about not using NATS adequately, and it seems to me, there is no configuration of NATS that actually would yield the desirable behavior, though I'm not 100% sure about that.
This application is actually tolerant to duplicates. It logs an error on a duplicate so we can be informed of those events. In general we should not be receiving duplicates and no error would be logged. When a NATS server reboots (infrequent), or a NATS subscriber reboots or disconnects (infrequent), we'd expect an error in the logs and we could quickly dismiss that error as being reasonable given the earlier reboot/disconnect events. This duplicate error would be seen with a frequency that matches how often we see reboots/disconnects, i.e. not often.
The fact that we log an error 970 times here, because a message was redelivered 970 times, yet no server crashed and no subscriber disconnected, that tells me something is wrong.
I should clarify that this application, when it receives a duplicate message, does not take 30 minutes to handle that case; it takes a few milliseconds (it acks, logs an error, and returns without any handling).
Based on your answer, I understand the following is happening with the Ack at the start of the message handler. The subscriber gets message M1 from NATS. A millisecond later it acks, which triggers the reception of the next message M2. M2 will not be acked because the handler is busy handling M1 for 30 minutes. M2 will therefore be redelivered 970 times. When the handler is done with M1, it will receive 970 redeliveries of M2 and log an error for each.
That explains message #6 in my original post having been delivered 970 times.
Suppose the subscriber were to AutoAck instead of manual Ack. A very similar history of redeliveries would occur, M1 not being Ack'ed for 30 minutes, would be redelivered many times over that 30 minutes, and the message handler would be dealing with 970 redeliveries of M1 instead of redeliveries of M2. Hence neither Manual Ack, nor AutoAck solves this problem.
You mention setting AckWait to ~31 minutes and using AutoAck. I assume this specific problem of 970 redeliveries would be solved, but another problem would creep in. In case a subscriber crashes between reception of a message and before ack'ing, no other subscriber would receive that message for 31 minutes. That would be unacceptable.
Is there no way to achieve a redelivery within a few seconds if a subscriber does not Ack, i.e. is there no way to do the following: a subscriber manually acks early in the handler to inform the server, "i got this, you don't need to redeliver this to anyone else in the queue group", but that would not trigger the "send me 1 more message". When MaxInFlight=1 that should be happening, not in the background upon Acking, but when the handler returns, such that there is exactly 1 message on that subscriber, not 1 message in handling + 1 message in the client library's internal queue, which is 2 messages in flight, not 1?
Yes I am aware of NATS JetStream and this is not new development so that new solution is not an option. I am also aware that NATS guarantees at-least once, not exactly once.
Since you stated "exactly once" in your original post, I just wanted to let you know that designing a solution with that expectation would lead to some issues. As for the behavior that you observed, it has always been the case, so this is why I suspected that this was a new application (because otherwise you should have been seeing this behavior earlier), hence my comment about the deprecated notice.
However, read on to a possible solution:
Is there no way to achieve a redelivery within a few seconds if a subscriber does not Ack, i.e. is there no way to do the following: a subscriber manually acks early in the handler to inform the server, "i got this, you don't need to redeliver this to anyone else in the queue group", but that would not trigger the "send me 1 more message"
No, as soon as the server receives the Ack, and if the outstanding number of ack falls below the MaxInflight, then the server will send a new message.
When MaxInFlight=1 that should be happening, not in the background upon Acking, but when the handler returns, such that there is exactly 1 message on that subscriber, not 1 message in handling + 1 message in the client library's internal queue, which is 2 messages in flight, not 1?
No, again, the server does not know from which location in your code you have sent the Ack. To make sure that there is only 1 message callback in progress, you would need to ack at the end of the callback, or use auto-ack, which means that the library will send the Ack after the callback returns.
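To illustrate why the position of the Ack matters, here is a toy model. It is not the real server or client library: the `server` type and `maxHeldByClient` function are invented for this sketch, and the only rule it assumes is that the server sends another message whenever the number of unacknowledged messages is below MaxInflight.

```go
package main

import "fmt"

// server is a toy model that honors MaxInflight: it sends the next message
// whenever the count of unacknowledged messages drops below maxInflight.
type server struct {
	next, last  int   // next sequence to send, last sequence in the channel
	unacked     int   // messages sent but not yet acknowledged
	maxInflight int
	clientQueue []int // messages waiting in the client library's queue
}

// fill sends messages to the client while the inflight window allows it.
func (s *server) fill() {
	for s.unacked < s.maxInflight && s.next <= s.last {
		s.clientQueue = append(s.clientQueue, s.next)
		s.next++
		s.unacked++
	}
}

// ack records an acknowledgement, which lets the server send more.
func (s *server) ack() {
	s.unacked--
	s.fill()
}

// maxHeldByClient runs the callback over 5 messages with MaxInflight(1) and
// returns the largest number of messages the client held at once (the one
// being processed plus any waiting in the library queue).
func maxHeldByClient(ackAtStart bool) int {
	s := &server{next: 1, last: 5, maxInflight: 1}
	s.fill()
	maxHeld := 0
	for len(s.clientQueue) > 0 {
		s.clientQueue = s.clientQueue[1:] // the callback picks up a message
		if ackAtStart {
			s.ack()
		}
		// Long processing happens here; count what the client holds meanwhile.
		if held := 1 + len(s.clientQueue); held > maxHeld {
			maxHeld = held
		}
		if !ackAtStart {
			s.ack()
		}
	}
	return maxHeld
}

func main() {
	fmt.Println("ack at start:", maxHeldByClient(true))  // 2
	fmt.Println("ack at end:", maxHeldByClient(false)) // 1
}
```

With the ack at the top of the callback, the second message is already queued in the client while the first is being processed, and that queued message is the one the server keeps redelivering after AckWait.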
You mention setting AckWait to ~31 minutes and using AutoAck. I assume this specific problem of 970 redeliveries would be solved, but another problem would creep in. In case a subscriber crashes between reception of a message and before ack'ing, no other subscriber would receive that message for 31 minutes. That would be unacceptable.
That's where I think we could make things mainly work. If you set your AckWait to ~31 minutes and modify the server(s) configuration to have a lower server-to-client heartbeat interval/fail count (see details below), and you use auto-ack (or ack just before the callback returns) then I believe that it will be close to what you want.
Suppose the channel has 5 messages and you have 2 queue subscribers from the same group running. The first receives message 1 and the second receives message 2. Processing those messages takes less than 30 minutes, but still quite some time. Suppose that subscriber 1 crashes, and shortly after is restarted. The server will likely send it message 3. But as soon as the server detects the lack of heartbeat from the first subscriber that crashed, it will make message 1 available for redelivery. Now assume that this message is sent to subscriber 2. Subscriber 2 is still processing message 2. Based on the timing it is possible that the redelivered message 1 is again redelivered to one of the subscribers, but that is likely to be no more than that, because then this message would be processed and ack'ed. Your application detects duplicates, so even if a duplicate was in the client library queue, it will quickly be acknowledged.
Note also that if, instead of "crashing", the application closes the queue subscription, then the in-progress message is immediately assigned for redelivery. A crash, by contrast, requires the server to detect the heartbeat failure, so it can take a bit more time depending on your server-client heartbeat settings.
For heartbeats, look for hb_interval, hb_timeout and hb_fail_count here. But an example would be:
hb_interval: "1s"
hb_timeout: "1s"
hb_fail_count: 3
which should make it pretty quick for a server to detect the crashed application and assign the non-ack'ed message for redelivery (those settings may be even a bit too low depending on the number of clients you have in your system).
Let me know if that works for you.
ok i'll look into these configs. Thank you.
I'm trying to understand NATS-streaming behavior, and I can't tell if what is happening is by-design (the result of the wrong subscription options), or a bug. At first, this seems like a bug to me.
// A handler that can take up to 30 minutes to process one message
func(msg *stan.Msg) {
	// we ack immediately so as not to receive this message ever again
	if err := msg.Ack(); err != nil {
		// error handling
		return
	}
	// message handling
}
{"severity":"debug","timestamp":"2022-05-19T23:09:20.136Z","msg":"Message from NATS","Message":"sequence:6 subject:\"sign-requests-2\" data:\"\n$697f453f-cbdf-4488-8273-6caa07eaea68\" timestamp:1652951016167904979 redelivered:true redeliveryCount:968 "}
{"severity":"debug","timestamp":"2022-05-19T23:09:50.137Z","msg":"Message from NATS","Message":"sequence:6 subject:\"sign-requests-2\" data:\"\n$697f453f-cbdf-4488-8273-6caa07eaea68\" timestamp:1652951016167904979 redelivered:true redeliveryCount:969 "}
{"severity":"debug","timestamp":"2022-05-19T23:10:20.138Z","msg":"Message from NATS","Message":"sequence:6 subject:\"sign-requests-2\" data:\"\n$697f453f-cbdf-4488-8273-6caa07eaea68\" timestamp:1652951016167904979 redelivered:true redeliveryCount:970 "}
{"severity":"debug","timestamp":"2022-05-19T23:15:54.916Z","msg":"Message from NATS","Message":"sequence:7 subject:\"sign-requests-2\" data:\"\n$df166cfe-29a7-428f-bdec-24e951986506\" timestamp:1652951841022072144 "}
{"severity":"debug","timestamp":"2022-05-19T23:15:54.936Z","msg":"Message from NATS","Message":"sequence:8 subject:\"sign-requests-2\" data:\"\n$9e3bd5f4-4d30-4b8a-b33a-f91f462ff128\" timestamp:1652954809957718131 "}
{"severity":"debug","timestamp":"2022-05-19T23:16:10.847Z","msg":"Message from NATS","Message":"sequence:6 subject:\"sign-requests-2\" data:\"\n$697f453f-cbdf-4488-8273-6caa07eaea68\" timestamp:1652951016167904979 redelivered:true redeliveryCount:1 "}