lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

(Hard to track) MQTT sometimes hangs resubscribing with clean session = false #832

Open kervel opened 2 years ago

kervel commented 2 years ago

What version of the Stream Reactor are you reporting this issue for?

3.0.1

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Running on Kafka 2.7 (I adapted build.gradle to match my Kafka version).

What is the expected behaviour?

What was observed?

I found various similar (but not identical) bug reports filed against Paho mqttv3, so I modified the connector to use mqttv5 instead, and I can no longer reproduce the issue.

The modifications are fairly trivial, apart from "clean session" now being called "clean start", but I can share them if needed.
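The rename is not purely cosmetic. MQTT v5 splits the v3.1.1 clean session flag into Clean Start (discard old state on connect) and the Session Expiry Interval (how long state survives a disconnect). Per the MQTT 5.0 specification, an absent or zero expiry ends the session as soon as the connection closes, and 0xFFFFFFFF means it never expires. A v5 port that only sets clean start = false may therefore not keep a persistent session at all. The library-free sketch below models that mapping; the case classes and `toV5` are hypothetical, and I believe Paho's v5 client exposes the two settings as `MqttConnectionOptions.setCleanStart` and `setSessionExpiryInterval`, but check that against the version in use.

```scala
// Hypothetical, library-free model of mapping v3 session options to v5.
object SessionOptions {
  final case class V3Options(cleanSession: Boolean)
  final case class V5Options(cleanStart: Boolean, sessionExpiryIntervalSec: Long)

  // cleanSession = true  -> start clean and drop state on disconnect (expiry 0).
  // cleanSession = false -> resume state, and keep it for persistentExpirySec
  //                         after a disconnect (0 would silently disable it).
  def toV5(v3: V3Options, persistentExpirySec: Long): V5Options =
    if (v3.cleanSession) V5Options(cleanStart = true, sessionExpiryIntervalSec = 0L)
    else {
      require(persistentExpirySec > 0, "a persistent session needs a non-zero expiry in MQTT v5")
      V5Options(cleanStart = false, sessionExpiryIntervalSec = persistentExpirySec)
    }
}
```

For example, to line up with the broker's `persistent_client_expiration 2d`, one might pass 172800 seconds for `connect.mqtt.clean=false`.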

What is your connector properties configuration (my-connector.properties)?

    name=mqtt-source
    connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
    tasks.max=1
    connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM +/event WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.BytesConverter`
    connect.mqtt.client.id=dm_source_id
    connect.mqtt.hosts=ssl://localhost:1883
    connect.mqtt.ssl.ca.cert=/opt/cert/ca.crt
    connect.mqtt.ssl.cert=/opt/cert/tls.crt
    connect.mqtt.ssl.key=/opt/cert/tls.key
    connect.mqtt.clean=false
    connect.mqtt.timeout=1000
    connect.mqtt.service.quality=2
    connect.mqtt.log.message=true
    producer.delivery.timeout.ms=20000
    producer.max.block.ms=60000
    producer.request.timeout.ms=15000

Mosquitto config:

    listener 1883 0.0.0.0
    allow_anonymous true
    require_certificate true
    use_identity_as_username true
    use_username_as_clientid true
    cafile /mosquitto/config/ca.crt
    certfile /mosquitto/config/server.crt
    keyfile /mosquitto/config/server.pem
    dhparamfile /mosquitto/config/dhparams.pem
    persistence true
    persistence_file /mosquitto/data/mosquitto.db
    persistent_client_expiration 2d
    queue_qos0_messages true
    max_queued_messages 10000
    log_type all
    acl_file /mosquitto/config/mosquitto.acl

Please provide full log files (redact any sensitive information)

This is on the first start:

    [2022-02-09 20:38:30,720] INFO [Consumer clientId=connector-consumer-mqtt-sink-0, groupId=connect-mqtt-sink] Subscribed to topic(s): mqtt-outgoing (org.apache.kafka.clients.consumer.KafkaConsumer:965)
    [2022-02-09 20:38:31,630] INFO Connected to ssl://localhost:1883 as dm_source_id (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:50)
    [2022-02-09 20:38:31,630] INFO Connected to ssl://localhost:1883 as dm_sink_id (com.datamountaineer.streamreactor.connect.mqtt.connection.MqttClientConnectionFn$:59)
    [2022-02-09 20:38:31,636] WARN okay starting subscriptions (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:153)
    [2022-02-09 20:38:31,639] INFO Subscribed to topic [+/event] with QoS [2] (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:160)

This is after a restart:

    [2022-02-09 20:39:45,731] INFO [Consumer clientId=connector-consumer-mqtt-sink-0, groupId=connect-mqtt-sink] Subscribed to topic(s): mqtt-outgoing (org.apache.kafka.clients.consumer.KafkaConsumer:965)
    [2022-02-09 20:39:46,619] INFO Connected to ssl://localhost:1883 as dm_sink_id (com.datamountaineer.streamreactor.connect.mqtt.connection.MqttClientConnectionFn$:59)
    [2022-02-09 20:39:46,619] INFO Connected to ssl://localhost:1883 as dm_source_id (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:50)
    [2022-02-09 20:39:46,624] WARN okay starting subscriptions (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:153)

Here, the last log message ("Subscribed to topic ...") is missing because execution never reaches that line of code. Note that I added the "okay starting subscriptions" log statement at the beginning of the connectComplete function.

kervel commented 2 years ago

Hello, unfortunately in the meantime I also managed to reproduce the problem using mqttv5. It seems to be stuck here:

    "MQTT Call: dm_source_id" #45 prio=5 os_prio=0 cpu=6.39ms elapsed=151.10s tid=0x00007faa94017000 nid=0x42 in Object.wait()  [0x00007faabdd96000]
       java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(java.base@11.0.11/Native Method)
        - waiting on <0x00000000b7a6de10> (a java.lang.Object)
        at java.lang.Object.wait(java.base@11.0.11/Object.java:328)
        at org.eclipse.paho.mqttv5.client.internal.Token.waitForResponse(Token.java:177)
        - waiting to re-lock in wait() <0x00000000b7a6de10> (a java.lang.Object)
        at org.eclipse.paho.mqttv5.client.internal.Token.waitForCompletion(Token.java:130)
        at org.eclipse.paho.mqttv5.client.MqttToken.waitForCompletion(MqttToken.java:76)
        at org.eclipse.paho.mqttv5.client.MqttClient.subscribe(MqttClient.java:530)
        at org.eclipse.paho.mqttv5.client.MqttClient.subscribe(MqttClient.java:510)
        at org.eclipse.paho.mqttv5.client.MqttClient.subscribe(MqttClient.java:503)
        at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.connectComplete(MqttManager.scala:163)
        at org.eclipse.paho.mqttv5.client.internal.ConnectActionListener.onSuccess(ConnectActionListener.java:175)
        at org.eclipse.paho.mqttv5.client.internal.CommsCallback.fireActionEvent(CommsCallback.java:358)
        at org.eclipse.paho.mqttv5.client.internal.CommsCallback.handleActionComplete(CommsCallback.java:285)
        - locked <0x00000000b784b4a8> (a org.eclipse.paho.mqttv5.client.MqttToken)

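One plausible reading of this trace (an assumption, not confirmed anywhere in the thread): the client's callback thread is blocked inside connectComplete waiting for the SUBACK. With clean session = false the broker replays the stored backlog (up to `max_queued_messages`) right after the reconnect. If messages are handed from the network-reading thread to the callback thread through a bounded queue, the reader can fill that queue and block before it ever reads the SUBACK, because the only thread that drains the queue is the one waiting. The hypothetical model below reproduces that shape with plain `java.util.concurrent`; the queue capacity is a parameter, not Paho's real value.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}

// Hypothetical model of the suspected deadlock; not Paho's actual code.
object InboundQueueDeadlockModel {

  /** True if the "callback thread" (the caller) sees the SUBACK within waitMs. */
  def subscribeInCallback(backlog: Int, queueCapacity: Int, waitMs: Long): Boolean = {
    // Hand-off to the callback thread; only the callback thread drains it.
    val inbound = new ArrayBlockingQueue[String](queueCapacity)
    val subAck  = new CountDownLatch(1)

    // Stand-in for the network-reading thread.
    val receiver = new Thread(new Runnable {
      def run(): Unit =
        try {
          // The persisted backlog arrives first; put() blocks once the queue is full...
          (1 to backlog).foreach(i => inbound.put(s"publish-$i"))
          // ...so the SUBACK behind it is only read if the backlog fits.
          subAck.countDown()
        } catch {
          case _: InterruptedException => () // model torn down
        }
    })
    receiver.setDaemon(true)
    receiver.start()

    // We are "inside connectComplete": wait for the SUBACK without draining `inbound`.
    val acked = subAck.await(waitMs, TimeUnit.MILLISECONDS)
    receiver.interrupt()
    acked
  }
}
```

With a backlog smaller than the queue the wait succeeds; once the backlog exceeds it, the wait can only time out, which would fit the intermittent, clean-session-only behaviour. Under this model, bounding the wait or moving the subscribe off the callback thread both let the callback return and drain the queue.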
kervel commented 2 years ago

And... I think I found something that works reliably. I now call:

    client.setTimeToWait(1000)

before connecting the client. This way, the .subscribe() call never blocks for more than 1 second. Since the subscribe itself works (it always works; it's just Paho that doesn't see it), the connector continues fine even if the call times out.
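A minimal sketch of how that workaround could be made explicit, assuming (as noted further down in the thread) that a timed-out call surfaces as an exception rather than returning normally. `SubscribeTimeout` is a hypothetical stand-in for Paho's timeout `MqttException`, and `subscribe` stands in for `client.subscribe(topic, qos)` after `setTimeToWait`; only the timeout is tolerated, and any other failure still propagates.

```scala
// Hypothetical helper; names are illustrative, not part of Paho or Stream Reactor.
object TolerantSubscribe {
  final class SubscribeTimeout(msg: String) extends Exception(msg)

  sealed trait Outcome
  case object Acknowledged      extends Outcome
  case object TimedOutAssumedOk extends Outcome

  def subscribeTolerantly(subscribe: () => Unit, log: String => Unit): Outcome =
    try {
      subscribe() // bounded by the client's time-to-wait
      Acknowledged
    } catch {
      // Per the observation above, the broker did process SUBSCRIBE; only the ack was missed.
      case e: SubscribeTimeout =>
        log(s"Subscribe not acknowledged in time, continuing: ${e.getMessage}")
        TimedOutAssumedOk
    }
}
```

Treating a timeout as success is only as safe as the assumption that the broker always processed the SUBSCRIBE; if that ever fails, the connector would silently receive nothing.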

harish-mohanadas commented 8 months ago

The issue seems to still exist in version 6.0.0.

client.setTimeToWait will throw an MqttException on timeout.

Could we check whether this is a reconnect with clean session = false, and not subscribe in that case? client.subscribe would then time out and throw on the first connection, but proceed on reconnection.

--- a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
+++ b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
@@ -50,6 +50,7 @@ class MqttManager(
   client.setCallback(this)

   logger.info(s"Connecting to ${settings.connection}")
+  client.setTimeToWait(5000)
   client.connect(options)
   logger.info(s"Connected to ${settings.connection} as ${settings.clientId}")

@@ -165,9 +166,10 @@ class MqttManager(
     val topic = sourceToTopicMap.keySet.toArray
     val qos   = Array.fill(sourceToTopicMap.keySet.size)(settings.mqttQualityOfService)

-    if (reconnect)
+    if (reconnect && !options.isCleanSession())
       logger.warn(s"Reconnected. Resubscribing to topic $topic...")
-    client.subscribe(topic, qos)
+    else client.subscribe(topic, qos)
+
     if (reconnect)
       logger.warn(s"Resubscribed to topic $topic with QoS $qos")
     else logger.info(s"Subscribed to topic $topic with QoS $qos")

Is there a cleaner method?
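One possible answer, offered as a hypothetical sketch: decide on the broker's CONNACK "session present" flag instead of the local `reconnect && !cleanSession` check. Paho's `connectComplete(reconnect, serverURI)` reports `reconnect = true` only for the client's own automatic reconnects, so a connector restart (the case in the original report) would still take the subscribe path under the diff. I believe the v3 client exposes the flag via `IMqttToken.getSessionPresent()` on the token returned by `connectWithResult`, but verify that against the Paho version in use.

```scala
// Hypothetical policy object; not part of Stream Reactor or Paho.
object ResubscribePolicy {

  /** Subscribe unless the broker confirms it kept our session (and so our subscriptions). */
  def needsSubscribe(cleanSession: Boolean, sessionPresent: Boolean): Boolean =
    cleanSession || !sessionPresent

  /** The rule from the diff above, for comparison. */
  def diffRuleSubscribes(reconnect: Boolean, cleanSession: Boolean): Boolean =
    !(reconnect && !cleanSession)
}
```

On a restart with a surviving session, `diffRuleSubscribes(reconnect = false, cleanSession = false)` is true while `needsSubscribe(cleanSession = false, sessionPresent = true)` is false. One caveat: if the KCQL source topics change while the broker still holds the old session, skipping SUBSCRIBE would miss the new topics.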

harish-mohanadas commented 8 months ago

I could solve the issue by wrapping client.subscribe and the logging that follows in a Future to avoid blocking the connectComplete callback.

--- a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
+++ b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
@@ -28,6 +28,9 @@ import java.util
 import java.util.Base64
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.TimeUnit
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
 import scala.jdk.CollectionConverters.ListHasAsScala

 class MqttManager(
@@ -167,9 +170,14 @@ class MqttManager(

     if (reconnect)
       logger.warn(s"Reconnected. Resubscribing to topic $topic...")
-    client.subscribe(topic, qos)
-    if (reconnect)
-      logger.warn(s"Resubscribed to topic $topic with QoS $qos")
-    else logger.info(s"Subscribed to topic $topic with QoS $qos")
+
+    Future {
+      client.subscribe(topic, qos)
+      if (reconnect)
+        logger.warn(s"Resubscribed to topic $topic with QoS $qos")
+      else logger.info(s"Subscribed to topic $topic with QoS $qos")
+    }
+
+    return
   }
 }
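A variant of the same idea, as a sketch: the diff runs the blocking call on `ExecutionContext.Implicits.global`, which is sized for CPU-bound work, and a failure inside that `Future` is never observed. The hypothetical class below uses a dedicated single-thread executor instead, which keeps the blocking wait off the shared pool and serialises overlapping reconnects, and it logs failures. `subscribe` and `log` stand in for `client.subscribe(topic, qos)` and the connector's logger.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical helper; not part of Stream Reactor.
final class OffCallbackSubscriber(log: String => Unit) {
  // One dedicated thread: blocking here never starves the global pool.
  private val executor = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

  /** Call from connectComplete: returns at once, so the callback thread is freed. */
  def resubscribe(reconnect: Boolean)(subscribe: () => Unit): Future[Unit] =
    Future {
      subscribe() // may block until the SUBACK is processed
      log(if (reconnect) "Resubscribed" else "Subscribed")
    }.recover { case NonFatal(e) => log(s"Subscribe failed: ${e.getMessage}") }

  def close(): Unit = executor.shutdown()
}
```

The connector would create one instance per task and call `close()` when the task stops.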
chriline commented 4 months ago

I believe we're running into this very issue: with connect.mqtt.clean=false the connector occasionally stops working and does not recover. "Resubscribing" is logged, but "Resubscribed" is not. According to the broker logs, messages are still being sent.