eclipse / paho.golang


Use network loops to read/write packets in isolated goroutines #151

Closed vishnureddy17 closed 10 months ago

vishnureddy17 commented 1 year ago

Here's a proposal to make client.go read and write packets in isolated goroutines.

I think this makes things easier to work with and makes errors on the connection easier to deal with.

Thanks @BertKleewein for the inspiration!

MattBrittan commented 1 year ago

Thanks @vishnureddy17,

The new approach is somewhat similar to that used in the v3 Client (albeit this client waits for errors almost every time it sends to outgoingPackets rather than returning a token as the v3 client does).

Could you please add some detail as to the benefits that you believe this change delivers?

Moving the reads and writes off to separate goroutines does have some potential benefits but, in my opinion, comes at the cost of significantly increasing complexity. Just to be clear - I'm not saying that this is not the right approach; however at this point I'm not convinced the benefits outweigh the negatives.

Personally I find:

if _, err := ccp.WriteTo(c.Conn); err != nil {
    cleanup()
    return nil, err
}

easier to read/reason about than:

errChan := make(chan error)
c.outgoingPackets <- outgoing{
    packet: ccp,
    err:    errChan,
}
if err := <-errChan; err != nil {
    cleanup()
    return nil, err
}

Based on experience in the v3 client, this approach makes it very easy to introduce races, deadlocks and leaking goroutines (I have put a lot of time into finding/resolving these in the v3 client). For example, consider what happens (with your current implementation) when:

  1. A packets.PUBREC is received and is being processed in incoming()
  2. Concurrently the connection drops and readLoop() calls c.error(err) and returns.
  3. c.error closes c.stop
  4. writeLoop terminates due to c.stop
  5. incoming() (back to processing the packets.PUBREC from step 1) is now blocked at c.outgoingPackets <- outgoing{packet: pl, err: plErrChan}
  6. error(e error) will remain blocked on c.workers.Wait() (as incoming never exits, c.workers.Done() is not called)

Note: this is theoretical; I have not put a huge amount of time into confirming this can happen, but it looks like a realistic scenario.
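To make the failure shape concrete, here is a minimal, self-contained sketch (hypothetical names only, not the PR's actual code) of the pattern in step 5: an unbuffered send with no remaining receiver, which also leaves workers.Wait() stuck.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	stop := make(chan struct{})
	outgoing := make(chan string) // unbuffered, like c.outgoingPackets
	var workers sync.WaitGroup

	// writeLoop: exits as soon as stop is closed.
	workers.Add(1)
	go func() {
		defer workers.Done()
		for {
			select {
			case <-stop:
				return
			case p := <-outgoing:
				_ = p // pretend to write the packet to the connection
			}
		}
	}()

	// incoming: still processing the PUBREC; queues the response too late.
	workers.Add(1)
	go func() {
		defer workers.Done()
		time.Sleep(10 * time.Millisecond) // simulate the in-flight processing
		outgoing <- "PUBREL"              // blocks forever: writeLoop is gone
	}()

	close(stop) // the readLoop error path: c.error(err) closes c.stop

	done := make(chan struct{})
	go func() { workers.Wait(); close(done) }()
	select {
	case <-done:
		fmt.Println("clean shutdown")
	case <-time.After(time.Second):
		fmt.Println("deadlock: incoming is stuck sending, Wait() never returns")
	}
}
```

Run as-is this almost always prints the deadlock line; the usual fix is to make every send also select on the stop channel (or to drain the outgoing channel deterministically during shutdown).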

Edit: Here is another one (much more likely to be an issue).

  1. Connected with manual ACKs, so the acks tracker goroutine is running.
  2. Some ACKs are queued (due to out-of-order acknowledgments).
  3. An error occurs, resulting in c.stop closing.
  4. We now have a race between the checks for c.stop in the acks tracker and in writeLoop() (if writeLoop() wins, then the acknowledgments will be blocked on the send).
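Sketching the shape of that race (again with made-up names rather than the PR's code): the stop check and the send happen in separate steps, so the send can block forever once writeLoop has already exited.

```go
// drainAcks is a hypothetical tracker loop with the race: the stop check and
// the send are separate steps, so stop can close in between; if writeLoop has
// already exited by then, the send below blocks forever.
func drainAcks(stop <-chan struct{}, outgoing chan<- uint16, queued []uint16) {
	for _, id := range queued {
		select {
		case <-stop:
			return
		default:
		}
		outgoing <- id // no receiver left if writeLoop won the race on stop
	}
}
```

Moving the send into the same select as the stop case closes that window (the select then returns on stop instead of blocking), at the cost of dropping whatever is still queued at shutdown.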

These examples may seem convoluted but I hit plenty of similar problems in the v3 client that caused production issues (and these are very hard to track down, particularly when they are user reported and you cannot duplicate them!).

The above leak is not really an issue if the program exits at that point; however, when used with something like autopaho, I'd expect the application to continue running for years (I have v3 clients with uptimes well over a year running on routers with minimal RAM), so leaking goroutines become an issue (and debugging this is time consuming!). Edit: actually this is an issue because c.workers.Wait() will block, so we are likely to end up with a deadlock.
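As an aside, when chasing this kind of leak in a long-running process, the goroutine profile from the standard runtime/pprof package is usually the quickest way to see what is parked forever on a channel operation; a minimal sketch:

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/pprof"
)

// dumpGoroutines writes every goroutine's stack to stderr. Leaked workers
// show up as goroutines parked forever on "chan send" or "chan receive".
func dumpGoroutines() {
	_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
}

func main() {
	// In a real service this would hang off a signal handler or debug
	// endpoint; here we just print the count and dump once.
	fmt.Println("goroutines:", runtime.NumGoroutine())
	dumpGoroutines()
}
```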

Anyway, I'm keen to discuss - I will add a few other comments to your PR (thanks for the submission by the way - it's great to see activity on this package picking up again!).

vishnureddy17 commented 1 year ago

Thanks for taking a careful look at this. This is all excellent feedback!

I think a part of the difficulty here is that I'm trying to keep this pull request small and not change everything at once. However, this is one step in a sequence of changes I'm thinking of. My goal here was to make the fewest changes possible to have reads and writes in their own goroutines. Clearly, there are still issues. How about I continue working on my fork to see where I can take this? The end result will be a big change, but I'll do my best to make each commit understandable in isolation. I could submit a PR later with a more complete picture, and we may need to move the discussion off of GitHub once we get to that point. It's fine if it doesn't work out in the end, but I'd like to at least prove these ideas to myself.

Personally I find: if _, err := ccp.WriteTo(c.Conn); err != nil { cleanup() return nil, err } easier to read/reason about than: errChan := make(chan error) c.outgoingPackets <- outgoing{ packet: ccp, err: errChan, } if err := <-errChan; err != nil { cleanup() return nil, err }

I agree. I don't think error handling should actually be done in those goroutines. I'd like to see errors being dealt with more consistently in the client, and I see error handling being done this way:

  1. Worker goroutine (in this case, writeLoop()) encounters an error
  2. Worker goroutine invokes the client error handling mechanism (currently Client.error())
  3. The rest of the workers get cleaned up.

I don't think errors should be passed between worker goroutines. They should just be passed up to the client, which takes care of any necessary cleanup. I did not make that change here because I was trying to make the minimal changes needed to communicate the network-loop idea.
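As a rough illustration of that flow (hypothetical names and structure, not the actual client code): each worker reports its error to one handler, which closes a stop channel that every loop selects on.

```go
package sketch

import (
	"io"
	"sync"
)

// packet stands in for an MQTT control packet that can write itself to the
// connection, mirroring the WriteTo call used earlier in this thread.
type packet interface {
	WriteTo(w io.Writer) (int64, error)
}

type client struct {
	conn    io.Writer
	stop    chan struct{}
	errOnce sync.Once
	err     error
	workers sync.WaitGroup
}

// error records the first failure and signals every worker to stop; workers
// never hand errors to each other directly.
func (c *client) error(e error) {
	c.errOnce.Do(func() {
		c.err = e
		close(c.stop)
	})
}

// writeLoop is one worker: on a write failure it reports upward and exits.
func (c *client) writeLoop(out <-chan packet) {
	defer c.workers.Done()
	for {
		select {
		case <-c.stop:
			return
		case p := <-out:
			if _, err := p.WriteTo(c.conn); err != nil {
				c.error(err)
				return
			}
		}
	}
}
```

Whatever shape the real code takes, the key property is the sync.Once: however many workers fail at the same time, only the first error is recorded and the stop channel is closed exactly once.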

Could you please add some detail as to the benefits that you believe this change delivers?

The two main benefits I see are:

  1. It makes error handling with the connection easier to deal with. Instead of having to handle errors everywhere there is an issue with the connection, the error would only happen in one place. Note that I do not take advantage of that in this PR, because I was trying to keep it reasonably small.
  2. MQTT has some pretty stringent packet ordering requirements, and using channels for packet reads/writes could make handling these ordering requirements easier to reason about, especially once retry delivery stuff comes into the picture.
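As a sketch of point 2 (illustrative only, not code from this PR): because a single goroutine drains the outgoing channel, packets hit the wire in exactly the order they are queued, so ordering rules such as in-order acknowledgments only need to be enforced at the point where packets are enqueued. A small queue that holds back acknowledgments until all earlier ones are ready might look like:

```go
// ackQueue releases acknowledgment packet IDs in publish order even when the
// application acknowledges out of order: an ID is held back until every
// earlier ID has been acknowledged as well. Because the release channel is
// drained by the single write loop, wire order equals release order.
type ackQueue struct {
	order   []uint16        // packet IDs in the order the PUBLISHes arrived
	acked   map[uint16]bool // IDs the application has acknowledged so far
	release chan<- uint16   // consumed by the write loop
}

func (q *ackQueue) ack(id uint16) {
	q.acked[id] = true
	for len(q.order) > 0 && q.acked[q.order[0]] {
		q.release <- q.order[0]
		delete(q.acked, q.order[0])
		q.order = q.order[1:]
	}
}
```

Everything downstream of the queue is then trivially ordered, because the write loop is the only writer on the connection.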
MattBrittan commented 1 year ago

However, this is one step in a sequence of changes I'm thinking of.... How about I continue working on my fork to see where I can take this?

It may be worth raising an issue to discuss your longer term plan? I've used similar approaches successfully but have also been burnt due to this approach making it really easy to introduce deadlocks etc.

So a fork may be the way to go (but worth collaborating to ensure it's a worthwhile direction to take). Note that I am currently working on session persistence offline (may make a test release public on my repo, but will not push to this repo until it's fully working and tested).

The two main benefits I see are: It makes error handling with the connection easier to deal with. Instead of having to handle errors everywhere there is an issue with the connection, the error would only happen in one place. Note that I do not take advantage of that in this PR, because I was trying to keep it reasonably small. MQTT has some pretty stringent packet ordering requirements, and using channels for packet reads/writes could make handling these ordering requirements easier to reason about, especially once retry delivery stuff comes into the picture.

I can see some benefits re point 1. However, most of the time when we are sending data we do need to be aware of any issues at that point in the code so appropriate action can be taken (e.g. returning an error to the caller). I believe this reduces the benefit of your approach (and a simpler technique might be a custom io.Writer that triggers the shutdown if an error is returned).
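For illustration, that custom io.Writer could look something like the following (a sketch with made-up names, assuming the connection is wrapped once at connect time):

```go
package sketch

import (
	"io"
	"sync"
)

// errorTriggerWriter wraps the connection: the first write error fires a
// shutdown callback exactly once, while callers keep using plain Write /
// WriteTo on the wrapper as before.
type errorTriggerWriter struct {
	w       io.Writer
	onError func(error) // e.g. the client's central error handler
	once    sync.Once
}

func (e *errorTriggerWriter) Write(p []byte) (int, error) {
	n, err := e.w.Write(p)
	if err != nil {
		e.once.Do(func() { e.onError(err) })
	}
	return n, err
}
```

Calls such as ccp.WriteTo(wrappedConn) would still return the error to the caller as they do today, while the wrapper guarantees the client also begins shutting down.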

Ref point 2 - the ordering requirements are mainly around PUBLISH and the relevant ACKs (MQTT-4.6.0-2 to MQTT-4.6.0-4). I believe the requirement exists (I asked the spec authors about this :-) ) to enable transactional logging of messages (e.g. run INSERT on PUBLISH and COMMIT on PUBREL). So my personal opinion is that, while it would be nice to follow the spec, it's not worth adding heaps of complexity to do so (I have not seen a broker that will error on out-of-order ACKs, and paho.Client does not currently provide a mechanism for the user to access a message before it's fully acknowledged).

vishnureddy17 commented 1 year ago

Another thing: Right now, there seem to be cases where packet reads and writes can block each other. It might be worth trying to avoid that.

It may be worth raising an issue to discuss your longer term plan? I've used similar approaches successfully but have also been burnt due to this approach making it really easy to introduce deadlocks etc.

I'm going to work on it offline to see where I can take it before I start discussing it on GitHub; the ideas I have are a bit nebulous right now, and I think getting something concrete will clarify things. Your feedback has been super helpful. And I see what you mean about how this approach can make it easy to introduce deadlocks, races, and goroutine leaks.

So a fork may be the way to go (but worth collaborating to ensure it's a worthwhile direction to take). Note that I am currently working on session persistence offline (may make a test release public on my repo, but will not push to this repo until it's fully working and tested).

This is great! If you decide to make a release on your repo, I'd love to check it out. Looking forward to seeing what comes out of this :)

I have not seen a broker that will error on out-of-order ACKs

In my testing, Azure Event Grid Namespaces (which is currently in public preview) disconnects clients that send out-of-order ACKs.

vishnureddy17 commented 1 year ago

I've been working on this, and I have something working, including in-memory persistence. However, it only supports QoS 1 for the time being. I don't think it's quite ready to bring into this repo, but I thought I'd post it here in case anyone is curious about the direction I'm thinking of.

https://github.com/vishnureddy17/paho.golang/tree/persistence-network-loops

MattBrittan commented 1 year ago

Thanks @vishnureddy17 - I also hope to have a solution (quite a different direction that caters for QoS 2) ready in a few days, so it will be interesting to compare approaches (I've tried a lot of approaches but feel I have something workable). Will be keeping things outside the paho.golang repo for now because whichever option we go for is going to be a big change (need to work out how to handle review/integration). It's very hard to do this without changes that will break some users' code, and it's likely that bugs will be introduced...

vishnureddy17 commented 1 year ago

quite a different direction that caters for QoS 2

To be clear, what I've done does not preclude QoS 2, I just chose to omit it for now.

whichever option we go for is going to be a big change (need to work out how to handle review/integration). It's very hard to do this without changes that will break some users' code, and it's likely that bugs will be introduced...

Yes, definitely. Good thing this is still in beta.

Looking forward to seeing what you have!

MattBrittan commented 1 year ago

My attempt is in this branch - I believe it's almost ready; it passes the tests I've put in place so far and seems to run OK with the docker test (to which I've added a disconnect so I can check for lost messages). It does need a further review and clean-up, but I thought it was better to make it public so we can discuss the different approaches and decide the best way forward. Note that the readme contains a rationale for the decisions made (and info on todos, breaking changes, etc.).

You will note that this is a major change - it decouples the state (including Mids and store) from paho. This means that you can call Publish and the call can block even if the connection drops and is re-established in the interim. My aim (nearly there!) is for autopaho to be fire and forget (just keep publishing and have confidence that autopaho will ensure the message is delivered regardless of any issues with the connection).

I'll add a comment to issue #25 with links for those watching that issue.

MattBrittan commented 11 months ago

Hi @vishnureddy17, What are your current thoughts on this PR? #172 introduced major changes which mean that this PR would need a refresh (note that I did implement a couple of things you included here such as adding ToControlPacket() to some packets). We could take some elements of this (and your work in the separate repo) and implement them within the new structure if you feel they will bring benefits?

Matt

vishnureddy17 commented 10 months ago

@MattBrittan, I'm going to go ahead and close this PR for now. I think the state of the repo has diverged so far from this PR at this point that it's not worth keeping around.

I'm busy with other work right now, but in the future I might look to contribute some of the ideas in this PR separately.