kafka-dev / kafka

A distributed publish/subscribe messaging service
http://sna-projects.com/kafka
Apache License 2.0
549 stars 300 forks source link

Getting InvalidRecordException while using kafka #55

Open brishi19791 opened 7 years ago

brishi19791 commented 7 years ago

In my application, I get data which gets queued using kafka and saved on the disk and the consumer which gets this data from kafka and does the processing. But When my consumer is trying to read data from kafka I am getting below exceptions :

2017-06-09 10:57:24,733 ERROR NetworkClient Uncaught error in request completion: org.apache.kafka.common.KafkaException: Error deserializing key/value for partition TcpMessage-1 at offset 155884487 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:628) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:566) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) [kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) [kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) [kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) [kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) [kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) [kafka-clients-0.9.0.1.jar:?] at com.affirmed.mediation.edr.kafka.tcpMessage.TcpMessageConsumer.doWork(TcpMessageConsumer.java:190) [EdrServer.jar:?] at com.affirmed.mediation.edr.kafka.tcpMessage.TcpMessageConsumer.run(TcpMessageConsumer.java:248) [EdrServer.jar:?] ## Caused by: org.apache.kafka.common.record.InvalidRecordException: Record is corrupt (stored crc = 2016852547, computed crc = 1399853379) at org.apache.kafka.common.record.Record.ensureValid(Record.java:226) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:617) ~[kafka-clients-0.9.0.1.jar:?] ... 15 more

Could anyone please help me with this. I got stuck with it and not able to figure out the root.

When this occurs is there any way to catch this exception and move the offset? Currently, consumer is keep polling for the same range of records in the next poll as
result never moving forward.