redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

MQTT input seemed not stable with high input load. #2158

Open kriswz opened 1 year ago

kriswz commented 1 year ago

Dear Support,

I've run into a connection interruption issue while testing the MQTT input in Benthos.

To start the test, I used a small piece of code as an MQTT client to publish messages to an MQTT server (running in Docker) every 5 seconds. I could see the messages pass through to the Benthos output, which is connected to a Kafka instance. There were no errors on the Benthos side, and all messages seemed to land on the output side.

Then I increased the publishing rate of the MQTT client to one message every 20 ms. On the output side the message flow appeared to be interrupted, and the log from the Benthos container showed two error lines indicating that the connection was lost:

level=error msg="Connection lost for unknown reasons." @service=benthos label="" path=root.input.broker.inputs.1
level=error msg="Connection lost due to: pingresp not received, disconnecting" @service=benthos label="" path=root.input.broker.inputs.1

The connection seemed to be re-established automatically right after. I know this error comes from the underlying library used by Benthos; I'm just wondering whether there is something we could do on the Benthos configuration side, please?

thanks!

Environment:

Docker image: latest tag, pulled on 11th Oct, 2023.
MQTT server: EMQT
MQTT client: with two scenarios:

  1. send a test message every 5 seconds, with 10 instances
  2. send a test message every 200 ms, with one instance.

Both cases trigger the error.
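For anyone trying to reproduce the two scenarios above, here is a minimal sketch of a fixed-interval publisher. It is not the test client used in this report: `publishFunc`, `publishAtInterval` and `runDemo` are hypothetical names, and the publish call is stubbed out so the example runs without a broker.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// publishFunc is a hypothetical stand-in for a real MQTT publish call.
type publishFunc func(payload string) error

// publishAtInterval calls publish once per tick until count messages have
// been sent, publish returns an error, or ctx is cancelled. It returns the
// number of messages that were published successfully.
func publishAtInterval(ctx context.Context, interval time.Duration, count int, publish publishFunc) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	sent := 0
	for sent < count {
		select {
		case <-ctx.Done():
			return sent, ctx.Err()
		case <-ticker.C:
			if err := publish(fmt.Sprintf("test message %d", sent)); err != nil {
				return sent, err
			}
			sent++
		}
	}
	return sent, nil
}

// runDemo publishes 5 messages, one every 20 ms, into an in-memory stub.
func runDemo() (int, []string, error) {
	var received []string
	stub := func(payload string) error {
		received = append(received, payload)
		return nil
	}
	sent, err := publishAtInterval(context.Background(), 20*time.Millisecond, 5, stub)
	return sent, received, err
}

func main() {
	sent, received, err := runDemo()
	fmt.Println(sent, len(received), err)
}
```

Against a real broker, the stub could presumably be swapped for a closure around the paho client's `Publish(topic, qos, retained, payload)` call, with the interval set to 5 seconds or 200 ms to match the scenarios listed.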

Benthos: Config file: ` input: broker: inputs:

Jeffail commented 1 year ago

Hey @kriswz, yeah, this looks as though there are maybe some options we can set on the underlying client library. I'm guessing in your case we'd need to increase the time it waits for ping responses, but I can't see at a glance which of the options would let us tune that.

kriswz commented 1 year ago

@Jeffail Thanks a lot for your fast response. Looking at the code of the paho.mqtt.golang lib, there is a client option:

// SetPingTimeout will set the amount of time (in seconds) that the client
// will wait after sending a PING request to the broker, before deciding
// that the connection has been lost. Default is 10 seconds.
func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions {
    o.PingTimeout = k
    return o
}

I guess this could be the one you were referring to.
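To make the semantics described in that doc comment concrete, here is a simplified model of a ping-timeout check. It is only a sketch of the idea (a ping is outstanding and has gone unanswered for at least the timeout), not the library's actual implementation; `pingState`, `timedOut` and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// pingState records when the last PINGREQ was sent and when the last
// PINGRESP arrived. The zero time means "never".
type pingState struct {
	lastPingSent time.Time
	lastPingResp time.Time
}

// timedOut reports whether a ping is still outstanding and has been
// unanswered for at least pingTimeout.
func (s pingState) timedOut(now time.Time, pingTimeout time.Duration) bool {
	outstanding := s.lastPingSent.After(s.lastPingResp)
	return outstanding && now.Sub(s.lastPingSent) >= pingTimeout
}

// demo evaluates three cases with the 10-second default mentioned above.
func demo() (early, late, answered bool) {
	start := time.Date(2023, 10, 11, 0, 0, 0, 0, time.UTC)
	timeout := 10 * time.Second

	// A ping was sent at start and no response has arrived yet.
	pending := pingState{lastPingSent: start}
	early = pending.timedOut(start.Add(5*time.Second), timeout)
	late = pending.timedOut(start.Add(10*time.Second), timeout)

	// The response arrived one second after the ping was sent.
	ok := pingState{lastPingSent: start, lastPingResp: start.Add(time.Second)}
	answered = ok.timedOut(start.Add(30*time.Second), timeout)
	return early, late, answered
}

func main() {
	early, late, answered := demo()
	fmt.Println(early, late, answered) // prints "false true false"
}
```

In this model, raising the timeout only delays the point at which an unanswered ping is treated as a lost connection; it does nothing if the response never arrives at all.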

kriswz commented 1 year ago

I'll run some tests on my side to check whether increasing this timeout helps in this situation. I can see the default timeout is already 10 seconds.

kriswz commented 1 year ago

OK, I tried setting the ping timeout, but no luck. I'm setting keepalive to 60 with a 45-second ping timeout, and every 60 seconds the connection is dropped; it seems the 45-second ping timeout is not really taking effect.

conf := mqtt.NewClientOptions().
        SetAutoReconnect(false).
        SetClientID(m.conf.ClientID).
        SetCleanSession(m.conf.CleanSession).
        SetConnectTimeout(m.connectTimeout).
        SetKeepAlive(time.Duration(m.conf.KeepAlive) * time.Second).
        SetPingTimeout(45 * time.Second).