awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect that publishes messages to Amazon Kinesis streams or Amazon Kinesis Firehose.
Apache License 2.0

SinkConnector fails on JSON input with NullPointerException #22

Closed: Pyons closed this issue 6 years ago

Pyons commented 6 years ago
connect_1            | [2018-04-25 09:40:48,691] ERROR WorkerSinkTask{id=KinesisSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect_1            | java.lang.NullPointerException
connect_1            |  at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:31)
connect_1            |  at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.addUserRecord(AmazonKinesisSinkTask.java:224)
connect_1            |  at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.put(AmazonKinesisSinkTask.java:138)
connect_1            |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
connect_1            |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
connect_1            |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
connect_1            |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
connect_1            |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
connect_1            |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
connect_1            |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1            |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1            |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1            |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1            |  at java.lang.Thread.run(Thread.java:745)

Connector config:

{
    "name": "KinesisSinkConnector",
    "config": {
        "topics": "foo",
        "connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
        "region": "eu-central-1",
        "streamName": "bar",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

I also tried with schemas enabled, but it fails anyway. Any ideas what could be wrong? My MongoDB sink connector works fine.

desavera commented 3 years ago

What was the solution to this problem, please?

francoisneron commented 2 years ago

I would like to know as well, please.

bdesert commented 2 years ago

@francoisneron I think this was a duplicate of #56 (actually, #56 was a duplicate of this one, since this one was reported first, but I went bottom up). Anyway, it was failing on the schema type; after adding a null check on the value, most cases were resolved. Let me know if your experience differs after applying the fix in #57.
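
For anyone landing here later, here is a minimal sketch of the kind of null guard described above. It is not the actual patch from #57: `NullSafeParseSketch` and its one-argument `parseValue` are hypothetical stand-ins that use only the JDK, whereas the connector's real `DataUtility.parseValue` works on Kafka Connect types. The assumption is that a record value can arrive as null (for example a tombstone record) and that dereferencing it without a check is what throws.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not the connector's real DataUtility class.
class NullSafeParseSketch {

    // Converts a record value to bytes, returning null instead of throwing
    // when the value itself is null.
    static ByteBuffer parseValue(Object value) {
        if (value == null) {
            // Guard: nothing to serialize, so let the caller decide what to do.
            return null;
        }
        if (value instanceof byte[]) {
            return ByteBuffer.wrap((byte[]) value);
        }
        if (value instanceof ByteBuffer) {
            return (ByteBuffer) value;
        }
        // Fallback assumption: any other value is sent as its UTF-8 string form.
        return ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Object[] values = { "{\"a\":1}", null, new byte[] { 1, 2, 3 } };
        for (Object value : values) {
            ByteBuffer parsed = parseValue(value);
            if (parsed == null) {
                // Skip null values rather than killing the whole task.
                System.out.println("skipped null value");
            } else {
                System.out.println("parsed " + parsed.remaining() + " bytes");
            }
        }
    }
}
```

Whether a null value should be skipped, logged, or routed elsewhere is a design choice for the caller. The sketch only shows that the check has to happen before the value is dereferenced.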