micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

JsonParseException when consuming message from kafka topic #1003

Closed stanleyw2014 closed 7 months ago

stanleyw2014 commented 7 months ago

Issue description

Hi, I get the exception below when consuming a message from a Kafka topic:

org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition engage-message-engagement-0 at offset 19. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:309)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at io.micronaut.configuration.kafka.processor.ConsumerStateSingle.pollRecords(ConsumerStateSingle.java:58)
    at io.micronaut.configuration.kafka.processor.ConsumerState.pollAndProcessRecords(ConsumerState.java:197)
    at io.micronaut.configuration.kafka.processor.ConsumerState.refreshAssignmentsPollAndProcessRecords(ConsumerState.java:164)
    at io.micronaut.configuration.kafka.processor.ConsumerState.threadPollLoop(ConsumerState.java:154)
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
    at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:193)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Unexpected character (':' (code 58)): Expected space separating root-level values at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 4]
    at io.micronaut.json.JsonObjectSerializer.deserialize(JsonObjectSerializer.java:70)
    at io.micronaut.configuration.kafka.serde.JsonObjectSerde.deserialize(JsonObjectSerde.java:59)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    ... 18 common frames omitted
Caused by: io.micronaut.json.JsonSyntaxException: Unexpected character (':' (code 58)): Expected space separating root-level values at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 4]
    at io.micronaut.serde.jackson.JacksonJsonMapper.readValue(JacksonJsonMapper.java:232)
    at io.micronaut.json.JsonObjectSerializer.deserialize(JsonObjectSerializer.java:68)
    ... 22 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2481)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:752)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:676)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:724)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1785)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseUnsignedNumber(UTF8StreamJsonParser.java:1514)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:909)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:797)
    at io.micronaut.serde.jackson.JacksonDecoder.<init>(JacksonDecoder.java:70)
    at io.micronaut.serde.jackson.JacksonDecoder.create(JacksonDecoder.java:80)
    at io.micronaut.serde.jackson.JacksonJsonMapper.readValue0(JacksonJsonMapper.java:191)
    at io.micronaut.serde.jackson.JacksonJsonMapper.readValue(JacksonJsonMapper.java:184)
    at io.micronaut.serde.jackson.JacksonJsonMapper.readValue(JacksonJsonMapper.java:230)
    ... 23 common frames omitted

My producer:

@KafkaClient
public interface OrderProducer {
    @Topic("order")
    void produceOrder(String orderKey, Order order);
}

My consumer:

@Topic("order")
public void consumeOrder(@KafkaKey String orderKey, Order order) {
    .....
}

The Order POJO:

@Serdeable.Serializable
@Serdeable.Deserializable
public class Order {
    private long orderId;
    private long connectorInstanceId;
    private String connectorSessionId;
    private String connectorInstancePool;
    private String authenticationToken;

/**

I'm wondering if there is any way to print out the JSON string that the consumer is trying to consume. If I could see the JSON string, I might be able to tell where the problem is. I even tried producing an empty Order object to the topic, and it still throws the exception above.
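
One possible way to see the payload, sketched under assumptions rather than taken from the Micronaut Kafka documentation: temporarily declare the listener's value parameter as byte[] (assuming the record value is then handed over without JSON decoding) and log it with a small helper like the one below. RawPayloadDump and describe are hypothetical names, not part of Micronaut or Kafka, and the sample payload 123:abc is made up. It is only an illustration of the kind of content a JSON parser would read as a number and then reject at the ':' in column 4.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical debugging helper; not part of Micronaut or the Kafka client.
final class RawPayloadDump {

    // Renders raw record bytes as UTF-8 text plus hex so a bad payload can be logged.
    static String describe(byte[] payload) {
        if (payload == null) {
            return "null (no value in the record)";
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        StringBuilder hex = new StringBuilder();
        for (byte b : payload) {
            // %02x on a byte prints its unsigned two-digit hex value
            hex.append(String.format("%02x ", b));
        }
        return "length=" + payload.length
                + " text=[" + text + "]"
                + " hex=[" + hex.toString().trim() + "]";
    }

    public static void main(String[] args) {
        // Made-up sample: starts with digits followed by ':', so it is not valid JSON.
        byte[] sample = "123:abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(sample));
        // prints: length=7 text=[123:abc] hex=[31 32 33 3a 61 62 63]
    }
}
```

The hex column is there because invisible characters (a byte-order mark, a binary prefix written by a different serializer) would not show up in the text rendering alone.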