elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

logstash-kafka-input IllegalStateException #4151

Closed EamonZhang closed 8 years ago

EamonZhang commented 8 years ago

Hi,

The exception `Kafka::Consumer caught exception: Java::JavaLang::IllegalStateException Iterator is in failed state` appears; sometimes the consumer works again for a while, but then it throws this exception again.

When I run the Logstash Kafka consumer with this config:

```
input {
  kafka {
    zk_connect            => "***:2181"
    group_id              => "grouplog"
    topic_id              => "topiclog"
    codec                 => json
    consumer_threads      => 3
    rebalance_max_retries => 8
    queue_size            => 512
  }
}
```

Version: logstash-2.0.0

suyograo commented 8 years ago

Hi @EamonZhang, can you provide the full stack trace? Also, starting LS with `--debug` will help. Please attach the output here.

EamonZhang commented 8 years ago

Thank you @suyograo,

```
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: updated fetch offset of (topiclog1:1: fetched offset = 2557895: consumed offset = 2557892) to 2557895
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:1: fetched offset = 2557895: consumed offset = 2557893 to 2557893
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:2: fetched offset = 2557891: consumed offset = 2557892 to 2557892
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:2: fetched offset = 2557891: consumed offset = 2557893 to 2557893
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:2: fetched offset = 2557891: consumed offset = 2557894 to 2557894
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: updated fetch offset of (topiclog1:2: fetched offset = 2557894: consumed offset = 2557894) to 2557894
log4j, [2015-11-09T14:20:33.705] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:0: fetched offset = 2557897: consumed offset = 2557898 to 2557898
log4j, [2015-11-09T14:20:33.705] DEBUG: kafka.consumer.PartitionTopicInfo: updated fetch offset of (topiclog1:0: fetched offset = 2557898: consumed offset = 2557898) to 2557898
Kafka::Consumer caught exception: Java::KafkaMessage::InvalidMessageException Message is corrupt (stored crc = 1177125894, computed crc = 2177451258)
Kafka::Consumer caught exception: Java::JavaLang::IllegalStateException Iterator is in failed state
Kafka::Consumer caught exception: Java::JavaLang::IllegalStateException Iterator is in failed state
```

EamonZhang commented 8 years ago

Hi @suyograo, has this issue been noticed? It appears intermittently, and it's causing me trouble. Thanks!

EamonZhang commented 8 years ago

Is there any way to skip past this error case? Can the high-level consumer provide a flag to skip a corrupted message in its iterator, or could Logstash support this via configuration?

suyograo commented 8 years ago

@EamonZhang you can configure `auto_offset_reset` to `largest` to skip to the later messages.

See

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset
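For reference, a minimal sketch of the input block from the original report with that option added, assuming the 2.x kafka input plugin (the ZooKeeper address, group, and topic values are placeholders copied from the report above):

```
input {
  kafka {
    zk_connect            => "***:2181"   # placeholder ZooKeeper address from the report above
    group_id              => "grouplog"
    topic_id              => "topiclog"
    codec                 => json
    consumer_threads      => 3
    rebalance_max_retries => 8
    queue_size            => 512
    # When no valid committed offset is available (or the offset is out of range),
    # start from the newest message instead of failing, per the suggestion above.
    auto_offset_reset     => "largest"
  }
}
```

Note that this only controls where the consumer resumes when its offset is invalid; it does not repair the corrupt message itself.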

EamonZhang commented 8 years ago

@suyograo It works now, Thanks!

arpanshah29 commented 8 years ago

I've run into the same issue. It seems like a corrupt message shouldn't perpetually send the consumer into this state.

IngaFeick commented 7 years ago

Hi, I'm running into the same issue. What I find most problematic is that we do not see these errors in the logs, only on stdout after restarting Logstash. So if you don't happen to be looking at the shell while this happens, you'll never know that it does. The Logstash version is 2.3.1 and the kafka input plugin is 2.0.6.