eclipse / paho.golang

Publish fails after AwaitConnection #247

Open bronger opened 4 months ago

bronger commented 4 months ago

I use the following code snippet to publish an MQTT message:

if err := cm.AwaitConnection(ctx); err != nil {
        return err
}
pr, err := cm.Publish(ctx, &paho.Publish{
        QoS:     QoS,
        Retain:  Retain,
        Topic:   topic,
        Payload: msg,
})

Then I sometimes observe that err is “writev tcp 10.xxx.xxx.xxx:39508->192.168.xxx.xxx:1883: use of closed network connection”.

I’m confused, and I am not sure whether this is a bug. FWIW, my expectation was that the connection is usable after AwaitConnection returns. Of course, it could simply be the MQTT broker having an outage at the wrong moment, but two of my jobs died simultaneously with this error message, so I don’t think it’s due to bad timing.

I use “github.com/eclipse/paho.golang v0.20.0”.

MattBrittan commented 4 months ago

Please try 0.21 (it probably will not help, but there is a slight chance that the pinger bug, which has since been fixed, is the cause).

Otherwise, unfortunately, I'm going to need quite a bit more info (ideally code that replicates the issue, debug logs and broker logs). I understand this can be time-consuming to pull together, but it's difficult to fix an issue I cannot replicate.

bronger commented 4 months ago

Okay, I’ll try.

For now, just knowing that it should work is already very helpful to me: it means that my code snippet above is not missing, e.g., a step between AwaitConnection and Publish.

MattBrittan commented 4 months ago

Your code is basically the same as this example (it runs Publish in a goroutine, but the timing should be roughly the same). I test this by starting things up in Docker containers and then interrupting the network connection, and I have not experienced the issue you are reporting (note that this error is to be expected when the connection drops).

So your code looks fine. If you are able to provide a reproducible example then I'll have a look (but please note that these issues can sometimes be difficult to replicate and may be broker dependent).

jhillyerd commented 4 months ago

I ran into something similar today. It ended up being caused by a framework canceling the Context I provided to NewConnection() -- worth checking for anyone else seeing similar behavior.

bronger commented 3 months ago

As far as I’m concerned, this can be closed as I cannot reproduce it any more. I do see the error message if I have two programs running at the same time and using – erroneously – the same client ID. So maybe this was also the root cause of my original report.
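For anyone hitting the same duplicate-ID problem, one possible guard is to append a random suffix to the client ID of each process. The following is only a sketch: uniqueClientID and maxClientIDLen are hypothetical names, the prefix is assumed to be plain ASCII, and the 23-byte limit is the length every MQTT broker has to accept (many accept longer IDs). Note that a fresh ID on every start also means the broker cannot resume a previous session for that client.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// maxClientIDLen is the client ID length that every broker must accept.
const maxClientIDLen = 23

// uniqueClientID (hypothetical helper) returns prefix plus 8 random hex
// characters, truncating the prefix so the result fits maxClientIDLen.
// The prefix is assumed to be ASCII, so byte-wise truncation is safe.
func uniqueClientID(prefix string) (string, error) {
	suffix := make([]byte, 4) // 4 random bytes become 8 hex characters
	if _, err := rand.Read(suffix); err != nil {
		return "", fmt.Errorf("generating client ID suffix: %w", err)
	}
	hexSuffix := hex.EncodeToString(suffix)
	if maxPrefix := maxClientIDLen - len(hexSuffix); len(prefix) > maxPrefix {
		prefix = prefix[:maxPrefix]
	}
	return prefix + hexSuffix, nil
}

func main() {
	id, err := uniqueClientID("mqtttest")
	if err != nil {
		panic(err)
	}
	fmt.Println("client ID:", id)
}
```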

MattBrittan commented 3 months ago

> I do see the error message if I have two programs running at the same time and using – erroneously – the same client ID.

Are you able to provide these programs? Even if this only happens very occasionally, and in specific circumstances, I'd like to fix it (sounds like there may be a race condition). The problem with issues of this kind is that they are hard to trace and fix if they cannot be reproduced.

bronger commented 3 months ago

I put a mini project for reproducing the issue at https://gitlab.com/bronger/mqtt-test.

MattBrittan commented 3 months ago

Thanks very much for putting this demonstration together.

In this case I believe the library is acting correctly; what appears to be happening is:

  1. Connection up (so cm.AwaitConnection returns)
  2. Connection dropped due to new connection from other client (with the same client id)
  3. cm.Publish attempts to send the PUBLISH packet; network connection is closed so this fails (loss of connection had not been detected previously).

The simplest way to avoid this would be to use PublishViaQueue, which manages this process for you (and should correctly handle things like MQTT session management, which gets a bit complex).

This is tangentially related to #249 (my comments on that PR, anyway) in that the client attempts to reconnect immediately when dropped (so when using duplicate IDs the connect/drop/reconnect loop is very quick).

bronger commented 3 months ago

I see the error also with PublishViaQueue.

Probably one should consider AwaitConnection as a means of fulfilling a necessary precondition for a successful publish. However, it is not sufficient. Thus, one should e.g. retry a publish a couple of times (before giving up)?

MattBrittan commented 3 months ago

> I see the error also with PublishViaQueue.

OK, I'll test this later in the week (that function should succeed regardless of the connection state; the message should go into the queue and be sent when possible). Currently there is no way to tell when the message has actually been sent (that's on the to-do list).

> Probably one should consider AwaitConnection as a means of fulfilling a necessary precondition for a successful publish. However, it is not sufficient. Thus, one should e.g. retry a publish a couple of times (before giving up)?

Well AwaitConnection does what it says - waits for the connection to come up. I don't think there is really anything we can do (other than returning an error as happens now) to cater for the situation where the connection drops between that happening and the next action (e.g. a Publish). The intention of PublishViaQueue is to provide an easy solution for this (where the library manages the process of sending the message as soon as possible but in order).

MattBrittan commented 3 months ago

I have tested this with PublishViaQueue i.e.

func publishMQTT(ctx context.Context, c *autopaho.ConnectionManager, topic string, data any) error {
    msg, err := json.Marshal(data)
    if err != nil {
        panic(err)
    }
    if err := c.AwaitConnection(ctx); err != nil {
        // err can only be the cancelled context
        return err
    }
    err = c.PublishViaQueue(ctx, &autopaho.QueuePublish{
        Publish: &paho.Publish{
            Topic:   topic,
            Payload: msg,
        },
    })
    if err != nil {
        return fmt.Errorf("Error publishing to MQTT topic “%s”: %w", topic, err)
    } else {
        slog.Info("Queued MQTT message successfully", "message", string(msg), "topic", topic)
        return nil
    }
}

This appeared to work successfully (no errors reported); however, note that the AwaitConnection call is not actually needed here (because the message will be queued regardless). I added a subscriber and it received a few messages (unsurprisingly, it did not get more, as the connection drops almost instantly).

So apologies but I've been unable to duplicate the second part of this (using PublishViaQueue) and the first bit seems as expected (connection can drop between c.AwaitConnection(ctx) returning and calling Publish).

bronger commented 3 months ago

Okay, thank you for your investigations and explanations!

With PublishViaQueue, it takes longer for me to get the error messages, but they come nevertheless. Anyway, I just wrap publishing in a retry cycle. I think this is absolutely fine. After all, one does the same thing with network connections.

MattBrittan commented 3 months ago

> With PublishViaQueue, it takes longer for me to get the error messages, but they come nevertheless.

Does your code look similar to mine? I'm struggling to see how you could get the same error, because PublishViaQueue just adds the message to the queue and then returns (so returning "use of closed network connection" does not seem possible). OnClientError may still receive these errors, but with autopaho you can ignore them (as it will reconnect automatically and resend the message).

bronger commented 3 months ago

Now revisiting it, I realise that the error is different. It is a WARN and not a panic any more. So everything is fine.