Closed gnarea closed 4 years ago
Adding more context just in case:
I have a server-side app that allows users of the app to retrieve messages coming from NATS Streaming. Each user would have their own channel (like channel-prefix.${userId}).
Some users can keep a connection open indefinitely, so they'll be using WebSockets to get past messages as well as future messages in near real time. However, some users have a very short window of time to connect to the server, so they should only get whatever messages are available and then close the connection.
One thing I don't like about the timeout-based approach is that, theoretically, it could keep the connection open indefinitely if messages are spaced out.
@gnarea There's no mechanism to do this out of the box. But since the durable subscription will remember where you are, even if you close, you'll get any message you have not ACK'ed on a future session.
Internally, this type of functionality would be handled by the client/library in the same way: there's a timer that gets reset every time a message is consumed by the handler. When the timer fires, you can call close() on the stan client.
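A minimal sketch of that inactivity timer, assuming a Node environment. The helper name createIdleTimer and its optional third parameter (which only allows swapping in a fake scheduler) are hypothetical and not part of the stan client; the usage at the bottom is left as comments because it needs a connected client.

```javascript
// Hypothetical helper (not part of the stan client): runs onIdle once no
// reset() has happened for idleMs milliseconds.
function createIdleTimer(idleMs, onIdle, timers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle),
}) {
  let handle = timers.set(onIdle, idleMs);
  return {
    // Restart the countdown; call this from the message handler.
    reset() {
      timers.clear(handle);
      handle = timers.set(onIdle, idleMs);
    },
    // Stop the countdown without firing, e.g. when closing for another reason.
    cancel() {
      timers.clear(handle);
    },
  };
}

// Usage sketch (assumes an already connected `stan` client and `subscription`):
// const idle = createIdleTimer(2000, () => stan.close());
// subscription.on("message", (msg) => {
//   handle(msg);   // hypothetical application handler
//   idle.reset();  // restart the countdown after each message
// });
```

The countdown starts as soon as the timer is created, so an empty channel also leads to a close after idleMs.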
With that said, another strategy is to use the monitoring API built into the nats-streaming-server: make an HTTP/S request, calculate how many messages are in the channel for the subscription, prepare to receive at least that many messages, and then close the client. Any new messages that arrive after the monitoring request will be processed next time.
See https://docs.nats.io/nats-streaming-concepts/monitoring/endpoints#channelsz.
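As a rough sketch of the counting step, the function below takes an already parsed channelsz response (requested with subscription details included) and estimates the backlog for one durable. The function name backlogFor is hypothetical, and the field names (msgs, last_seq, subscriptions, durable_name, last_sent, pending_count) are assumed from the endpoint documentation linked above; check them against your server version, especially for queue groups.

```javascript
// Hypothetical helper: estimates how many messages a durable still has to
// receive, given a parsed channelsz response (field names are assumptions).
function backlogFor(channelsz, durableName) {
  const subs = channelsz.subscriptions || [];
  const sub = subs.find((s) => s.durable_name === durableName);
  if (!sub) {
    // The durable is unknown to the server: treat every stored message as backlog.
    return { undelivered: channelsz.msgs || 0, unacked: 0 };
  }
  return {
    // Messages stored in the channel but never sent to this subscription.
    undelivered: Math.max(0, channelsz.last_seq - sub.last_sent),
    // Messages sent but not yet acknowledged (candidates for redelivery).
    unacked: sub.pending_count || 0,
  };
}
```

The client can then count incoming messages and close once it has received at least that many.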
Thanks @aricart!
The monitoring API seems like the most reliable way to do it, but it's probably a bit of an overkill for what I need. It'd be great if I didn't have to make external calls.
I've just found out about the Message.getTimestamp() method -- I guess I can use it along with timers to avoid the situation where spaced-out messages keep the connection open indefinitely, right? That is, the subscription will be closed when one of these things happens:
Does my logic check out? Presumably Message.getTimestamp() is always the time when the message was published and never changes (not even if the message is being redelivered).
Having said this, even if the above works, one thing I do like about using the monitoring API is that we could close the subscription (and therefore the WebSockets connection with the client) as soon as all messages have been received.
@gnarea yes, getTimestamp() is the timestamp recorded by the streaming server on the message, and it will not change.
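Because the timestamp is stable, one possible way to combine it with a timer (an assumption about the intended logic, not something spelled out in this thread) is to record a cutoff when the session opens and stop as soon as a message published after that cutoff arrives. The predicate name isPastCutoff is hypothetical; it relies only on getTimestamp() returning a Date.

```javascript
// Hypothetical predicate: true when the message was published after the
// cutoff, i.e. it is not one of the "pre-existing" messages for this session.
function isPastCutoff(msg, cutoff) {
  return msg.getTimestamp().getTime() > cutoff.getTime();
}

// Usage sketch (assumes a connected `stan` client, a `subscription` opened in
// manual ack mode, and an inactivity timer as the fallback condition):
// const cutoff = new Date();
// subscription.on("message", (msg) => {
//   if (isPastCutoff(msg, cutoff)) {
//     stan.close();   // leave the message un-ACKed so a later session gets it
//     return;
//   }
//   handle(msg);      // hypothetical application handler
//   msg.ack();
// });
```

This compares a server-recorded timestamp with the local clock, so clock skew between the two machines can shift the cutoff slightly.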
JetStream will enable you to make all these queries etc - using NATS directly. That is the next generation streaming platform.
Thanks @aricart! I'm looking forward to using JetStream!
Hi,
I have a durable queue subscription where consumers may not stay around to get future messages -- they'd only connect opportunistically to get all pre-existing messages. How can I close the subscription automatically when all messages have been processed?
I'm thinking of using setTimeout() so that we disconnect after a period of inactivity, but is there a better way to achieve this without using timeouts?
Cheers,
Gus.