confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

Connect-elasticsearch-sink throwing serialization error #149

Closed naveengauba closed 6 years ago

naveengauba commented 6 years ago

I am using this connector to dump data from Kafka into Elasticsearch. The data in the Kafka topic looks like this:

{"timeStamp":1510957620000,"edgeId":"3d0847a2-c9b9-4930-92e4-e573b9180047","programId":"firstPresentation","subNet":null,"totalPlayer":1,"playingCount":0,"pausedCount":1,"bufferingCount":0,"errorCount":0,"pfeName":"vne-linux-10.1.111.222","pfeAddress":"10.1.111.222","isvisibleInAll":false,"bitrateCntMap":{"BR_UNDER500K":1},"live":true}

The connect task throws this exception when Kafka Connect starts:

[2017-11-17 23:19:30,607] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
kafka-connect          | org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
kafka-connect          |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
kafka-connect          |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect          |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect          |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect          |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect          |    at java.lang.Thread.run(Thread.java:745)
kafka-connect          | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): was expecting comma to separate ARRAY entries
kafka-connect          |  at [Source: [B@3abcd42; line: 1, column: 4]
kafka-connect          | Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): was expecting comma to separate ARRAY entries
kafka-connect          |  at [Source: [B@3abcd42; line: 1, column: 4]
kafka-connect          |    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
kafka-connect          |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
kafka-connect          |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:447)
kafka-connect          |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:687)
kafka-connect          |    at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:261)
kafka-connect          |    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:64)
kafka-connect          |    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:14)
kafka-connect          |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3564)
kafka-connect          |    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
kafka-connect          |    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
kafka-connect          |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
kafka-connect          |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
kafka-connect          |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect          |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect          |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect          |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect          |    at java.lang.Thread.run(Thread.java:745)
kafka-connect          | [2017-11-17 23:19:30,615] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)

I have also attached the broker.properties as well as the connector properties. Please advise.

synhershko commented 6 years ago

This is because you are using an Avro schema when writing to Kafka, so Connect reads binary Avro data rather than JSON. You should change your Elasticsearch sink configuration to something like this:


value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
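For reference, a complete sink configuration with the Avro converter might look like the sketch below. The topic name, Elasticsearch URL, and type name are placeholders (the attached properties files aren't reproduced in this thread), so adjust them to your environment; if the record keys are not Avro, keep whatever key.converter you use today.

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# placeholder topic; use the topic the Avro producer writes to
topics=player-stats
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
# read Avro-encoded values via the Schema Registry instead of parsing them as JSON
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

To confirm the topic really contains Avro, kafka-avro-console-consumer (with --property schema.registry.url=http://localhost:8081) should print the records as JSON, while the plain kafka-console-consumer will show mostly unreadable bytes.
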
rhauch commented 6 years ago

@naveengauba, does this solve the problem you were having? If so, can we close this?

rmoff commented 6 years ago

Duplicate of #153

rmoff commented 6 years ago

Closing as resolved in #153