nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

JetStream message re-delivery after client disconnect #4627

Open crosem opened 1 year ago

crosem commented 1 year ago

What version were you using?

Not sure if this is a defect or if the behavior is by design.

I have a durable push JetStream consumer shared by a queue group of workers. I am using explicit acks because it is critical that these messages are processed. Typically, these messages are processed in under a second. However, there are IO dependencies that may push the processing time past the default AckWait of 30 seconds. This is rare, but since these are trade messages they can only be processed once, and I don't want them redelivered just because AckWait was exceeded. To handle this, I raised AckWait to 10 minutes.

However, when running a client-disconnect test case, I noticed that even when the client consuming the message is taken down, the message is only redelivered after the 10-minute AckWait expires.

For a trading system this is not acceptable. Users cannot wait 10 minutes for a trade to be processed.

Does the NATS server not detect the client disconnect and immediately redeliver all of that connection's pending messages? That is what TIBCO EMS, a product I used previously, did.

Please advise if there is something I'm missing.

What environment was the server running in?

Windows 10 workstation

Is this defect reproducible?

Yes

Given the capability you are leveraging, describe your expectation?

Immediate redelivery of the message to other clients.

Given the expectation, what is the defect you are observing?

The server waits for the full AckWait before redelivering.

tehsphinx commented 3 weeks ago

We have a similar issue in a messaging system. If a device reconnects, all outstanding messages should be (re)delivered immediately, without waiting for the ack timeout on messages whose delivery was already attempted.

It would be really appreciated if there were a way to reset all redelivery timeouts for a consumer, or some other way to get all of that consumer's pending messages (with the same ACK addresses!).

derekcollison commented 3 weeks ago

@crosem The system supports in-progress signaling, sent much like an ack, which resets the AckWait timer when processing takes longer. So you can set AckWait lower, based on business needs, and have your application send periodic in-progress updates to the system while it works.

derekcollison commented 3 weeks ago

@tehsphinx We do not have enough information to determine that. Consider a large global supercluster such as Synadia Cloud: the consumer state could be in US West while your app is in, say, Japan. With a pull consumer, we know that we delivered the message to a given delivery subject, but beyond that we have no idea whether it arrived, or which app is trying to process it.

I would suggest an approach similar to what I described above: limit how long abandoned messages wait for an ack, and use in-progress signaling if needed.

tehsphinx commented 2 weeks ago

@derekcollison Thank you for your answer!

Since I need to wait for an ACK from an external device before I can ACK the message against JetStream, I already keep a list of all messages awaiting an ACK (in a KV store with a TTL). The key is an internal message ID; the value is the reply address used for ACKing against JetStream.

I'm now "resetting" the timer when a device connects by:

  1. Opening the push subscription (and setting a lock so messages cannot be processed until the lock is released).
  2. Iterating over the pending-ACK entries in our KV store (where each value is the reply address for ACKing).
  3. Sending a NACK for each of them on the open subscription, which resets the timer and causes the messages to be redelivered immediately.
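The three steps above could be sketched as follows. This assumes the raw JetStream acknowledgement protocol, where a NAK is a publish of `-NAK` to the message's reply subject (the payload nats.go's `Msg.Nak` sends). The `publisher` interface matches the `Publish(subj string, data []byte) error` method of `*nats.Conn`; `resetPending` and `recorder` are hypothetical names, and NAKing in sorted ID order is an assumption of this sketch. A gate channel plays the role of the lock from step 1: handlers block on it until every pending entry has been NAKed.

```go
package main

import (
	"fmt"
	"sort"
)

// publisher matches the Publish method of *nats.Conn in nats.go.
type publisher interface {
	Publish(subj string, data []byte) error
}

// nakPayload is the JetStream negative-acknowledgement body; publishing it to a
// message's reply subject asks the server to redeliver that message now.
var nakPayload = []byte("-NAK")

// resetPending NAKs every pending reply subject (id -> reply subject), then
// closes gate, releasing handlers blocked on <-gate. If a publish fails, the
// gate is left shut and the error is returned to the caller.
func resetPending(pub publisher, pending map[string]string, gate chan struct{}) error {
	ids := make([]string, 0, len(pending))
	for id := range pending {
		ids = append(ids, id)
	}
	sort.Strings(ids) // deterministic NAK order (an assumption of this sketch)
	for _, id := range ids {
		if err := pub.Publish(pending[id], nakPayload); err != nil {
			return fmt.Errorf("nak %s: %w", id, err)
		}
	}
	close(gate)
	return nil
}

// recorder is a fake publisher that remembers what was sent.
type recorder struct {
	subjects []string
	payloads []string
}

func (r *recorder) Publish(subj string, data []byte) error {
	r.subjects = append(r.subjects, subj)
	r.payloads = append(r.payloads, string(data))
	return nil
}

func main() {
	gate := make(chan struct{})
	done := make(chan struct{})
	go func() {
		<-gate // the subscription handler would wait here before processing
		close(done)
	}()
	rec := &recorder{}
	pending := map[string]string{"msg-2": "reply-2", "msg-1": "reply-1"}
	if err := resetPending(rec, pending, gate); err != nil {
		fmt.Println("reset failed:", err)
		return
	}
	<-done
	fmt.Println(rec.subjects, rec.payloads)
}
```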

The downside of this approach is that the order is off, since some of the messages were not "waiting" when the subscription was first opened. Order cannot be guaranteed anyway when messages are redelivered, but it would have been nice to minimise out-of-order delivery with some kind of reset of all timeouts for a consumer.

But I get that it is most likely not an easy thing to do.

tehsphinx commented 2 weeks ago

@derekcollison Note on why your suggestion is hard to implement in our use case:

When we receive a message from JetStream, we send it to a client application and register the reply address in a NATS KV store with a TTL slightly longer than the consumer's redelivery timeout (AckWait). After that, no routine remains that could signal that we are still processing the message.

The incoming ACK from the client application is handled asynchronously in an independent thread. It retrieves the registered reply address from the KV store, gets the correct subscription, and uses it to construct a *nats.Msg so that the message can be ACKed against JetStream.
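A sketch of that asynchronous path, written against the raw ack protocol instead of a reconstructed `*nats.Msg`: publishing `+ACK` (the payload nats.go's `Msg.Ack` sends) to the stored reply subject acknowledges the message. Because it only needs a connection and the reply subject, the same idea may work regardless of which client API originally received the message, though that is an assumption worth verifying. `replyStore`, `mapStore`, `onDeviceAck`, and `recorder` are hypothetical names; `publisher` matches `(*nats.Conn).Publish`.

```go
package main

import "fmt"

// publisher matches the Publish method of *nats.Conn in nats.go.
type publisher interface {
	Publish(subj string, data []byte) error
}

// replyStore is a hypothetical view of the KV bucket: message ID -> reply subject.
type replyStore interface {
	Get(id string) (string, bool)
	Delete(id string)
}

// ackPayload is the JetStream positive-acknowledgement body.
var ackPayload = []byte("+ACK")

// onDeviceAck runs when the external device confirms message id. It looks up
// the JetStream reply subject and publishes +ACK to it; no subscription or
// *nats.Msg is needed.
func onDeviceAck(pub publisher, store replyStore, id string) error {
	reply, ok := store.Get(id)
	if !ok {
		// Entry expired (TTL > AckWait), so the server has likely redelivered already.
		return fmt.Errorf("no pending reply subject for %q", id)
	}
	if err := pub.Publish(reply, ackPayload); err != nil {
		return err
	}
	store.Delete(id)
	return nil
}

// mapStore is an in-memory replyStore for this example.
type mapStore map[string]string

func (m mapStore) Get(id string) (string, bool) { r, ok := m[id]; return r, ok }
func (m mapStore) Delete(id string)             { delete(m, id) }

// recorder is a fake publisher that remembers what was sent.
type recorder struct {
	subjects []string
	payloads []string
}

func (r *recorder) Publish(subj string, data []byte) error {
	r.subjects = append(r.subjects, subj)
	r.payloads = append(r.payloads, string(data))
	return nil
}

func main() {
	store := mapStore{"msg-1": "$JS.ACK.example.reply.1"} // placeholder subject
	rec := &recorder{}
	fmt.Println(onDeviceAck(rec, store, "msg-1"), rec.subjects, rec.payloads)
	fmt.Println(onDeviceAck(rec, store, "msg-1")) // second ack: entry already gone
}
```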

BTW: This use case no longer seems possible with the newly created github.com/nats-io/nats.go/jetstream package, since there is no way to get at the internals of a jetstream.Msg.