awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect that publishes messages to Amazon Kinesis streams or Amazon Kinesis Firehose.
Apache License 2.0
151 stars 91 forks

Task fails when reading Avro encoded event from Kafka Topic #14

Open vsaini-loyaltymethods opened 6 years ago

vsaini-loyaltymethods commented 6 years ago

What properties should be configured for the Worker and Connector to consume Avro-encoded events from a Kafka topic?

Currently the task fails if I use the Avro converters as shown below.

connect-avro-standalone.properties:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

[2017-11-03 15:19:32,588] ERROR Task Connector-Sink-Kinesis-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
java.lang.NullPointerException
    at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
    at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:88)
    at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.put(AmazonKinesisSinkTask.java:102)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-11-03 15:19:32,589] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2017-11-03 15:19:32,591] ERROR Task Connector-Sink-Kinesis-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-11-03 15:19:32,592] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
[2017-11-03 15:19:33,599] WARN Exception during updateCredentials (com.amazonaws.services.kinesis.producer.Daemon:322)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:320)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
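
The top frames of the trace show `java.nio.ByteBuffer.wrap` throwing from inside `DataUtility.parseValue`, which means a null `byte[]` reached `wrap`. A minimal, self-contained sketch of that failure mode, with a guard that reports something more useful than a bare NullPointerException, might look like the following. The class `WrapNullDemo` and its methods are hypothetical illustrations, not the connector's code, and this thread does not establish why the connector ends up with a null array for Avro-converted values.

```java
import java.nio.ByteBuffer;

public class WrapNullDemo {
    // Mirrors the failing call: ByteBuffer.wrap dereferences the array,
    // so passing a null byte[] raises NullPointerException.
    static boolean wrapThrowsOnNull() {
        byte[] payload = null;
        try {
            ByteBuffer.wrap(payload);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Hypothetical guard (an assumption, not the connector's behavior):
    // fail with a descriptive message when no bytes were produced.
    static ByteBuffer wrapOrExplain(byte[] payload, String valueType) {
        if (payload == null) {
            throw new IllegalArgumentException(
                "No byte representation for value of type " + valueType);
        }
        return ByteBuffer.wrap(payload);
    }

    public static void main(String[] args) {
        System.out.println("wrap(null) throws NPE: " + wrapThrowsOnNull());
        ByteBuffer ok = wrapOrExplain(new byte[] {1, 2, 3}, "byte[]");
        System.out.println("remaining bytes: " + ok.remaining());
    }
}
```

A guard of this kind would not add Avro support by itself; it would only make the log state which value type could not be converted.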

avolochenko commented 6 years ago

+1 having exact same issue

nehalmehta commented 6 years ago

@sqlcook @vsaini-rythmos Can you share a sample of the schema you are using (anonymized)? I am working on a solution for the error mentioned above, but I want to verify that it resolves your issue.

avolochenko commented 6 years ago

@nehalmehta thanks for looking into it. Here is a simplified version of my schema:

[
  {
    "namespace": "com.example.model",
    "name": "Event",
    "type": "record",
    "fields": [
      { "name": "field", "type": "string" },
      { "name": "field", "type": [ "null", "string" ], "default": null },
      { "name": "field", "type": { "type": "array", "items": "Attributes" } },
      { "name": "field", "type": "string" },
      { "name": "field", "type": "long" }
    ]
  },
  {
    "namespace": "com.example.model",
    "name": "Attributes",
    "type": "record",
    "fields": [
      { "name": "field", "type": "long" },
      { "name": "field", "type": "boolean" }
    ]
  }
]

vsaini-loyaltymethods commented 6 years ago

@nehalmehta Any idea by when this issue will be resolved?

p2krish commented 6 years ago

Any idea when this will start supporting Avro? I'm trying to use the master branch, but I get the following exception:

java.lang.NullPointerException
    at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:43)
    at com.amazon.kinesis.kafka.DataUtility.lambda$parseValue$1(DataUtility.java:96)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
    at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:96)
    at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.addUserRecord(AmazonKinesisSinkTask.java:224)

YemingHuang commented 6 years ago

Same issue for me. +1

garethmdavies commented 5 years ago

I've started some work on adding Avro parsing support, configurable via a new task property. (The Firehose sink task is not yet supported, though it should be trivial to get working.) The parser works without a schema registry and simply parses using the schema passed with the message. Branch here: https://github.com/taffowl/kinesis-kafka-connector/tree/feature/add-avro-support

deuscapturus commented 4 years ago

Same issue here. +1

desavera commented 3 years ago

Any estimates on adding this feature @garethmdavies ?