eclipse / paho.golang

Go libraries
Other
327 stars 92 forks source link

Debug log showing "use of closed network connection" on shutdown #244

Closed eest closed 6 months ago

eest commented 6 months ago

Describe the bug I am writing a tool that will be a subscriber and publisher on the same queue with the same client. When enabling debug messages I noticed I was getting errors on shutdown and I am not sure why.

To reproduce I have cut down on my code to try to get as small as an example as possible, here it is:

package main

import (
    "context"
    "fmt"
    "net/url"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/eclipse/paho.golang/autopaho"
    "github.com/eclipse/paho.golang/paho"
    "github.com/rs/zerolog"
)

func messagePublisher(ctx context.Context, wg *sync.WaitGroup, payloadChan chan []byte, logger zerolog.Logger, cm *autopaho.ConnectionManager) {
    wg.Add(1)
    defer wg.Done()

    for payload := range payloadChan {
        logger.Info().Msgf("publishing: %s", string(payload))
        if _, err := cm.Publish(ctx, &paho.Publish{
            QoS:     1,
            Topic:   "test/topic",
            Payload: payload,
        }); err != nil {
            logger.Error().Err(err).Msg("failed publishing message")
        }
    }

    logger.Info().Msg("messagePublisher: exiting")
}

func messageSubscriber(wg *sync.WaitGroup, msgChan chan *paho.Publish, logger zerolog.Logger) {
    wg.Add(1)
    defer wg.Done()

    for msg := range msgChan {
        logger.Debug().Str("payload", string(msg.Payload)).Str("topic", msg.Topic).Msg("got message from queue")
    }

    logger.Info().Msg("messageSubscriber: exiting")
}

func setupMQTT(ctx context.Context, logger *zerolog.Logger) (*autopaho.ConnectionManager, chan *paho.Publish, error) {
    subChan := make(chan *paho.Publish)

    serverURL, err := url.Parse("mqtt://localhost:1883")
    if err != nil {
        return nil, nil, fmt.Errorf("unable to parse server string: %w", err)
    }

    errorLogger := logger.With().Str("paho_logger", "errors").Logger()
    pahoErrorLogger := logger.With().Str("paho_logger", "paho_errors").Logger()
    debugLogger := logger.With().Str("paho_logger", "debug").Logger()
    pahoDebugLogger := logger.With().Str("paho_logger", "paho_debug").Logger()

    cliCfg := autopaho.ClientConfig{
        ServerUrls:                    []*url.URL{serverURL},
        KeepAlive:                     20,    // Keepalive message should be sent every 20 seconds
        CleanStartOnInitialConnection: false, // Keep old messages in the broker in case we are not there
        SessionExpiryInterval:         60,    // If connection drops we want session to remain live whilst we reconnect
        OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
            logger.Info().Msg("pubsub: mqtt connection up")
            if _, err := cm.Subscribe(ctx, &paho.Subscribe{
                Subscriptions: []paho.SubscribeOptions{
                    {
                        Topic:   "test/topic",
                        QoS:     1,
                        NoLocal: false,
                    },
                },
            }); err != nil {
                logger.Error().Err(err).Msg("subscribe: failed to subscribe. Probably due to connection drop so will retry")
                return // likely connection has dropped
            }
            logger.Info().Msg("subscribe: mqtt subscription made")
        },
        OnConnectError: func(err error) { logger.Error().Err(err).Msg("pubsub: error whilst attempting connection") },
        Debug:          &debugLogger,
        Errors:         &errorLogger,
        PahoDebug:      &pahoDebugLogger,
        PahoErrors:     &pahoErrorLogger,
        ClientConfig: paho.ClientConfig{
            ClientID: "pahotest",
            OnPublishReceived: []func(paho.PublishReceived) (bool, error){
                func(pr paho.PublishReceived) (bool, error) {
                    subChan <- pr.Packet
                    return true, nil
                }},
            OnClientError: func(err error) { logger.Error().Err(err).Msg("pubsub: client error") },
            OnServerDisconnect: func(d *paho.Disconnect) {
                if d.Properties != nil {
                    logger.Info().Str("reason_string", d.Properties.ReasonString).Msg("pubsub: server requested disconnect")
                } else {
                    logger.Info().Uint8("reason_code", uint8(d.ReasonCode)).Msg("pubsub server requested disconnect")
                }
            },
        },
    }

    c, err := autopaho.NewConnection(ctx, cliCfg)
    if err != nil {
        return nil, nil, fmt.Errorf("setupMQTT: unable to create connection: %w", err)
    }

    return c, subChan, nil

}

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    logger := zerolog.New(os.Stdout).With().
        Timestamp().
        Str("service", "pahotest").
        Logger()

    mqttCM, subMsgChan, err := setupMQTT(ctx, &logger)
    if err != nil {
        logger.Error().Err(err).Msg("unable to setup MQTT publisher")
        os.Exit(1)
    }

    pubPayloadChan := make(chan []byte)
    var wg sync.WaitGroup

    // Comment out either of these and the errors at shutdown go away
    go messagePublisher(ctx, &wg, pubPayloadChan, logger, mqttCM)
    go messageSubscriber(&wg, subMsgChan, logger)

    // Uncomment to test sending/receiving.
    //if err = mqttCM.AwaitConnection(ctx); err != nil {
    //  panic(err)
    //} else {
    //  pubPayloadChan <- []byte("test payload")
    //}

    // Wait here for things to shutdown.
    <-mqttCM.Done()

    // Close message handler channels.
    close(subMsgChan)
    close(pubPayloadChan)

    // Wait for handlers to exit.
    wg.Wait()
}

Debug output Startup:

# ./pahotest
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"connecting\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"sending CONNECT\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"waiting for CONNACK/AUTH\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"received CONNACK\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"received CONNACK, starting PingHandler\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"starting publish packets loop\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"starting incoming\n"}
{"level":"info","service":"pahotest","time":"2024-02-21T09:14:13Z","message":"pubsub: mqtt connection up"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"subscribing to [{Topic:test/topic QoS:1 RetainHandling:0 NoLocal:false RetainAsPublished:false}]"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"waiting for SUBACK\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:14:13Z","message":"received SUBACK\n"}
{"level":"info","service":"pahotest","time":"2024-02-21T09:14:13Z","message":"subscribe: mqtt subscription made"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:14:13Z","message":"queue AwaitConnection\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:14:13Z","message":"queue got connection\n"}

After hitting ctrl+c:

^C{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:15:00Z","message":"innerCtx Done\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"disconnecting &{<nil> 0}\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"client stop requested\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"client stopping, incoming stopping\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"returning from incoming worker\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"returning from publish packets loop worker\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:15:00Z","message":"queue done\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"returning from ping handler worker\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"conn closed\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"acks tracker reset\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"session updated, waiting on workers\n"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"workers done\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:15:00Z","message":"mainLoop: server connection handler exiting due to context: context canceled\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:15:00Z","message":"mainLoop: connection manager has terminated\n"}
{"level":"info","service":"pahotest","time":"2024-02-21T09:15:00Z","message":"messagePublisher: exiting"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-21T09:15:00Z","message":"error called: read tcp 127.0.0.1:59882->127.0.0.1:1883: use of closed network connection\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-21T09:15:00Z","message":"handleError received extra error: read tcp 127.0.0.1:59882->127.0.0.1:1883: use of closed network connection"}
{"level":"info","service":"pahotest","time":"2024-02-21T09:15:00Z","message":"messageSubscriber: exiting"}

Expected behaviour I noticed the two error messages mentioning use of closed network connection and have been trying to figure out why this is. I have noticed that if I comment either of the lines:

// Comment out either of these and the errors at shutdown go away
go messagePublisher(ctx, &wg, pubPayloadChan, logger, mqttCM)
go messageSubscriber(&wg, subMsgChan, logger)

... then the error goes away. As can be seen the code neither receives or publishes anything at this point.

Software used:

Additional context I was somewhat thinking this could be related to the reuse of ctx inside the messagePublisher() goroutine but even if i comment out the whole call to Publish() the error still remains.

MattBrittan commented 6 months ago
// Comment out either of these and the errors at shutdown go away
go messagePublisher(ctx, &wg, pubPayloadChan, logger, mqttCM)
go messageSubscriber(&wg, subMsgChan, logger)

I cannot replicate this bit (the error is output whether these lines are there or commented out). Note that if you comment out either of these but do not comment out your pubPayloadChan <- []byte("test payload") then your code will end up blocking somewhere (so behaviour will be quite different).

The cause of the message is that paho.Disconnet() closes the network connection. This leads to the incoming goroutine receiving an error whilst receiving on the connection; leading to the logging of the error you are seeing.

I note that the error does not reach your OnClientError function (this should will be prevented by autopaho.errorHandler).

As such, I believe what you are seeing is the library working as intended (the debug logs exist for debugging purposes and the message is useful in that context). However I may be missing something.

eest commented 6 months ago

Hello,

Interesting that you can not reproduce it... Just to verify I copy-pasted my own code above to main.go, on ctrl+c i got:

{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:23:23Z","message":"mainLoop: server connection handler exiting due to context: context canceled\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:23:23Z","message":"mainLoop: connection manager has terminated\n"}
{"level":"info","service":"pahotest","time":"2024-02-22T13:23:23Z","message":"messagePublisher: exiting"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-22T13:23:23Z","message":"error called: read tcp 127.0.0.1:52814->127.0.0.1:1883: use of closed network connection\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:23:23Z","message":"handleError received extra error: read tcp 127.0.0.1:52814->127.0.0.1:1883: use of closed network connection"}
{"level":"info","service":"pahotest","time":"2024-02-22T13:23:23Z","message":"messageSubscriber: exiting"}

... then changing the code to:

    // Comment out either of these and the errors at shutdown go away
    //go messagePublisher(ctx, &wg, pubPayloadChan, logger, mqttCM)
    go messageSubscriber(&wg, subMsgChan, logger)

... which leads to the following logs:

{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:25:31Z","message":"mainLoop: server connection handler exiting due to context: context canceled\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:25:31Z","message":"mainLoop: connection manager has terminated\n"}
{"level":"info","service":"pahotest","time":"2024-02-22T13:25:31Z","message":"messageSubscriber: exiting"}

... and going the other way:

    // Comment out either of these and the errors at shutdown go away
    go messagePublisher(ctx, &wg, pubPayloadChan, logger, mqttCM)
    //go messageSubscriber(&wg, subMsgChan, logger)

... leads to:

{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:28:54Z","message":"mainLoop: server connection handler exiting due to context: context canceled\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-22T13:28:54Z","message":"mainLoop: connection manager has terminated\n"}
{"level":"info","service":"pahotest","time":"2024-02-22T13:28:54Z","message":"messagePublisher: exiting"}

I know the code is not really functional without both of them, I was just trying to zero in on what might be throwing the errors as I wasnt sure how having both of them would really differ from having just one of each (since the code as-is does not really do anything). If they are benign informational errors then we can close this ticket as I just wanted to make sure I was not doing anything obviously broken or silly :).

eest commented 6 months ago

... I can add that I am building the code with go version go1.22.0 darwin/arm64 if that makes a difference.

MattBrittan commented 6 months ago

Please add a time.Sleep(time.Second) at the end of main. My guess is that the difference is one of timing; the wg.Wait() at the end of your code is waiting for the goroutines to exit; having an additional go routine running will increase the time this takes.

When the error is detected a goroutine is started to handle it (i.e. go c.error(err)) so there is a race between the goroutine running (and logging the message) and the program ending. This means the results are unpredictable (and it's unsurprising that our results differ). As long as the client itself is not receiving this error (it's only logged by the library) then it's operating as expected.

eest commented 6 months ago

Hah, nice catch! With...

    // Comment out either of these and the errors at shutdown go away
    //go messagePublisher(ctx, &wg, pubPayloadChan, logger, mqttCM)
    go messageSubscriber(&wg, subMsgChan, logger)

and...

    // Wait for handlers to exit.
    wg.Wait()

    time.Sleep(time.Second)

I now see:

{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-23T10:44:21Z","message":"mainLoop: server connection handler exiting due to context: context canceled\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-23T10:44:21Z","message":"mainLoop: connection manager has terminated\n"}
{"level":"info","service":"pahotest","time":"2024-02-23T10:44:21Z","message":"messageSubscriber: exiting"}
{"level":"debug","service":"pahotest","paho_logger":"paho_debug","time":"2024-02-23T10:44:21Z","message":"error called: read tcp 127.0.0.1:41998->127.0.0.1:1883: use of closed network connection\n"}
{"level":"debug","service":"pahotest","paho_logger":"debug","time":"2024-02-23T10:44:21Z","message":"handleError received extra error: read tcp 127.0.0.1:41998->127.0.0.1:1883: use of closed network connection"}

As you mentioned I am not getting any errors returned to the client so I'll close this, thanks for the help!