johanvandevenne / kafka-connect-mqtt

Kafka Connect MQTT Connector

No error, but no MQTT message detected by KAFKA #14

Open Momentum96 opened 1 year ago

Momentum96 commented 1 year ago

MQTT broker = nanomq, mosquitto
Kafka version = 2.6.2

I'm testing your kafka-connect-mqtt in a local Ubuntu environment.

  1. Clone this repository.
  2. Change directory into kafka-connect-mqtt and run mvn clean install.
  3. Copy the folder /target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt to my Kafka Connect plugin path.
  4. The connector was installed successfully:

[{"class":"be.jovacon.kafka.connect.MQTTSinkConnector","type":"sink","version":"1.1.0"},{"class":"be.jovacon.kafka.connect.MQTTSourceConnector","type":"source","version":"1.1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.6.2"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.6.2"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
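For anyone reproducing this, the plugin list above can also be checked programmatically. The following is only a minimal sketch: it assumes the Connect worker's REST interface is reachable at http://localhost:8083 and uses the worker's `/connector-plugins` endpoint. The helper names `fetch_plugins` and `find_plugin` are made up for illustration and are not part of kafka-connect-mqtt.

```python
import json
from urllib.request import urlopen


def fetch_plugins(base_url="http://localhost:8083"):
    """Return the parsed JSON list from the worker's /connector-plugins endpoint.

    The base URL is an assumption; adjust it to the worker's REST listener.
    """
    with urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)


def find_plugin(plugins, class_name):
    """Return the entry whose "class" equals class_name, or None if absent."""
    for entry in plugins:
        if entry.get("class") == class_name:
            return entry
    return None


# Example call against a running worker (not executed here):
#   find_plugin(fetch_plugins(), "be.jovacon.kafka.connect.MQTTSourceConnector")
```

A non-None result only shows that the worker loaded the connector class. It says nothing about whether a connector instance is running or receiving MQTT messages.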

  1. Configure the source connector. I created connect-mqtt-source.properties in $KAFKA_HOME/config.

connect-mqtt-source.properties

name=mqtt-source-connector
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
mqtt.topic=test
kafka.topic=mqtt.test
mqtt.clientID=cid
mqtt.broker=tcp://127.0.0.1:1883
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
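A quick sanity check on this file is to parse it and confirm that the keys used above are all present. This is a rough sketch, not a full Java properties parser: it handles only plain `key=value` lines and skips comments. The list of expected keys is taken from the file in this issue, and whether the connector strictly requires each of them is an assumption. `parse_properties` and `missing_keys` are hypothetical helper names.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that contain no '=' at all
            props[key.strip()] = value.strip()
    return props


# Keys copied from the properties file in this issue (assumed, not confirmed,
# to be the ones the connector needs).
EXPECTED_KEYS = [
    "name",
    "connector.class",
    "mqtt.topic",
    "kafka.topic",
    "mqtt.clientID",
    "mqtt.broker",
]


def missing_keys(props, expected=EXPECTED_KEYS):
    """Return the expected keys that are absent from props, in order."""
    return [key for key in expected if key not in props]
```

The parser expects one property per line. If several properties end up on a single line, everything after the first `=` is treated as the value of the first key.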

  1. Restart Kafka Connect:

     $ bin/connect-standalone.sh config/connect-standalone.properties config/connect-mqtt-source.properties

  2. result [2022-11-23 10:21:20,740] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69) [2022-11-23 10:21:20,746] INFO WorkerInfo values: jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../logs, -Dlog4j.configuration=file:bin/../config/connect-log4j.properties jvm.spec = Ubuntu, OpenJDK 64-Bit Server VM, 11.0.17, 11.0.17+8-post-Ubuntu-1ubuntu222.04 jvm.classpath = /home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/activation-1.1.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/argparse4j-0.7.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/audience-annotations-0.5.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/commons-cli-1.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/commons-lang3-3.8.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-api-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-basic-auth-extension-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-file-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-json-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-mirror-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-mirror-client-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-runtime-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-transforms-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-utils-0.4.156.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/freemarker-2.3.28.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-api-2.6.1.jar:/h
ome/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-locator-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-utils-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-annotations-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-core-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-databind-2.10.5.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-scala_2.12-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.inject-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javassist-3.25.0-GA.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javassist-3.26.0-GA.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javax.servlet-api-3.1.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jaxb-api-2.3.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-client-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-common-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-
2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-core-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-hk2-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-media-jaxb-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-server-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-client-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-continuation-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-http-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-io-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-security-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-server-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-servlet-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-servlets-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-util-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-util-ajax-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jopt-simple-5.0.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-clients-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-connect-mqtt-1.1.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-log4j-appender-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-examples-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-scala_2.12-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-test-utils-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-tools-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2-sources.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2
.6.2/bin/../libs/log4j-1.2.17.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-api-2.11.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-core-2.11.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/lz4-java-1.7.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/maven-artifact-3.6.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/metrics-core-2.2.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-codec-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-common-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-handler-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/org.eclipse.paho.client.mqttv3-1.2.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/paranamer-2.8.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/plexus-utils-3.2.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/reflections-0.9.12.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/rocksdbjni-5.18.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-collection-compat_2.12-2.1.6.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-java8-compat_2.12-0.9.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-library-2.12.11.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-reflect-2.12.11.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/slf4j-api-1.7.30.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin
/../libs/slf4j-log4j12-1.7.30.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/snappy-java-1.1.7.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zookeeper-3.5.9.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zookeeper-jute-3.5.9.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zstd-jni-1.4.4-7.jar os.spec = Linux, amd64, 5.15.74.2-microsoft-standard-WSL2 os.vcpus = 16 (org.apache.kafka.connect.runtime.WorkerInfo:71) [2022-11-23 10:21:20,747] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:78) [2022-11-23 10:21:20,756] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/freemarker-2.3.28.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2022-11-23 10:21:20,914] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/freemarker-2.3.28.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:20,916] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/org.eclipse.paho.client.mqttv3-1.2.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2022-11-23 10:21:20,992] INFO Registered loader: 
PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/org.eclipse.paho.client.mqttv3-1.2.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:20,993] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-api-2.11.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2022-11-23 10:21:21,011] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-api-2.11.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:21,012] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/kafka-connect-mqtt-1.1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2022-11-23 10:21:21,026] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/kafka-connect-mqtt-1.1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:21,026] INFO Added plugin 'be.jovacon.kafka.connect.MQTTSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,026] INFO Added plugin 'be.jovacon.kafka.connect.MQTTSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,028] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-core-2.11.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2022-11-23 10:21:21,111] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-core-2.11.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:21,111] INFO Loading plugin from: 
/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/connect-utils-0.4.156.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2022-11-23 10:21:21,122] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/connect-utils-0.4.156.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:21,740] INFO Registered loader: jdk.internal.loader.ClassLoaders$AppClassLoader@3d4eac69 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added 
plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 
10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.Filter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.runtime.PredicatedTransformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,744] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,745] INFO Added plugin 
'org.apache.kafka.connect.transforms.predicates.HasHeaderKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.common.config.provider.FileConfigProvider' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198) [2022-11-23 10:21:21,746] INFO Added aliases 'MQTTSinkConnector' and 'MQTTSink' to plugin 'be.jovacon.kafka.connect.MQTTSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,746] INFO Added aliases 'MQTTSourceConnector' and 'MQTTSource' to plugin 'be.jovacon.kafka.connect.MQTTSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,746] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,746] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,746] INFO Added aliases 'MirrorCheckpointConnector' and 'MirrorCheckpoint' to plugin 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) 
[2022-11-23 10:21:21,746] INFO Added aliases 'MirrorHeartbeatConnector' and 'MirrorHeartbeat' to plugin 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,746] INFO Added aliases 'MirrorSourceConnector' and 'MirrorSource' to plugin 'org.apache.kafka.connect.mirror.MirrorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,746] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) 
[2022-11-23 10:21:21,747] INFO Added aliases 'DoubleConverter' and 'Double' to plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'FloatConverter' and 'Float' to plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'IntegerConverter' and 'Integer' to plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'LongConverter' and 'Long' to plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'ShortConverter' and 'Short' to plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,747] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'DoubleConverter' and 'Double' to plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'FloatConverter' and 'Float' to plugin 
'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'IntegerConverter' and 'Integer' to plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'LongConverter' and 'Long' to plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'ShortConverter' and 'Short' to plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added alias 'SimpleHeaderConverter' to plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,748] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added aliases 'PredicatedTransformation' and 'Predicated' to plugin 'org.apache.kafka.connect.runtime.PredicatedTransformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,748] INFO Added alias 'Filter' to plugin 'org.apache.kafka.connect.transforms.Filter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,748] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,748] INFO Added 
alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,749] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,749] INFO Added alias 'HasHeaderKey' to plugin 'org.apache.kafka.connect.transforms.predicates.HasHeaderKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,749] INFO Added alias 'RecordIsTombstone' to plugin 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,749] INFO Added alias 'TopicNameMatches' to plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,749] INFO Added alias 'BasicAuthSecurityRestExtension' to plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427) [2022-11-23 10:21:21,749] INFO Added aliases 'AllConnectorClientConfigOverridePolicy' and 'All' to plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,749] INFO Added aliases 'NoneConnectorClientConfigOverridePolicy' and 'None' to plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,749] INFO Added aliases 'PrincipalConnectorClientConfigOverridePolicy' and 'Principal' to plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430) [2022-11-23 10:21:21,761] INFO StandaloneConfig values: access.control.allow.methods = access.control.allow.origin = admin.listeners = null bootstrap.servers = [localhost:9092] client.dns.lookup = use_all_dns_ips config.providers = [] connector.client.config.override.policy = None header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter internal.key.converter = class org.apache.kafka.connect.json.JsonConverter internal.value.converter = class org.apache.kafka.connect.json.JsonConverter key.converter = class org.apache.kafka.connect.json.JsonConverter listeners = null metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 offset.flush.interval.ms = 10000 offset.flush.timeout.ms = 5000 offset.storage.file.filename = /tmp/connect.offsets plugin.path = [plugins/kafka-connect-mqtt/] response.http.headers.config = rest.advertised.host.name = null rest.advertised.listener = null rest.advertised.port = null rest.extension.classes = [] rest.host.name = null rest.port = 8083 ssl.cipher.suites = null ssl.client.auth = none ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS task.shutdown.graceful.timeout.ms = 5000 topic.creation.enable = true topic.tracking.allow.reset = true topic.tracking.enable = true value.converter = class org.apache.kafka.connect.json.JsonConverter (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:354) [2022-11-23 10:21:21,761] INFO Creating Kafka admin client 
(org.apache.kafka.connect.util.ConnectUtils:49) [2022-11-23 10:21:21,764] INFO AdminClientConfig values: bootstrap.servers = [localhost:9092] client.dns.lookup = use_all_dns_ips client.id = connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS (org.apache.kafka.clients.admin.AdminClientConfig:354) [2022-11-23 10:21:21,809] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,809] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. 
(org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,809] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,809] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,809] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,809] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,809] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:21,810] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117) [2022-11-23 10:21:21,810] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118) [2022-11-23 10:21:21,810] INFO Kafka startTimeMs: 1669166481809 (org.apache.kafka.common.utils.AppInfoParser:119) [2022-11-23 10:21:22,013] INFO Kafka cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.connect.util.ConnectUtils:65) [2022-11-23 10:21:22,026] INFO Logging initialized @1614ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:169) [2022-11-23 10:21:22,055] INFO Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:132) [2022-11-23 10:21:22,056] INFO Initializing REST server (org.apache.kafka.connect.runtime.rest.RestServer:204) [2022-11-23 10:21:22,062] INFO jetty-9.4.38.v20210224; built: 2021-02-24T20:25:07.675Z; git: 288f3cc74549e8a913bf363250b0744f2695b8e6; jvm 11.0.17+8-post-Ubuntu-1ubuntu222.04 (org.eclipse.jetty.server.Server:375) [2022-11-23 10:21:22,085] INFO Started http_8083@5649ec46{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} 
(org.eclipse.jetty.server.AbstractConnector:331) [2022-11-23 10:21:22,086] INFO Started @1674ms (org.eclipse.jetty.server.Server:415) [2022-11-23 10:21:22,105] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371) [2022-11-23 10:21:22,105] INFO REST server listening at http://127.0.1.1:8083/, advertising URL http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:219) [2022-11-23 10:21:22,106] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371) [2022-11-23 10:21:22,106] INFO REST admin endpoints at http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:220) [2022-11-23 10:21:22,106] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371) [2022-11-23 10:21:22,106] INFO Setting up None Policy for ConnectorClientConfigOverride. This will disallow any client configuration to be overridden (org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy:45) [2022-11-23 10:21:22,112] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49) [2022-11-23 10:21:22,113] INFO AdminClientConfig values: bootstrap.servers = [localhost:9092] client.dns.lookup = use_all_dns_ips client.id = connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = 
null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS (org.apache.kafka.clients.admin.AdminClientConfig:354) [2022-11-23 10:21:22,118] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] WARN The configuration 'key.converter' was supplied but isn't a known config. 
(org.apache.kafka.clients.admin.AdminClientConfig:362) [2022-11-23 10:21:22,119] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117) [2022-11-23 10:21:22,120] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118) [2022-11-23 10:21:22,120] INFO Kafka startTimeMs: 1669166482119 (org.apache.kafka.common.utils.AppInfoParser:119) [2022-11-23 10:21:22,132] INFO Kafka cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.connect.util.ConnectUtils:65) [2022-11-23 10:21:22,137] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117) [2022-11-23 10:21:22,137] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118) [2022-11-23 10:21:22,137] INFO Kafka startTimeMs: 1669166482137 (org.apache.kafka.common.utils.AppInfoParser:119) [2022-11-23 10:21:22,210] INFO JsonConverterConfig values: converter.type = key decimal.format = BASE64 schemas.cache.size = 1000 schemas.enable = false (org.apache.kafka.connect.json.JsonConverterConfig:354) [2022-11-23 10:21:22,211] INFO JsonConverterConfig values: converter.type = value decimal.format = BASE64 schemas.cache.size = 1000 schemas.enable = false (org.apache.kafka.connect.json.JsonConverterConfig:354) [2022-11-23 10:21:22,217] INFO Kafka Connect standalone worker initialization took 1476ms (org.apache.kafka.connect.cli.ConnectStandalone:100) [2022-11-23 10:21:22,217] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:51) [2022-11-23 10:21:22,218] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:94) [2022-11-23 10:21:22,218] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:195) [2022-11-23 10:21:22,218] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:58) [2022-11-23 10:21:22,222] INFO Worker started (org.apache.kafka.connect.runtime.Worker:202) [2022-11-23 10:21:22,222] INFO Herder started 
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:97) [2022-11-23 10:21:22,222] INFO Initializing REST resources (org.apache.kafka.connect.runtime.rest.RestServer:224) [2022-11-23 10:21:22,248] INFO Adding admin resources to main listener (org.apache.kafka.connect.runtime.rest.RestServer:241) [2022-11-23 10:21:22,297] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session:334) [2022-11-23 10:21:22,297] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session:339) [2022-11-23 10:21:22,298] INFO node0 Scavenging every 600000ms (org.eclipse.jetty.server.session:132) Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored. Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored. Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored. 
Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.Errors logErrors WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation. WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation. WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation. WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation. WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2022-11-23 10:21:22,663] INFO Started o.e.j.s.ServletContextHandler@27a7ef08{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:916) [2022-11-23 10:21:22,663] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319) [2022-11-23 10:21:22,663] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57) [2022-11-23 10:21:22,672] INFO AbstractConfig values: (org.apache.kafka.common.config.AbstractConfig:354) [2022-11-23 10:21:22,686] INFO Creating connector mqtt-source-connector of type be.jovacon.kafka.connect.MQTTSourceConnector (org.apache.kafka.connect.runtime.Worker:274) [2022-11-23 10:21:22,687] INFO SourceConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.SourceConnectorConfig:354) [2022-11-23 10:21:22,687] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354) [2022-11-23 10:21:22,692] INFO 
Instantiated connector mqtt-source-connector with version 1.1.0 of type class be.jovacon.kafka.connect.MQTTSourceConnector (org.apache.kafka.connect.runtime.Worker:284) [2022-11-23 10:21:22,693] INFO Finished creating connector mqtt-source-connector (org.apache.kafka.connect.runtime.Worker:310) [2022-11-23 10:21:22,695] INFO MQTTSourceConnectorConfig values: kafka.topic = mqtt.test mqtt.automaticReconnect = true mqtt.broker = tcp://127.0.0.1:1883 mqtt.cleanSession = true mqtt.clientID = cid mqtt.connectionTimeout = 30 mqtt.keepAliveInterval = 60 mqtt.password = [hidden] mqtt.qos = 1 mqtt.topic = test mqtt.userName = (be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:354) [2022-11-23 10:21:22,697] INFO SourceConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.SourceConnectorConfig:354) [2022-11-23 10:21:22,697] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354) [2022-11-23 10:21:22,698] 
INFO Creating task mqtt-source-connector-0 (org.apache.kafka.connect.runtime.Worker:509) [2022-11-23 10:21:22,700] INFO ConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.ConnectorConfig:354) [2022-11-23 10:21:22,700] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354) [2022-11-23 10:21:22,701] INFO TaskConfig values: task.class = class be.jovacon.kafka.connect.MQTTSourceTask (org.apache.kafka.connect.runtime.TaskConfig:354) [2022-11-23 10:21:22,701] INFO Instantiated task mqtt-source-connector-0 with version 1.1.0 of type be.jovacon.kafka.connect.MQTTSourceTask (org.apache.kafka.connect.runtime.Worker:524) [2022-11-23 10:21:22,702] INFO StringConverterConfig values: converter.encoding = UTF8 converter.type = key (org.apache.kafka.connect.storage.StringConverterConfig:354) [2022-11-23 10:21:22,702] INFO StringConverterConfig values: converter.encoding = UTF8 converter.type = value (org.apache.kafka.connect.storage.StringConverterConfig:354) 
[2022-11-23 10:21:22,702] INFO Set up the key converter class org.apache.kafka.connect.storage.StringConverter for task mqtt-source-connector-0 using the connector config (org.apache.kafka.connect.runtime.Worker:539) [2022-11-23 10:21:22,702] INFO Set up the value converter class org.apache.kafka.connect.storage.StringConverter for task mqtt-source-connector-0 using the connector config (org.apache.kafka.connect.runtime.Worker:545) [2022-11-23 10:21:22,703] INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task mqtt-source-connector-0 using the worker config (org.apache.kafka.connect.runtime.Worker:550) [2022-11-23 10:21:22,705] INFO SourceConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.SourceConnectorConfig:354) [2022-11-23 10:21:22,705] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = be.jovacon.kafka.connect.MQTTSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = mqtt-source-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = class org.apache.kafka.connect.storage.StringConverter (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354) [2022-11-23 10:21:22,707] INFO Initializing: 
org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker:606) [2022-11-23 10:21:22,713] INFO ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = connector-producer-mqtt-source-connector-0 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 2147483647 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 0 max.block.ms = 9223372036854775807 max.in.flight.requests.per.connection = 1 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 2147483647 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null 
ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer (org.apache.kafka.clients.producer.ProducerConfig:354) [2022-11-23 10:21:22,728] WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:362) [2022-11-23 10:21:22,728] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117) [2022-11-23 10:21:22,729] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118) [2022-11-23 10:21:22,729] INFO Kafka startTimeMs: 1669166482728 (org.apache.kafka.common.utils.AppInfoParser:119) [2022-11-23 10:21:22,738] INFO MQTTSourceConnectorConfig values: kafka.topic = mqtt.test mqtt.automaticReconnect = true mqtt.broker = tcp://127.0.0.1:1883 mqtt.cleanSession = true mqtt.clientID = cid mqtt.connectionTimeout = 30 mqtt.keepAliveInterval = 60 mqtt.password = [hidden] mqtt.qos = 1 mqtt.topic = test mqtt.userName = (be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:354) [2022-11-23 10:21:22,740] INFO Created connector mqtt-source-connector (org.apache.kafka.connect.cli.ConnectStandalone:112) [2022-11-23 10:21:22,741] INFO WorkerSourceTask{id=mqtt-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488) [2022-11-23 10:21:22,741] INFO WorkerSourceTask{id=mqtt-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505) [2022-11-23 10:21:22,742] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191) 
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95) at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ... 
10 more [2022-11-23 10:21:22,742] INFO [Producer clientId=connector-producer-mqtt-source-connector-0] Cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.clients.Metadata:279) [2022-11-23 10:21:22,744] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:192) [2022-11-23 10:21:22,744] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:175) java.lang.NullPointerException at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87) at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:173) at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:168) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:195) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) [2022-11-23 10:21:22,745] INFO [Producer clientId=connector-producer-mqtt-source-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189

  1. I publish an MQTT message to topic test on the MQTT broker (localhost:1883), but no message arrives on the Kafka topic mqtt.test.

I believe I followed the README carefully. Is there anything I missed or overlooked along the way?

Momentum96 commented 1 year ago

I checked a few more things.

The basic connectors provided by Kafka are working.

curl -X GET "http://localhost:8083/connectors/"

curl -X GET "http://localhost:8083/connectors/local-file-source/status"

curl -X GET "http://localhost:8083/connectors/mqtt-source-connector/status"
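The status endpoint queried above returns a JSON document with one "state" for the connector and one for each task. As a minimal sketch, assuming that response shape and using a hypothetical helper name (connect_states), the states can be pulled out without extra tools:

```shell
# Hypothetical helper: print every "state" value found in a Kafka Connect
# status document read from stdin (connector state first, then task states).
connect_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Usage against a live worker (assumes the REST port 8083 shown in the log):
#   curl -s "http://localhost:8083/connectors/mqtt-source-connector/status" | connect_states
```

If no task state is printed, or a task reports FAILED, the task either was never created or was killed, which would match the "Task is being killed and will not recover until manually restarted" line in the log above.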

SP-TS1 commented 1 year ago

Hi, I'm facing the same issue with the source connector.

Have you found any clues? Or do you maybe have a solution?

Regards

SP-TS1 commented 1 year ago

For anyone who's still stuck on the same problem, here is how I tracked down the cause and resolved it.

  1. I noticed that when I ran curl -X GET "http://localhost:8083/connectors/mqtt-source-connector/status", the tasks field was empty, which means the connector had not established the connection yet.
  2. To track down the cause, I edited connect-log4j.properties (located where you installed Kafka, alongside the other .properties files), changing log4j.rootLogger=INFO, stdout, connectAppender to log4j.rootLogger=TRACE, stdout, connectAppender. I also added errors.log.include.messages=true and errors.log.enable=true (thanks to this).
  3. I found that the connector could not find the classes it needs to establish the connection, e.g. Class 'org.eclipse.paho.client.mqttv3.IMqttMessageListener' not found, because I had not exported the classpath. The solution is to run this command: export CLASSPATH=/<where you installed and extracted kafka-connect-mqtt>/kafka-connect-mqtt/target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt/* (following this confluent docs)
  4. I also encountered java.lang.ClassNotFoundException: com.google.common.base.Preconditions; the solution is given in this thread.
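Steps 2 to 4 can be sketched as small shell helpers. This is only a sketch: the function names and the example paths are assumptions, not part of the connector or of Kafka. The Guava check is likewise an assumption based on the fact that com.google.common.base.Preconditions is a Guava class; the linked thread may describe a different fix.

```shell
# Sketch only: function names and example paths are illustrative assumptions.

# Switch the root logger from INFO to TRACE in a log4j properties file.
# Goes through a temp file so it does not depend on `sed -i` behaviour.
enable_trace_logging() {
  local file="$1" tmp
  tmp="$(mktemp)" || return 1
  sed 's/^log4j\.rootLogger=INFO,/log4j.rootLogger=TRACE,/' "$file" > "$tmp" \
    && cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Print the CLASSPATH value for a connector package directory.
# The trailing /* is the JVM classpath wildcard for all jars in that directory.
plugin_classpath() {
  printf '%s/*\n' "${1%/}"
}

# Succeed if a Guava jar (the library that provides
# com.google.common.base.Preconditions) is present in the given directory.
has_guava_jar() {
  ls "${1%/}"/guava-*.jar >/dev/null 2>&1
}

# Example usage (adjust the paths to your own install):
#   enable_trace_logging "$KAFKA_HOME/config/connect-log4j.properties"
#   PKG="$HOME/kafka-connect-mqtt/target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt"
#   export CLASSPATH="$(plugin_classpath "$PKG")"
#   has_guava_jar "$PKG" || echo "no Guava jar found in $PKG"
```

Remember to switch the logger back to INFO afterwards, since TRACE output is very verbose.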

I hope this saves you some debugging time 😄

Momentum96 commented 1 year ago

Hi.

I solved the issue by using the Apache Camel MQTT Source Connector instead. You can try it.

Here is what I tried.

I hope it helps you.

Best regards

Apache Camel

Camel Kafka Connector https://camel.apache.org/camel-kafka-connector/3.18.x/index.html

camel-mqtt-source-kafka-connector source configuration https://camel.apache.org/camel-kafka-connector/3.18.x/reference/connectors/camel-mqtt-source-kafka-source-connector.html

Try it out locally https://camel.apache.org/camel-kafka-connector/3.18.x/user-guide/getting-started/try-it-out-locally.html

GitHub - apache/camel-kafka-connector: Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors https://github.com/apache/camel-kafka-connector

$ sudo apt install maven
$ git clone https://github.com/apache/camel-kafka-connector
$ cd camel-kafka-connector/connectors/camel-mqtt-source-kafka-connector
$ mvn clean package
$ cd target
$ cp ./*.tar.gz KAFKA_PLUGIN_PATH

Move to the Kafka directory

$ nano config/camel-mqtt-source.properties

name=CamelMqttSourceConnector
connector.class=org.apache.camel.kafkaconnector.mqttsource.Came>
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
#value.converter=org.apache.kafka.connect.storage.StringConvert>
value.converter=org.apache.kafka.connect.converters.ByteArrayCo>
value.converter.schemas.enable=false
topics=test
camel.kamelet.mqtt-source.topic=test
camel.kamelet.mqtt-source.brokerUrl=tcp://127.0.0.1:1883
converter.encoding=UTF-8

Run Kafka Connect

$ bin/connect-standalone.sh config/connect-standalone.properties config/camel-mqtt-source.properties

Afterwards, run a Kafka consumer and publish a message to the MQTT broker to test.
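That final check might look like the sketch below. The helper names are hypothetical; the broker address, bootstrap server, and topic name are the values used earlier in this thread, and mosquitto_pub is assumed to be installed (it ships with the Mosquitto client tools).

```shell
# Sketch only: helper names are hypothetical; host, port, and topic defaults
# are the values used earlier in this thread.

# Read a Kafka topic from the beginning (run from the Kafka directory).
# $1 = Kafka topic, defaults to "test".
consume_topic() {
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic "${1:-test}" --from-beginning
}

# Publish one message to the local MQTT broker with the Mosquitto client.
# $1 = payload (default "hello"), $2 = MQTT topic (default "test").
publish_test_message() {
  mosquitto_pub -h 127.0.0.1 -p 1883 -t "${2:-test}" -m "${1:-hello}"
}

# Usage: run consume_topic in one terminal, then in another:
#   publish_test_message "hello from mqtt"
```

If the connector is working, the payload should show up in the consumer's output shortly after it is published.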


kulta4 commented 6 months ago


Hi, I tried this connector and followed the exact steps you mentioned above, but I am getting an error.

image

Can you please help with this? I added the camel-mqtt-source-connect dependency to the pom file and changed the connector config to this:

name=CamelMqttSourceConnector

connector.class=org.apache.camel.kafkaconnector.mqttsource.CamelMqttsourceSourceConnector

key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

value.converter.schemas.enable=false

topics=test

camel.kamelet.mqtt-source.topic=test

camel.kamelet.mqtt-source.brokerUrl=tcp://127.0.0.1:1883

converter.encoding=UTF-8

I made these changes because of the error: failing to implement the class...

Thank you.