johanvandevenne / kafka-connect-mqtt

Kafka Connect MQTT Connector

unable to connect MQTT source connector with confluent kafka v5.4 #3

Open ntsh999 opened 4 years ago

ntsh999 commented 4 years ago

Objective: set up the MQTT source connector with Confluent Kafka v5.4.

Steps performed:

1. git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
2. cd kafka-connect-mqtt
3. mvn clean install
4. Copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is set in the worker configuration.
5. Requesting http://:8083/connector-plugins gives the expected output.
6. curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Expected: there should be no error logs in Kafka Connect, and the connector should be available for use.

But after the 6th step, the following error appears in the Kafka Connect logs:

[2020-06-01 09:42:38,755] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
        at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:53)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
        at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:1040)
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:151)
        ... 1 more
[2020-06-01 09:42:38,756] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

The error indicates a connection failure between the MQTT broker and Kafka Connect, although I have entered the correct MQTT broker credentials. I verified them by running the mosquitto pub and sub commands from the VM on which my Confluent Platform is running. Any idea why I am getting this error?
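For reference, the number in parentheses in "Not authorized to connect (5)" lines up with the CONNACK return codes of MQTT 3.1.1, where 5 means the broker refused the client as not authorized. Below is a minimal Python sketch of that table. The helper name describe_connack is hypothetical, and the mapping assumes the broker speaks MQTT 3.1 or 3.1.1.

```python
# CONNACK return codes as listed in the MQTT 3.1.1 specification.
CONNACK_RETURN_CODES = {
    0: "Connection accepted",
    1: "Connection refused: unacceptable protocol version",
    2: "Connection refused: identifier rejected",
    3: "Connection refused: server unavailable",
    4: "Connection refused: bad user name or password",
    5: "Connection refused: not authorized",
}


def describe_connack(code: int) -> str:
    """Return a readable description for a CONNACK return code (hypothetical helper)."""
    return CONNACK_RETURN_CODES.get(code, f"Unknown or reserved return code {code}")


print(describe_connack(5))
```

Some brokers may report 5 rather than 4 even when the user name or password is wrong or missing, so it is worth checking which credentials the connector actually sends.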

johanvandevenne commented 4 years ago

Do you get any useful logging on the MQTT broker side?

On Mon, Jun 1, 2020 at 11:37 AM ntsh999 notifications@github.com wrote:


[2020-06-01 08:24:15,107] INFO [Producer clientId=connector-producer-johan-mqtt-source-connector-1-0] Cluster ID: 0ZK4cUcPSkmwpRsrIxZKxg (org.apache.kafka.clients.Metadata:259)
[2020-06-01 08:24:15,416] INFO Connected to MQTT Broker (be.jovacon.kafka.connect.MQTTSourceConnector:38)
[2020-06-01 08:24:15,416] INFO Subscribing to my_mqtt_topic with QOS 1 (be.jovacon.kafka.connect.MQTTSourceConnector:43)
[2020-06-01 08:24:15,420] INFO WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-06-01 08:24:15,420] INFO WorkerSourceTask{id=johan-mqtt-source-connector-1-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-06-01 08:24:15,420] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: MqttException (128)
        at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:53)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: MqttException (128)
        at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:466)
        at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:454)
        at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:44)
        ... 8 more
[2020-06-01 08:24:15,421] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-06-01 08:24:15,425] INFO [Producer clientId=connector-producer-johan-mqtt-source-connector-1-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)


ntsh999 commented 4 years ago

I did not check the logs on the broker side, because I tried with another broker and it worked. The broker that produced the above error had an at sign (@) in its password, so maybe that was causing the problem.
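One way to rule out quoting or escaping problems with special characters such as @ is to generate the JSON payload programmatically instead of writing it by hand. The Python sketch below only builds the POST request for http://localhost:8083/connectors and does not send it. The connector name is inferred from the task id in the log, the class name from the logger name in the log, and mqtt.userName from a later comment in this thread; every other key, value, and the helper name build_create_request are assumptions made for illustration.

```python
import json
import urllib.request


def build_create_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a Kafka Connect worker."""
    # json.dumps escapes quotes and backslashes, so the password is carried verbatim.
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


config = {
    "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",  # inferred from the log
    "mqtt.topic": "my_mqtt_topic",               # topic name taken from the log
    "mqtt.userName": "user",                     # placeholder value
    "mqtt.broker": "tcp://broker.example:1883",  # assumed key name, placeholder value
    "mqtt.password": 'p@ss"word',                # assumed key name, placeholder value
    "kafka.topic": "my_kafka_topic",             # assumed key name, placeholder value
}
request = build_create_request("http://localhost:8083", "johan-mqtt-source-connector-1", config)

# The password survives the JSON round trip unchanged.
assert json.loads(request.data)["config"]["mqtt.password"] == config["mqtt.password"]
# urllib.request.urlopen(request)  # uncomment to send to a running worker
```

If the connector still fails with a payload built this way, the special character is probably not the cause on the Kafka Connect side.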

PetitCedric commented 2 years ago

Hello,

I have the same issue with the sink connector.

Have you found any clues? Or do you maybe have a solution?

Regards

fjhuanca commented 1 year ago

I'm not sure if this is the same problem as above, but I ended up here while searching for a solution to the same trace log. I realized that in the documentation the parameter for the user name is mqtt.username, but in MQTTSinkConnectorConfig.java and MQTTSourceConnectorConfig.java it is defined as mqtt.userName. The solution, as you might guess, was to use mqtt.userName in the properties file.
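Since the key names are case-sensitive, a quick check of the properties before submitting them can catch this kind of mismatch. The Python sketch below flags keys that differ from an expected key only by letter case. The function name find_case_mismatches is hypothetical, and of the expected keys only mqtt.userName is confirmed by the comment above; mqtt.password is an assumption.

```python
def find_case_mismatches(config: dict, expected_keys: set) -> dict:
    """Map each supplied key that differs from an expected key only by case to that expected key."""
    by_lower = {key.lower(): key for key in expected_keys}
    mismatches = {}
    for key in config:
        expected = by_lower.get(key.lower())
        if expected is not None and expected != key:
            mismatches[key] = expected
    return mismatches


# mqtt.userName comes from the comment above; mqtt.password is an assumed key name.
EXPECTED_KEYS = {"mqtt.userName", "mqtt.password"}
config = {"mqtt.username": "user", "mqtt.password": "secret"}

print(find_case_mismatches(config, EXPECTED_KEYS))
# prints {'mqtt.username': 'mqtt.userName'}
```

A misspelled key is likely to be ignored rather than rejected, which would leave the user name unset and could explain an authorization error from the broker.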

BilalHammas commented 7 months ago

Hello, do you have a solution for this MqttException (128)? Please let me know; I can't click on the link ;) Great, thanks ;)
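For what it's worth, 128 (0x80) is the failure return code of a SUBACK packet in MQTT 3.1.1, and the quoted trace is thrown from MqttClient.subscribe. So the broker appears to have accepted the connection but refused the subscription. Broker-side topic permissions are one possible cause, though nothing in this thread confirms it. Below is a rough Python sketch for pulling the reason code out of a Kafka Connect log line; the names REASON_HINTS and explain_log_line are hypothetical.

```python
import re

# Hint for the reason code asked about here, per MQTT 3.1.1 (SUBACK return code 0x80).
REASON_HINTS = {
    128: "SUBACK 0x80: subscription refused by the broker",
}

# Matches the first "(<digits>)" that follows "ConnectException: " on the line.
_CODE_PATTERN = re.compile(r"ConnectException: .*?\((\d+)\)")


def explain_log_line(line: str):
    """Return a hint for the reason code in a ConnectException log line, or None if absent."""
    match = _CODE_PATTERN.search(line)
    if match is None:
        return None
    code = int(match.group(1))
    return REASON_HINTS.get(code, f"unrecognized reason code {code}")


print(explain_log_line("org.apache.kafka.connect.errors.ConnectException: MqttException (128)"))
```

Checking whether the configured user is allowed to subscribe to the topic on the broker would be a reasonable first step.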