Closed vishnureddy17 closed 12 months ago
> Session state is inherently coupled with packet identifier allocation and tracking
The implementation in the v3 client works OK; it offers a claimID function that allows a specific ID to be allocated prior to connection (so the store is opened, and any needed IDs identified and claimed, before the connection is made). I would like to see Request changed to allocate sequential IDs (currently it always allocates the first available ID starting from 1; this complicates message tracing, because you get the same IDs over and over again, and there is a higher probability of conflicts due to ID reuse; I made this change in the v3 client a while back). UPDATE: This comment is outdated (`MIDService` does retain a `lastMid` and uses it to avoid frequently reusing IDs) - the change was a few years back :-).
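The sequential-allocation idea can be sketched as an allocator that remembers the last MID it handed out and starts its search from there, wrapping at 65535, so recently freed IDs are not immediately reused. All names here are illustrative, not the actual `MIDService` implementation:

```go
package main

import (
	"errors"
	"fmt"
)

const maxMID = 65535 // valid MQTT packet identifiers are 1..65535

// midAllocator hands out packet identifiers sequentially, resuming the
// search after the last allocated ID (hypothetical sketch, not MIDService).
type midAllocator struct {
	inUse   map[uint16]bool
	lastMID uint16
}

func newMIDAllocator() *midAllocator {
	return &midAllocator{inUse: make(map[uint16]bool)}
}

// Allocate returns the next free ID after lastMID, wrapping at 65535.
func (a *midAllocator) Allocate() (uint16, error) {
	for i := 0; i < maxMID; i++ {
		candidate := a.lastMID%maxMID + 1
		a.lastMID = candidate
		if !a.inUse[candidate] {
			a.inUse[candidate] = true
			return candidate, nil
		}
	}
	return 0, errors.New("no free packet identifiers")
}

// Free releases an ID so it can eventually be reused.
func (a *midAllocator) Free(id uint16) { delete(a.inUse, id) }

func main() {
	a := newMIDAllocator()
	id1, _ := a.Allocate()
	a.Free(id1)
	id2, _ := a.Allocate()
	fmt.Println(id1, id2) // 1 2: the freed ID is not handed straight back
}
```

The point of the wrap-around search (rather than "first free from 1") is exactly what the comment describes: traces show distinct IDs, and a late response for a freed ID is far less likely to collide with a new allocation.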
> It is difficult to stay spec-compliant with packet timeouts
I think there are two linked questions here:
Firstly, "what happens when the connection drops?"
When operating at QoS 1+, published messages should be saved in the session state and retried when the connection is re-established (subject to Clean Start and Session Expiry). With the current design, `Publish` really needs to return when the connection drops (in `paho`, but not necessarily in `autopaho`, because that manages reconnection). This means that `Publish` should return an error that's not really an error, but an indication that the message is queued and will be delivered later.

I think that this means that rather than "changing the existing context.WithTimeout()s to context.WithCancel()s", it may be better to loop through the message IDs and send something to all channels indicating that the connection has been lost; the handlers (`Publish`/`Subscribe` etc.) will already be waiting on those channels. These handlers will then need to be re-established when the connection is brought up again (so some refactoring is needed!) so they can handle acknowledgments.
Secondly, "what happens if the context times out?"
As we have sent a request packet, the `Publish` function cannot just return, because we still need to handle the responses. Currently, if the context expires we free the ID (via `defer`) and any further responses will be dropped (as the message ID has been freed, it is likely to be reused, which is not good!). I think that this means that, in the event of a timeout, `Publish` needs to hand off handling of the responses to a goroutine and then exit (meeting the contract with the caller, i.e. exit when the context is cancelled, but still adhering to the MQTT spec and completing the transaction).
A similar situation exists for `Subscribe`/`Unsubscribe`; the difference being that they do not form part of the session state, so if the connection drops after we send the packet, we have no way of knowing whether the broker has processed it. We should not free the ID just because the context has expired (mainly because we should not reuse the ID until we get a response).
> The implementation in the v3 client works OK; this offers a claimID function that allows a specific ID to be allocated prior to connection (so the store is opened, and any needed ID's identified, and claimed, before the connection is made). I would like to see Request changed to allocate sequential ID's (currently it always allocates the first available ID starting from 1; this complicates message tracing (because you get the same ID's over and over again) and there is a higher probability of conflicts due to ID reuse; I made this change in the v3 client a while back)
I agree on everything here.
> This means that Publish should return an error that's not really an error (but an indication that the message is queued and will be delivered later).
> I think that this means that, in the event of a timeout, Publish needs to pass off handling the responses to a go-routine and then exit
Regarding Publish/Subscribe/Unsubscribe exiting in the event of a timeout or a connection drop: Do you have any thoughts on what the user surface would look like to get the ACKs later (either in the case of the ACK coming in past the timeout or after a retry when connecting on an existing session)?
```go
resp, err := c.Publish(ctx, pub)
if err != nil {
    // The publish timed out or the connection got dropped. How do we get the response later?
}
```
> A similar situation exists for Subscribe/Unsubscribe; the difference being that they do not form part of the session state so if the connection drops after we send the packet then we have no way of knowing whether the broker has processed it.
I've been struggling to understand what to do in this situation; it seems unclear in the MQTT 5 spec. It seems like this opens up the possibility of leaking packet IDs if the connection drops when there are un-acked SUBSCRIBEs and UNSUBSCRIBEs. I see two ways of dealing with it:
> Regarding Publish/Subscribe/Unsubscribe exiting in the event of a timeout or a connection drop: Do you have any thoughts on what the user surface would look like to get the ACKs later (either in the case of the ACK coming in past the timeout or after a retry when connecting on an existing session)?
This is tricky; the v3 client just notifies when the connection drops (working on the assumption that the message will be delivered at some time in the future). I think that this is an advanced use-case; you need to remember that it's possible that the whole application may be restarted between `Publish` being called and the message actually being sent. I think it would make sense for the error to incorporate the message ID (a custom error struct) and then provide the user with a way to override the routine that handles loading/processing messages from the store (this would be a bit of work). I'm not sure that this is needed from day 1 (users should be able to trust that a message will be delivered if at all possible, subject to the various options).
> if the connection drops when there are un-acked SUBSCRIBEs and UNSUBSCRIBEs.
From the spec:
> The Session State in the Client consists of:
>
> - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
> - QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
So you don't have to do anything (the message ID is freed when the connection drops). The broker will never respond to a SUBSCRIBE/PUBLISH after the link is dropped. It's left to the user to guess whether the subscription is still active (I design my topics to avoid this situation and always resubscribe when a connection is established). We could have an option to resend SUBSCRIBE/UNSUBSCRIBE, but I think that would fit into `autopaho` rather than `paho` (and is something I'd implement later, if at all).
> It looks like the keep-alive mechanism is not wired up to the client and instead just logs an error when a PINGRESP doesn't come back in time.
When a ping resp isn't received in time the default Pinger invokes the pingFailHandler, and the default instantiation of the client calls the internal error function as the pingFailHandler which will close the connection and shut down the client.
On the other topics being discussed, I think @MattBrittan's suggestion that Publish returns a sort of error is probably best. The v3 default of asynchronous calls returning tokens wasn't very Go, but it did make it easier to handle this particular problem. I'm thinking, as suggested, of a broader session state management system where, in the case of a disconnection on a persistent session, in-flight Publishes return an error along with the message ID that was in flight. The MIDs are recorded in session management with a callback such as priorSessionPublishCompleted. This would only trigger for orphaned Publish calls, not every Publish call (because this is supposed to be an edge case of normal use).
> When a ping resp isn't received in time the default Pinger invokes the pingFailHandler, and the default instantiation of the client calls the internal error function as the pingFailHandler which will close the connection and shut down the client.
You're right, don't know what I was thinking there 😅
> I'm thinking, as suggested, a more broader session state management system where in the case of a disconnection for a persistent session in flight Publishes return an error and the message id that was in flight. the mids are recorded in session management with a callback for priorSessionPublishCompleted or similar. This would only trigger for orphaned Publish calls not any Publish call (because this is supposed to be an edge case of normal use).
We also need to handle the other case of orphaned Publish (timeout without disconnect), which would need to be dealt with regardless of whether persistence is being used. One thought I had was to have a buffered channel on the client that the user can read from to get the results of orphaned operations (Publishes that got retried after reconnecting, or timed-out Publishes/Subscribes/Unsubscribes). It could be enabled via an option, so if the user does not care about seeing the results, we aren't holding onto them in a channel unnecessarily.
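A minimal sketch of that buffered-channel idea (all names hypothetical; a real client would populate this from its network loop):

```go
package main

import "fmt"

// orphanResult is a hypothetical record of an operation that completed
// after its caller had already given up (timed out, or retried after
// reconnecting on an existing session).
type orphanResult struct {
	MessageID uint16
	Kind      string // "PUBACK", "SUBACK", "UNSUBACK", ...
}

// client holds an optional buffered channel of orphaned results; a nil
// channel means the user opted out and late results are simply dropped.
type client struct {
	orphans chan orphanResult
}

// reportOrphan delivers a late result without ever blocking the client:
// if the buffer is full (or the feature is disabled) the result is dropped.
func (c *client) reportOrphan(r orphanResult) {
	if c.orphans == nil {
		return
	}
	select {
	case c.orphans <- r:
	default: // buffer full; drop rather than stall the network loop
	}
}

func main() {
	c := &client{orphans: make(chan orphanResult, 8)} // enabled via an option
	c.reportOrphan(orphanResult{MessageID: 3, Kind: "PUBACK"})
	r := <-c.orphans
	fmt.Println(r.MessageID, r.Kind) // 3 PUBACK
}
```

The non-blocking send is the key design point: the channel must never be able to back-pressure the packet-handling loop, so a slow (or absent) consumer costs at most the buffer.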
Hi @alsm and @MattBrittan, Currently, I'm planning the following, and I'm hoping to get your help and input:
Once those are in place, I think that would be a good time to make another Beta release.
Additional questions:
Thank you so much!
I saw the PR pop up but haven't had time to give it a proper look, I'm away for a long weekend now so will do so on Tuesday. I'm happy to have a call to discuss the project and direction.
Hi, just following up to see if the outstanding PRs can be reviewed. Also, regarding setting up a call, shall we discuss it over e-mail? My e-mail is vishnureddy at microsoft dot com
Thanks!
There's been significant movement in the project since this issue was opened, so this issue is out-of-date. Going to close it.
Continuing the dialogue from #130.
Hi folks, I wanted to open this issue to discuss some thoughts I had on how to move the project forward. Ostensibly, the biggest missing feature is persisting sessions across network connections, which is required to properly support CleanStart=false with QoS > 0 PUBLISHes. However, I see some issues that I think need to be addressed before adding persistent session support. I feel that if these issues get addressed, it will make adding persistence support easier.
Before I go into discussing these issues, I want to make it clear that this is in no way intended to disparage @alsm's excellent work on this so far. I really appreciate that effort, as well as @MattBrittan's efforts on this project.
Session state is inherently coupled with packet identifier allocation and tracking
Currently, the packet identifiers are assigned and tracked using the implementation in message_ids.go. In order to support persistence, we either need to:

1. Have the session state manage packet identifiers itself, instead of `MIDService` being used.
2. Extend `MIDService` to mark arbitrary packet identifiers as non-free when connecting with an existing session.

If we go with option 1, I think it would make sense to combine the functionality of `Persistence` and `MIDService` into a broader session state interface.

It is difficult to stay spec-compliant with packet timeouts, especially once persistent sessions come into the picture
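To make option 1 concrete, a combined session-state interface might look something like this sketch. The interface and method names are hypothetical (not the existing `Persistence`/`MIDService` APIs); the toy in-memory implementation just shows why coupling storage and ID allocation keeps the two in sync:

```go
package main

import (
	"errors"
	"fmt"
)

// SessionState is a hypothetical interface combining the roles of
// Persistence and MIDService: the session state owns packet identifiers
// because stored messages and their IDs must stay consistent across
// reconnects.
type SessionState interface {
	AllocateID(payload []byte) (uint16, error) // reserve an ID and persist the packet
	Ack(id uint16)                             // exchange complete: free the ID, drop the stored packet
	InFlight() []uint16                        // IDs restored on session resume (already non-free)
}

// memSession is a toy in-memory implementation of the sketch above.
type memSession struct {
	next  uint16
	store map[uint16][]byte
}

func newMemSession() *memSession { return &memSession{store: make(map[uint16][]byte)} }

func (s *memSession) AllocateID(payload []byte) (uint16, error) {
	if len(s.store) >= 65535 {
		return 0, errors.New("no free packet identifiers")
	}
	for {
		s.next = s.next%65535 + 1 // sequential, wrapping at 65535
		if _, used := s.store[s.next]; !used {
			s.store[s.next] = payload
			return s.next, nil
		}
	}
}

func (s *memSession) Ack(id uint16) { delete(s.store, id) }

func (s *memSession) InFlight() []uint16 {
	ids := make([]uint16, 0, len(s.store))
	for id := range s.store {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	var ss SessionState = newMemSession()
	id, _ := ss.AllocateID([]byte("hello"))
	fmt.Println(id, len(ss.InFlight())) // 1 1
	ss.Ack(id)
	fmt.Println(len(ss.InFlight())) // 0
}
```

With this shape, "mark arbitrary packet identifiers as non-free on resume" falls out for free: anything loaded from the store is in-flight, so there is no separate bookkeeping to reconcile.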
Currently, the client has a timeout for PUBLISH QoS > 0, SUBSCRIBE, and UNSUBSCRIBE operations in case the expected response packet doesn't come back from the server in a certain amount of time. There is a discussion about it in #23. This is not spec-compliant and will cause issues that will become more acute with persistent sessions in the picture (timeout on CONNECT, however, is fine). Instead, it is the job of the keep-alive mechanism (PINGREQ/PINGRESP) to detect when things have gone wrong with the connection. It looks like the keep-alive mechanism is not wired up to the client and instead just logs an error when a PINGRESP doesn't come back in time. Here's how I believe this should be handled:
I think this would require changing the existing `context.WithTimeout()`s to `context.WithCancel()`s and keeping all the `cancel` functions in a slice so they can all be called on disconnection. However, I'm somewhat of a Go novice, so this might not be exactly the right approach. Hopefully the idea is clear, though.