ibm-messaging / kafka-connect-mq-sink

This repository contains a Kafka Connect sink connector for copying data from Apache Kafka into IBM MQ.
Apache License 2.0

MQ-SINK Connection Fails #7

Closed: DhuriA closed this issue 6 years ago

DhuriA commented 6 years ago

Here are the errors I am seeing. The connector starts and connects to MQ, but the sink task then fails:

[2018-05-16 17:55:42,764] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-05-16 17:55:42,764] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-05-16 17:55:42,765] INFO Created connector mq-sink (org.apache.kafka.connect.cli.ConnectStandalone:99)
[2018-05-16 17:55:42,935] INFO Building messages using com.ibm.mq.kafkaconnect.builders.DefaultMessageBuilder (com.ibm.mq.kafkaconnect.builders.DefaultMessageBuilder:45)
[2018-05-16 17:55:43,176] INFO Connection to MQ established (com.ibm.mq.kafkaconnect.JMSWriter:144)
[2018-05-16 17:55:43,176] INFO WorkerSinkTask{id=mq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:268)
[2018-05-16 17:55:43,272] INFO [Consumer clientId=consumer-1, groupId=connect-mq-sink] Discovered coordinator r0147nn0v.bnymellon.net:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-05-16 17:55:43,274] INFO [Consumer clientId=consumer-1, groupId=connect-mq-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
[2018-05-16 17:55:43,274] INFO [Consumer clientId=consumer-1, groupId=connect-mq-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
[2018-05-16 17:55:43,284] INFO [Consumer clientId=consumer-1, groupId=connect-mq-sink] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-05-16 17:55:43,285] INFO [Consumer clientId=consumer-1, groupId=connect-mq-sink] Setting newly assigned partitions [TSINK-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
[2018-05-16 17:55:43,319] ERROR WorkerSinkTask{id=mq-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-05-16 17:55:43,321] ERROR WorkerSinkTask{id=mq-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
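For reference, when schemas.enable is true (the default), JsonConverter only accepts records wrapped in a schema/payload envelope. A minimal sketch of a valid message, assuming a simple string value:

{"schema": {"type": "string", "optional": false}, "payload": "hello"}

Plain JSON without that envelope, such as {"greeting": "hello"}, triggers exactly the DataException shown in the log above.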

AndrewJSchofield commented 6 years ago

This looks like a mismatch between your configuration and the data. My guess is that you are trying to send JSON messages without an embedded schema and that you are missing value.converter.schemas.enable=false in your configuration.
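For anyone hitting this with schemaless JSON data, the relevant converter settings would look roughly like this (a sketch using the standard Kafka Connect JsonConverter):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false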

DhuriA commented 6 years ago

It turned out not to be an issue with JSON, because I am not using the JSON converter. Here is the actual issue: the key.converter entry for ByteArrayConverter was commented out, while value.converter was set to ByteArrayConverter. After uncommenting key.converter, it worked:

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
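For completeness, a minimal standalone sink configuration with both converters active might look like the sketch below. The connector class and message builder follow the com.ibm.mq.kafkaconnect package names visible in the log above, and the topic matches the assigned partition TSINK-0; the MQ connection values (queue manager, channel, host, queue) are placeholders, not taken from this issue:

name=mq-sink
connector.class=com.ibm.mq.kafkaconnect.MQSinkConnector
tasks.max=1
topics=TSINK
mq.queue.manager=QM1
mq.connection.name.list=localhost(1414)
mq.channel.name=MYSVRCONN
mq.queue=MYQUEUE
mq.message.builder=com.ibm.mq.kafkaconnect.builders.DefaultMessageBuilder
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter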

I am closing the issue.