A connector plugin for use with Kafka's Connect API. It can be configured to map any topic on an MQTT broker to any topic on the Kafka broker, and it can use SSL when communicating with the MQTT broker.
MIT License
I set up according to the procedure specified in "Insecure - using tcp Connect Standalone", but it does not work. #9
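I set up the connector following the procedure specified in "Insecure - using tcp Connect Standalone", but it does not work.
The commands I ran and the resulting error messages are as follows.
=================
mosquitto_pub -h 127.0.0.1 -p 1883 -t test -m "Hello, world!"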
ERROR MqttSourceConnectorTask ERROR: Not able to create source record from mqtt message 'Hello, world!' arrived on topic 'test' for client 'kafka_source_connector'.
ERROR MqttSourceConnectorTask org.bson.json.JsonParseException: JSON reader was expecting a value but found 'Hello'.
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'Hello'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:268)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:161)
at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:45)
at org.bson.Document.parse(Document.java:110)
at org.bson.Document.parse(Document.java:95)
at com.sintef.asam.MqttSourceConnectorTask.makeDBDoc(MqttSourceConnectorTask.java:157)
at com.sintef.asam.MqttSourceConnectorTask.messageArrived(MqttSourceConnectorTask.java:126)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:513)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:416)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:213)
at java.base/java.lang.Thread.run(Thread.java:829)
=================
mosquitto_pub -h localhost -t test -m "{\"id\":1234,\"message\":\"This is a test\"}"
ERROR MqttSourceConnectorTask ERROR: Not able to create source record from mqtt message '{"id":1234,"message":"This is a test"}' arrived on topic 'test' for client 'kafka_source_connector'.
ERROR MqttSourceConnectorTask java.lang.ArrayIndexOutOfBoundsException: Index 21 out of bounds for length 1
java.lang.ArrayIndexOutOfBoundsException: Index 21 out of bounds for length 1
at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
at com.sintef.asam.MqttSourceConnectorTask.makeDBDoc(MqttSourceConnectorTask.java:160)
at com.sintef.asam.MqttSourceConnectorTask.messageArrived(MqttSourceConnectorTask.java:126)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:513)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:416)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:213)
at java.base/java.lang.Thread.run(Thread.java:829)
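The second exception can be reproduced outside the connector. This is a minimal sketch and only a guess, not confirmed against the connector source: it assumes that makeDBDoc splits the MQTT topic on "/" and reads a segment at a fixed position. The class TopicSegmentDemo and the method segmentAt are hypothetical names used for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; not part of the connector.
public class TopicSegmentDemo {

    // Assumption: the topic is split on "/" and one segment is read by position.
    static String segmentAt(String topic, int index) {
        List<String> segments = Arrays.asList(topic.split("/"));
        // Arrays.asList returns Arrays$ArrayList, the class named in the stack trace.
        return segments.get(index);
    }

    public static void main(String[] args) {
        // The topic "test" has a single segment, so index 0 works.
        System.out.println(segmentAt("test", 0));
        try {
            // Reading segment 21 of a one-segment topic fails.
            segmentAt("test", 21);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println(e.getClass().getName());
        }
    }
}
```

If the guess holds, a single-level topic such as test could never satisfy that lookup, whatever the payload is, which would explain why the valid JSON message fails as well.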