johanvandevenne / kafka-connect-mqtt

Kafka Connect MQTT Connector
51 stars 21 forks source link

Getting error when MQTT source connector is created #2

Open ntsh999 opened 4 years ago

ntsh999 commented 4 years ago

Objective: to setup MQTT source connector with Confluent kafka v 5.4 Steps Performed:

  1. git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
  2. cd kafka-connect-mqtt
  3. mvn clean install
  4. copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is mentioned in the worker configuration.
  5. executing http://:8083/connector-plugins gives the expected output.
  6. curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Expected: There should be no error logs in kafka connect and connector should be available for usage. But after 6th step is performed following is the error received in kafka connect logs :

[2020-06-01 08:01:31,181] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1236)
java.lang.NoClassDefFoundError: be/jovacon/kafka/connect/MQTTSourceTask
        at be.jovacon.kafka.connect.MQTTSourceConnector.taskClass(MQTTSourceConnector.java:27)
        at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:322)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1287)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1225)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:125)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:1242)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:1239)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

following is connector config json file

{ "name": "mqtt-source-connector-1",
    "config":
    {
      "connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
      "mqtt.topic":"my_mqtt_topic",
      "kafka.topic":"my_kafka_topic",
      "mqtt.clientID":"my_client_id",
      "mqtt.broker":"tcp://broker-ip:1883",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":false,
      "mqtt.username":"username",
      "mqtt.password":"password"
    }
}

How can I fix the above issue?

ntsh999 commented 4 years ago

I fixed this by copying all the 6 jars in the directory /kafka-connect-mqtt/target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt. My path is different from the one mentioned in the readme file.

ntsh999 commented 4 years ago

I have performed all the step correctly, was able to successfully setup mqtt source and sink connector on one VM. But now when I am doing it on a new VM I am getting the above error message in the confluent logs. Surprisingly, the sink connector shows no error messages. Refer this link as well https://stackoverflow.com/questions/61716789/classnotfoundexception-with-kafka-connect-mqtt-connector/61716965?noredirect=1#comment110317608_61716965