lensesio / stream-reactor

A collection of open-source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

[Bug]: MalformedInputException in MQTT Source connector in Azure K8s #939

Open mukkchir opened 1 year ago

mukkchir commented 1 year ago

Hi,

I have deployed a Strimzi Kafka cluster in Azure Kubernetes Service (AKS). After deploying the MQTT source connector, I get the following error from kubectl describe kctr -n kafka:

Error Trace

java.nio.charset.MalformedInputException: Input length = 1
              at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:274)
              at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
              at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188)
              at java.base/java.io.InputStreamReader.read(InputStreamReader.java:177)
              at java.base/java.io.BufferedReader.fill(BufferedReader.java:162)
              at java.base/java.io.BufferedReader.read(BufferedReader.java:183)
              at scala.io.BufferedSource.$anonfun$iter$2(BufferedSource.scala:41)
              at scala.io.Codec.wrap(Codec.scala:74)
              at scala.io.BufferedSource.$anonfun$iter$1(BufferedSource.scala:41)
              at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
              at scala.collection.Iterator$$anon$27.next(Iterator.scala:1135)
              at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:637)
              at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
              at scala.io.Source.hasNext(Source.scala:253)
              at scala.collection.Iterator.isEmpty(Iterator.scala:466)
              at scala.collection.Iterator.isEmpty$(Iterator.scala:466)
              at scala.io.Source.isEmpty(Source.scala:205)
              at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1165)
              at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1164)
              at scala.io.Source.mkString(Source.scala:205)
              at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1179)
              at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1190)
              at scala.io.Source.mkString(Source.scala:205)
              at com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceTask.start(MqttSourceTask.scala:48)
              at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
              at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
              at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
              at java.base/java.lang.Thread.run(Thread.java:833)
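For context on this exception type: java.nio.charset.MalformedInputException is thrown when a strict decoder meets bytes that are not valid in the charset being used, for example when a file written in a non-UTF-8 encoding is read by a JVM whose strict UTF-8 decoder rejects it. A minimal, self-contained sketch reproducing the same exception (the byte values here are illustrative, not taken from the connector):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        // 0xE9 is 'é' in ISO-8859-1, but as the last byte of the input it is
        // an incomplete multi-byte sequence in UTF-8, so a strict UTF-8
        // decoder rejects it instead of substituting a replacement char.
        byte[] latin1Bytes = {'c', 'a', 'f', (byte) 0xE9};
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT);
        try {
            decoder.decode(ByteBuffer.wrap(latin1Bytes));
            System.out.println("decoded ok");
        } catch (MalformedInputException e) {
            // Same exception as in the stack trace above; "Input length = 1"
            // is the length of the offending byte sequence.
            System.out.println("MalformedInputException: Input length = " + e.getInputLength());
        } catch (CharacterCodingException e) {
            System.out.println("other coding error");
        }
    }
}
```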

MQTT source connector config

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mqtt-hm-source-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
  tasksMax: 1
  config:
    topics: test
    connect.mqtt.clean: false
    connect.mqtt.kcql: INSERT INTO test SELECT * FROM `MY_TOPIC` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
    connect.mqtt.converter.throw.on.error: true
    connect.mqtt.client.id: "CLIENT_ID"
    connect.mqtt.hosts: "ssl://mqtt.broker.com:8883" # this is a third-party connection
    connect.mqtt.ssl.ca.cert: "/mycerts/ca_file.pem"
    connect.mqtt.ssl.cert: "/mycerts/crt_file.pem.crt"
    connect.mqtt.ssl.key: "/mycerts/key_file.pem.key"
    connect.mqtt.service.quality: 1
    connect.progress.enabled: true
    connect.mqtt.log.message: true
    errors.log.include.messages: true
    errors.log.enable: true
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false 

Strimzi Kafka config

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: messaging
spec:
  kafka:
    version: 3.4.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Steps to reproduce in Azure K8s

  1. kubectl create namespace kafka
  2. kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
  3. kubectl create -f ABOVE_KAFKA_CONFIG.yaml -n kafka
  4. kubectl create -f ABOVE_MQTT_SOURCE_CONFIG.yaml -n kafka
  5. kubectl get kctr -n kafka (connector status appears not ready)
  6. kubectl describe kctr -n kafka (here the error trace can be seen)
  7. kubectl delete kctr MQTT_CONNECTOR_NAME -n kafka (to re-apply the connector with changes, it's better to delete the existing connector and repeat steps 4–6)

Previously used versions

Kafka - 3.2.0
Strimzi cluster operator - 0.29.0
Stream Reactor Kafka connector - 4.0.0

Currently used versions

Kafka - 3.4.0
Strimzi cluster operator - 0.34.0
Stream Reactor Kafka connector - 4.2.0

With both sets of versions, the same error persisted. I think I'm missing some config but not sure what it is. Any guidance would be greatly appreciated.

davidsloan commented 1 year ago

@mukkchir thanks for the detailed report. Is it possible that your default charset is set to something different from UTF-8?

Could you please try this build that I have prepared, which contains a possible fix?

Let me know how it goes.
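As a quick diagnostic for the question above (a sketch, not part of the thread): the JVM's effective default charset can be printed directly. On stripped-down container images without locale packages it commonly reports US-ASCII (ANSI_X3.4-1968) rather than UTF-8, which makes strict UTF-8 reads fail:

```java
import java.nio.charset.Charset;

public class DefaultCharsetCheck {
    public static void main(String[] args) {
        // file.encoding is the system property the JVM derives the
        // default charset from; defaultCharset() is what code actually gets.
        System.out.println("file.encoding    = " + System.getProperty("file.encoding"));
        System.out.println("defaultCharset() = " + Charset.defaultCharset());
    }
}
```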

mukkchir commented 1 year ago

@davidsloan you are really a lifesaver and huge kudos to you for giving me a quick fix. It's working! I'm now able to connect to the stream successfully. I would like to keep this ticket open for a few more days as I'm doing some tests on kafka and the connector you provided. I will report back to this thread in case of any errors/exceptions.

mukkchir commented 1 year ago

@davidsloan The connector is constantly looping over "Error handling message on topic" and re-subscribing to the topic, with the error description below:


2023-05-15 08:24:04,304 DEBUG [mqtt-hm-source-connector|task-0] Message received on topic [TOPIC_NAME]. Message id =[3] , isDuplicate=false, payload=ENCRYPTED_DATA (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager) [MQTT Call: afaf2189-2fc7-4288-8ed4-0c90846c5ef3]
2023-05-15 08:24:04,304 ERROR [mqtt-hm-source-connector|task-0] Error handling message with id:3 on topic:TOPIC_NAME (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager) [MQTT Call: afaf2189-2fc7-4288-8ed4-0c90846c5ef3]
org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:249)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$2(JsonSimpleConverter.scala:142)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:258)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:142)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
    at scala.collection.immutable.List.map(List.scala:246)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
    at scala.collection.immutable.List.map(List.scala:246)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:98)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.convert(JsonSimpleConverter.scala:46)
    at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.messageArrived(MqttManager.scala:122)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
    at java.base/java.lang.Thread.run(Thread.java:833)
2023-05-15 08:24:04,305 WARN [mqtt-hm-source-connector|task-0] Connection lost. Re-connecting is set to true (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager) [MQTT Call: afaf2189-2fc7-4288-8ed4-0c90846c5ef3]
MqttException (0) - org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:228)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:249)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$2(JsonSimpleConverter.scala:142)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:258)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:142)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
    at scala.collection.immutable.List.map(List.scala:246)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
    at scala.collection.immutable.List.map(List.scala:246)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:98)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.convert(JsonSimpleConverter.scala:46)
    at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.messageArrived(MqttManager.scala:122)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
    ... 1 more

I'm not sure what is happening. How do I properly handle this in the connector config? Also, what happens if tasks.max is set to 2 or more?

davidsloan commented 1 year ago

Not sure what is happening. How to properly handle this in the connector config? Also, what happens if the task.max is set to 2 or more?

@mukkchir are you able to send me some samples of your data? This would help with troubleshooting the issue. You can find me on our community Slack if you are not comfortable sharing it here. It is not obvious to me what the problem is without more information.

mukkchir commented 1 year ago

@davidsloan Sure! Here is the sample data on which the error is occurring:

{
   "_id":{
      "$oid":"VARCHAR"
   },
   "capability":"diagnostics",
   "data":{
      "diagnostics":{
         "tire_pressures":[
            {
               "data":{
                  "location":"front_left",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"230.0"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:22.000Z"
            },
            {
               "data":{
                  "location":"front_right",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"230.0"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:19.000Z"
            },
            {
               "data":{
                  "location":"rear_left",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"257.5"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:50.000Z"
            },
            {
               "data":{
                  "location":"rear_right",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"252.5"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:10.000Z"
            }
         ]
      }
   },
   "message_id":"VARCHAR",
   "property":"tire_pressures",
   "version":{
      "$numberInt":"1"
   },
   "num":"VARCHAR"
}

davidsloan commented 1 year ago

I'm sorry, I'm not seeing a problem with this input.

I've put together a quick test here which you can see is working: https://github.com/lensesio/stream-reactor/pull/942

Is this message any different from previous messages? Is there any schema change, missing/added fields etc here since the preceding message?
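For background on how "Struct schemas do not match" can arise (an illustrative sketch, not a confirmed diagnosis of this data): converters that infer a Connect schema from each incoming message will produce differing per-message Struct schemas if a field's JSON type drifts between messages, say an object in one message and a plain number in the next, and putting one message's value into the other's schema then fails validation. The signature helper and messages below are invented to mimic that inference:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class SchemaDrift {
    // Build a crude type signature for a JSON-like value, analogous to a
    // converter inferring a schema from each message independently.
    static String signature(Object node) {
        if (node instanceof Map) {
            Map<String, String> fields = new TreeMap<>();
            ((Map<?, ?>) node).forEach((k, v) -> fields.put(String.valueOf(k), signature(v)));
            return fields.toString();
        }
        return node == null ? "null" : node.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Message 1: "version" arrives as an object, e.g. {"$numberInt": "1"}
        Map<String, Object> msg1 = new LinkedHashMap<>();
        msg1.put("version", Map.of("$numberInt", "1"));

        // Message 2: "version" arrives as a plain number
        Map<String, Object> msg2 = new LinkedHashMap<>();
        msg2.put("version", 1);

        System.out.println(signature(msg1));
        System.out.println(signature(msg2));
        // The inferred shapes differ, which is the situation that triggers
        // the Struct schema mismatch in Connect.
        System.out.println(signature(msg1).equals(signature(msg2))); // prints false
    }
}
```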

mukkchir commented 1 year ago

@davidsloan This is just the sample data. Since the data is encrypted, I'm working on decrypting it. I will post the actual data soon. Is there a config option to skip to the next message when the previous message is faulty?

spike83 commented 1 year ago

@davidsloan I've got the same issue with the JMS source connector. When can we expect a release with the fix included? Thank you very much :pray:.

mukkchir commented 1 year ago

@davidsloan seems like it's the same data that I posted here.

davidsloan commented 1 year ago

@mukkchir this is message id 3, correct? Are message id 1 & 2 any different?

davidsloan commented 1 year ago

@mukkchir it seems the issue might be that a different charset encoding is being configured by default. Could you please try setting this environment variable in the Connect container:

LC_ALL=en_US.UTF-8

And let me know how you get on.
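With Strimzi, environment variables for the Connect pod can be set through the KafkaConnect resource's container template. A sketch, assuming a KafkaConnect resource named kafka-connect-cluster and the Strimzi v1beta2 template.connectContainer.env field:

```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
spec:
  # ... existing Connect settings ...
  template:
    connectContainer:
      env:
        # Force a UTF-8 locale so the JVM's default charset is UTF-8
        - name: LC_ALL
          value: en_US.UTF-8
        - name: LANG
          value: en_US.UTF-8
```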

erezblm commented 1 year ago

Hey,

@davidsloan I was getting the exact same MalformedInputException error using a Strimzi deployment on AKS. Your uploaded package fixed the issue for me, so thanks a lot for that; I'm waiting for an official release with the update.

I tried LC_ALL=en_US.UTF-8, but it was ignored by bash at the start of the Connect container: "bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)"

Thanks.

erezblm commented 1 year ago

Hi @davidsloan, I'm now using the sink connector and I have the same problem with malformed input. Does your fix handle only the source connector? Any idea when an official release with those fixes will be out?

Thanks a lot, Erez.

sunmeplz commented 1 year ago

I'm using the MQTT connector inside a Debezium Kafka Connect Docker image; the issue was solved by adding the LANG=en_US.UTF-8 variable to the Docker container.