johanvandevenne / kafka-connect-mqtt

Kafka Connect MQTT Connector

Getting Error while starting connector #7

Open kaustav1996 opened 3 years ago

kaustav1996 commented 3 years ago

Hello All,

I am trying to use this connector plugin to connect to a broker over the 'mqtts' protocol.

I built the plugin directory and copied it into the Kafka Connect plugins directory. I then passed all the connection details through a .properties file like this:

name=mqtt-source-connector
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
mqtt.topic=test
kafka.topic=test
mqtt.clientID=<client-id>
mqtt.broker=mqtts://<mqtt-broker-address>:8883
mqtt.username=<username>
mqtt.password=<password>
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

I am getting this error:

[2020-12-08 09:52:10,884] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95)
    at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more
[2020-12-08 09:52:10,924] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
[2020-12-08 09:52:10,924] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:223)
java.lang.NullPointerException
    at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I think this error is the reason I am unable to connect to the MQTT broker. Is it possible that the plugin wasn't built correctly? The plugin does show up when I query:

http://<kafkaconnect>:8083/connector-plugins

richstimson commented 3 years ago

First, there is a bug in the instructions: the property mqtt.username should be mqtt.userName, otherwise it is not recognised.
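Because property names are case-sensitive, a small check can catch this kind of mistake before the config is submitted. The following is a minimal sketch, not part of the connector: the list of names is copied from the MQTTSourceConnectorConfig values logged further down in this comment, and `find_miscased_keys` is a hypothetical helper name.

```python
# Property names as printed in the MQTTSourceConnectorConfig log output in this thread.
KNOWN_KEYS = [
    "kafka.topic", "mqtt.automaticReconnect", "mqtt.broker", "mqtt.cleanSession",
    "mqtt.clientID", "mqtt.connectionTimeout", "mqtt.keepAliveInterval",
    "mqtt.password", "mqtt.qos", "mqtt.topic", "mqtt.userName",
]


def find_miscased_keys(config, known_keys=KNOWN_KEYS):
    """Return {given_key: expected_key} for keys that match a known key only when case is ignored."""
    by_lower = {key.lower(): key for key in known_keys}
    problems = {}
    for key in config:
        expected = by_lower.get(key.lower())
        if expected is not None and expected != key:
            problems[key] = expected
    return problems


# Hypothetical config fragment using the misspelled key from the original instructions.
example = {"mqtt.username": "<username>", "mqtt.password": "<password>"}
print(find_miscased_keys(example))  # {'mqtt.username': 'mqtt.userName'}
```

Keys that are unknown to the list (such as name or connector.class) are ignored, so the check only reports near-misses.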

However, I get the same error as kaustav1996:

Steps: I installed Kafka 2.7.0 (the kafka_2.13-2.7.0 distribution) following the quickstart: https://kafka.apache.org/documentation/#quickstart

config/connect-distributed.properties is the default file, with one addition: plugin.path=/home/ubuntu/kafka_2.13-2.7.0/plugins. That directory contains a link to the mqtt plugin (mqtt -> /home/ubuntu/kafka-connect-mqtt/target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt).

I started the Kafka Connect worker using:

    $ bin/connect-distributed.sh config/connect-distributed.properties

and then ran:

    curl -X POST \
      http://:8083/connectors \
      -H 'Content-Type: application/json' \
      -d '{
        "name": "mqtt-source-connector",
        "config": {
          "connector.class" : "be.jovacon.kafka.connect.MQTTSourceConnector",
          "mqtt.topic" : "rs-mqtt-test-app/devices/rs-virtual-device/up",
          "kafka.topic" : "quick-events2",
          "mqtt.clientID" : "my_client_id",
          "mqtt.broker" : "tcp://eu.thethings.network:1883",
          "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
          "key.converter.schemas.enable" : false,
          "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
          "value.converter.schemas.enable" : false,
          "mqtt.userName" : "rs-mqtt-test-app",
          "mqtt.password" : "ttn-account-v2."
        }
      }'
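The same request can be built programmatically, which makes the JSON body easier to keep well-formed. This is a rough sketch using only the Python standard library; the worker address `http://localhost:8083` and the helper name `build_create_request` are assumptions for illustration, and the config shown is a shortened placeholder rather than the full one above.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Build a POST /connectors request for the Kafka Connect REST API."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; replace them with your own broker and topics.
request = build_create_request(
    "http://localhost:8083",  # assumed worker address
    "mqtt-source-connector",
    {
        "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
        "mqtt.topic": "<mqtt-topic>",
        "kafka.topic": "<kafka-topic>",
        "mqtt.clientID": "<client-id>",
        "mqtt.broker": "tcp://<mqtt-broker-address>:1883",
    },
)
print(request.get_method(), request.full_url)

# To actually submit it against a running worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the payload inspectable before anything reaches the worker.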

ERROR reported:

[2021-04-16 14:13:40,973] INFO MQTTSourceConnectorConfig values:
    kafka.topic = quick-events2
    mqtt.automaticReconnect = true
    mqtt.broker = tcp://eu.thethings.network:1883
    mqtt.cleanSession = true
    mqtt.clientID = my_client_id
    mqtt.connectionTimeout = 30
    mqtt.keepAliveInterval = 60
    mqtt.password = [hidden]
    mqtt.qos = 1
    mqtt.topic = rs-mqtt-test-app/devices/rs-virtual-device/up
    mqtt.userName = rs-mqtt-test-app
 (be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:361)
[2021-04-16 14:13:40,974] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1577)
[2021-04-16 14:13:40,980] INFO WorkerSourceTask{id=mqtt-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
[2021-04-16 14:13:40,980] INFO WorkerSourceTask{id=mqtt-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
[2021-04-16 14:13:40,981] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafk>
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95)
    at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 10 more
[2021-04-16 14:13:40,982] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:223)
java.lang.NullPointerException
    at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:190)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

Any ideas how to resolve this? Alternatively, can someone point me to an open source Kafka-MQTT connector? As far as I can see, the pre-built ones from Confluent, Lenses, etc. are not free.

Amine27 commented 2 years ago

Hello. If anyone has this problem, add the Guava dependency, which provides the missing com.google.common.base.Preconditions class, to the pom.xml file:

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.0-jre</version>
    </dependency>

Rebuild and everything should be fine.
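To confirm that a rebuilt plugin actually ships the class named in the stack traces above, the jars in the plugin directory can be scanned for it. This is only a sketch under the assumption that the plugin's dependencies are packaged as separate .jar files somewhere under the plugin directory; `jars_containing` is a hypothetical helper, not something provided by the connector or by Kafka Connect.

```python
import zipfile
from pathlib import Path

# Path of the class inside a jar, taken from the NoClassDefFoundError message.
CLASS_ENTRY = "com/google/common/base/Preconditions.class"


def jars_containing(plugin_dir, class_entry=CLASS_ENTRY):
    """Return the names of .jar files under plugin_dir that contain class_entry."""
    hits = []
    for jar in sorted(Path(plugin_dir).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if class_entry in archive.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that have a .jar suffix but are not valid archives.
            continue
    return hits


# Example call (the path is a placeholder for your own plugin directory):
# print(jars_containing("/path/to/plugins/kafka-connect-mqtt"))
```

An empty result suggests the worker would still fail with the same NoClassDefFoundError, since each plugin is loaded through its own isolated class loader.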

SP-TS1 commented 1 year ago

You're my lifesaver, thanks!