johanvandevenne / kafka-connect-mqtt

Kafka Connect MQTT Connector

MQTTSourceTask does not pick up messages from MQTT topic "my_mqtt_topic" at MQTT broker #4

Open tinhle opened 4 years ago

tinhle commented 4 years ago


I started mqtt-source-connector and it runs normally: it connects to the MQTT broker. But when I publish a message to the MQTT broker topic my_mqtt_topic, the Kafka MQTT source task does not receive the message.

Please advise what I have done wrong.

kcs-santoshahire commented 3 years ago

Hello, I am facing the same issue with the Kafka MQTT source connector. When I publish data to the MQTT topic, it is not received on the Kafka topic, and the source task fails with "java.lang.NullPointerException".

This is how I created the connector:


curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
    "name": "mqtt-source",
    "config": {
        "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
        "mqtt.topic": "temperature",
        "kafka.topic": "mqtt.",
        "mqtt.clientID": "my_client_id",
        "mqtt.broker": "tcp://127.0.0.1:1883",
        "tasks.max": "1"
    }
}'
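
For anyone scripting this step, here is a minimal Python sketch of the same POST using only the standard library. The helper name `build_create_request` is made up for illustration; the URL and configuration values are copied from the curl call above. It only builds the request, so nothing is sent until `urlopen` is called.

```python
import json
import urllib.request

# Same Kafka Connect REST endpoint as the curl call above.
CONNECT_URL = "http://localhost:8083/connectors"


def build_create_request(name, config, url=CONNECT_URL):
    """Build (but do not send) the POST request that creates a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(
    "mqtt-source",
    {
        "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
        "mqtt.topic": "temperature",
        "kafka.topic": "mqtt.",
        "mqtt.clientID": "my_client_id",
        "mqtt.broker": "tcp://127.0.0.1:1883",
        "tasks.max": "1",
    },
)

# To actually submit it against a running Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```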

When I run the command above and check the connector status, it shows as running:

curl http://localhost:8083/connectors/mqtt-source/status | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   185  100   185    0     0  10277      0 --:--:-- --:--:-- --:--:-- 10277
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
    },
    "name": "mqtt-source",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "source"
}

But when I publish data to the MQTT topic, it is not received on the Kafka topic.

Consumer: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt. --from-beginning

MQTT publish: mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"

After publishing, I get the error below when I check the status of the MQTT connector:

{
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
    },
    "name": "mqtt-source",
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n",
            "worker_id": "127.0.1.1:8083"
        }
    ],
    "type": "source"
}

Because of this error, the published MQTT message never reaches the Kafka topic.
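
Note that in the status above the connector itself still reports RUNNING while its task is FAILED, so the top-level state alone is easy to misread. A minimal Python sketch for spotting this, assuming the status JSON has the shape quoted in this thread (the function name `failed_tasks` is made up for illustration, and the sample trace is abridged to its first two lines):

```python
import json


def failed_tasks(status):
    """Return (task id, first line of the trace) for every FAILED task."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task["id"], first_line))
    return failures


# Abridged copy of the status response quoted in this thread.
SAMPLE_STATUS = json.loads(r'''
{
  "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
  "name": "mqtt-source",
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\n",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}
''')

for task_id, reason in failed_tasks(SAMPLE_STATUS):
    print(f"task {task_id} failed: {reason}")
```

In practice the dictionary would come from parsing the response of the /connectors/mqtt-source/status call instead of the embedded sample.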

tinhle commented 3 years ago

I have successfully made the MQTT connector work as both a source and a sink connector. Contact me and I will share how it works.

kcs-santoshahire commented 3 years ago

Hi @tinhle

Can you please share the details with me?

How is it working for you?

What changes should I make to use both connectors?

tinhle commented 3 years ago

I changed the convert function in the MQTT source connector, and it now works for both the source and the sink connector.

Kafka version: kafka_2.12-2.5.0
Eclipse Mosquitto MQTT broker: 1.6.10

Source code is shared at https://drive.google.com/file/d/10Gw-NGxdz48eg7LpJ1klIpVjv2XxlolB/view?usp=sharing

  1. MQTT Sink Connector

{
    "config": {
        "connector.class": "be.jovacon.kafka.connect.MQTTSinkConnector",
        "key.converter.schemas.enable": "false",
        "topics": "oee_eqpt_insight_lrb01_kafka",
        "value.converter.schemas.enable": "false",
        "name": "oee_eqpt_insight_lrb01_kafka2mqtt_sink",
        "mqtt.broker": "tcp://10.140.11.83:1883",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mqtt.clientID": "oee_eqpt_insight_lrb01",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mqtt.topic": "oee_eqpt_insight_lrb01_mqtt"
    },
    "name": "oee_eqpt_insight_lrb01_kafka2mqtt_sink",
    "tasks": [
        {
            "connector": "oee_eqpt_insight_lrb01_kafka2mqtt_sink",
            "task": 0
        }
    ],
    "type": "sink"
}

  2. MQTT Source Connector

{
    "config": {
        "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
        "key.converter.schemas.enable": "false",
        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.converter.schemas.enable": "false",
        "name": "oee_eqpt_data_lrb02_source",
        "kafka.topic": "oee_eqpt_data_lrb02_kafka",
        "mqtt.broker": "tcp://10.140.11.83:1883",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mqtt.clientID": "oee_eqpt_data_lrb02",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mqtt.topic": "oee_eqpt_data_lrb02_mqtt",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer"
    },
    "name": "oee_eqpt_data_lrb02_source",
    "tasks": [
        {
            "connector": "oee_eqpt_data_lrb02_source",
            "task": 0
        }
    ],
    "type": "source"
}

Regards, Tony
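
To see how this source configuration differs from the one in the failing report earlier in the thread, a small Python sketch can list the settings that appear only here. The helper name `added_keys` is made up for illustration. The thread does not confirm whether any of these settings is needed to avoid the NullPointerException, since the working setup also relies on the modified convert function, so treat the output as a list of differences, not a verified fix.

```python
def added_keys(before, after):
    """Return settings present in `after` but absent from `before`, sorted by key."""
    return {key: after[key] for key in sorted(after) if key not in before}


# "config" section from the failing report earlier in the thread.
FAILING = {
    "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
    "mqtt.topic": "temperature",
    "kafka.topic": "mqtt.",
    "mqtt.clientID": "my_client_id",
    "mqtt.broker": "tcp://127.0.0.1:1883",
    "tasks.max": "1",
}

# "config" section of the working source connector shown above.
WORKING = {
    "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
    "key.converter.schemas.enable": "false",
    "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.converter.schemas.enable": "false",
    "name": "oee_eqpt_data_lrb02_source",
    "kafka.topic": "oee_eqpt_data_lrb02_kafka",
    "mqtt.broker": "tcp://10.140.11.83:1883",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "mqtt.clientID": "oee_eqpt_data_lrb02",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "mqtt.topic": "oee_eqpt_data_lrb02_mqtt",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
}

for key, value in added_keys(FAILING, WORKING).items():
    print(f"{key} = {value}")
```

The main differences are the explicit StringConverter settings for keys and values with schemas disabled, plus the two serializer entries.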

mohamedaymanabdelhedi commented 1 year ago

Hi @tinhle, I hope you are doing well.

How is it working for you? What changes should I make to use both connectors? Thank you for your help. Have a nice day.