SINTEF-9012 / kafka-mqtt-source-connector

A connector plugin to use with Kafka's Connect API. It can be configured to map any topic from an mqtt broker to any topic on the Kafka broker. The connector can be configured to use SSL in communication with the mqtt broker.
MIT License
24 stars 17 forks source link

Configuration help #5

Closed fabiotatsuo closed 3 years ago

fabiotatsuo commented 3 years ago

Hi,

Thanks for sharing your code.

I am trying to test your kafka connectors but I think I am missing something on configuration. I am following your instructions on README.md

I am using: Mac OS Big Sur Emqx 4.2.7 Kafka 2.7.0

I am getting this error, standalone and distributed, when trying to send message through mqtt clients.

Mosquitto $ mosquitto_pub -t 'test' -m 'helloWorld'

I tested other mqtt clients, mqttjs and paho.

[2021-02-11 21:22:06,029] INFO WorkerSourceTask{id=mqtt-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233) ERROR MqttSourceConnectorTask ERROR: Not able to create source record from mqtt message 'Hello synchronous world!' arrived on topic 'test' for client 'kafka_source_connector'. ERROR MqttSourceConnectorTask org.bson.json.JsonParseException: JSON reader was expecting a value but found 'Hello'. org.bson.json.JsonParseException: JSON reader was expecting a value but found 'Hello'. at org.bson.json.JsonReader.readBsonType(JsonReader.java:268) at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680) at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722) at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450) at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:161) at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:45) at org.bson.Document.parse(Document.java:110) at org.bson.Document.parse(Document.java:95) at com.sintef.asam.MqttSourceConnectorTask.makeDBDoc(MqttSourceConnectorTask.java:157) at com.sintef.asam.MqttSourceConnectorTask.messageArrived(MqttSourceConnectorTask.java:126) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:513) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:416) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:213) at java.base/java.lang.Thread.run(Thread.java:832)

I searched google for help and changed Key and Value converters. Tried Json, String and ByteArray with no success.

Key Value converters key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

key.converter.schemas.enable=false value.converter.schemas.enable=false

Testing Producer $ kafka-console-producer --broker-list localhost:9092 --topic test < message.txt

Testing Consumer $ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning -property print.key=true null Kafka test producer

Connectors $ curl -s -XGET http://localhost:19005/connector-plugins|jq '.[].class' "com.sintef.asam.MqttSinkConnector" "com.sintef.asam.MqttSourceConnector" "org.apache.kafka.connect.file.FileStreamSinkConnector" "org.apache.kafka.connect.file.FileStreamSourceConnector" "org.apache.kafka.connect.mirror.MirrorCheckpointConnector" "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector" "org.apache.kafka.connect.mirror.MirrorSourceConnector"

$ curl --header "Content-Type: application/json" http://127.0.0.1:19005/connectors ["mqtt-source-connector"]%

$ curl -s -X POST -H "Content-Type: application/json" http://127.0.0.1:19005/connectors -d '{"name":"mqtt-source-connector","config":{"connector.class":"com.sintef.asam.MqttSourceConnector","tasks.max":"1","mqtt.connector.broker.uri":"tcp://localhost:1883", "mqtt.connector.broker.topic":"test/#","mqtt.connector.kafka.topic":"test"}}' {"error_code":409,"message":"Connector mqtt-source-connector already exists"}%

$ kafka-topics --list --zookeeper localhost:2181 __consumer_offsets connect-configs connect-offsets connect-status position-reports test test-kafka test1 test2 upstream

Can you help me?

Thanks