Closed. Can007 closed this issue 10 years ago.
Can you provide a way to recreate this? Searching for the error on Google suggests that the iterator is being used before hasNext() is ready.
Thanks for your reply. I just started six Logstash instances consuming from Kafka; two of them don't work and throw this exception, though they sometimes recover and then throw it again. What is the problem with the following configuration?

input {
  kafka {
    zk_connect => "**"
    topic_id => "lb"
    queue_size => 20
    consumer_threads => 4
    consumer_timeout_ms => 300
    fetch_message_max_bytes => 16777216
  }
}
How many partitions are on the topic? Most likely you have more consumer threads than partitions, so some threads cannot consume anything: in Kafka's consumer-group model each partition is assigned to at most one consumer thread, so any surplus threads sit idle.
We have 40 partitions and only 24 consumer threads. I've found that some partitions were last seen a few hours ago.
There is no guarantee that a thread will cycle through partitions in a linear order; it is only guaranteed that consumption of a partition resumes from the offset previously recorded in ZooKeeper. I recommend running only as many consumer threads as you have processors available, and try to match the number of partitions to the maximum number of consumer threads you expect to run. As for the iterator error, I would search the Kafka mailing list to see whether others have run into this problem.
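To make that sizing concrete, here is a minimal sketch of the layout being suggested. Only the 40-partition figure comes from this thread; the five-instance/eight-core split, the consumer group name, and the ZooKeeper addresses are made-up placeholders, not a tested recommendation.

# Hypothetical layout: one of 5 identical Logstash instances, each on an 8-core host.
# 5 instances x 8 consumer_threads = 40 threads in the group, one per partition,
# and no instance runs more consumer threads than it has processors.
input {
  kafka {
    zk_connect       => "zk1:2181,zk2:2181"   # placeholder ZooKeeper addresses
    topic_id         => "lb"
    group_id         => "lb-logstash"         # hypothetical consumer group name
    consumer_threads => 8
  }
}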
Thanks for your advice. I will try it.
Also, I would comment out consumer_timeout_ms (thus leaving it at -1), which should reduce the exceptions you get; the timeouts may be crashing X of N consumer threads, which would explain why some stop consuming. fetch_wait_max_ms will keep the threads cycling through partitions. The consumer timeout is more of a crash-and-burn setting, but I put consumer_restart_on_error in to at least try to recover from these issues.
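A minimal sketch of what that suggestion looks like in the input block, using the topic from this thread; the fetch_wait_max_ms value and the ZooKeeper address are placeholders I picked, not values from the discussion.

input {
  kafka {
    zk_connect       => "zk1:2181"    # placeholder
    topic_id         => "lb"
    consumer_threads => 4
    # consumer_timeout_ms => 300      # commented out so it stays at -1 and the consumer
                                      # blocks for data instead of throwing timeout exceptions
    fetch_wait_max_ms => 500          # placeholder: lets the broker hold a fetch briefly,
                                      # keeping idle threads cycling instead of timing out
    consumer_restart_on_error => true # try to restart a consumer thread after an exception
  }
}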
I found that the data across the partitions is imbalanced, so some consumer threads sit waiting for data, and sometimes they never work again. Do you have any suggestions? Thanks a lot.
What is your config now?
input {
  kafka {
    zk_connect => "*****"
    topic_id => "lb"
    queue_size => 2000
    consumer_threads => 4
    rebalance_max_retries => 8
    fetch_message_max_bytes => 2097152
  }
}

Now the data across the partitions is balanced. But I have another question: no matter how I change the queue size, it doesn't seem to have any effect in my project. Why?
Queue sizes are only applied when the consumer is created. You should check out the Kafka documentation, as it explains a number of these nuances well.
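If it helps to see why changing queue_size can appear to do nothing, here is a rough sketch. It assumes that queue_size is the number of fetched chunks the consumer buffers per thread and, as noted above, that the value is only read when the consumer is created, so Logstash has to be restarted for a new value to take effect; the memory estimate in the comments is my own back-of-the-envelope assumption, not something measured in this thread.

input {
  kafka {
    zk_connect       => "zk1:2181"      # placeholder
    topic_id         => "lb"
    consumer_threads => 4
    queue_size       => 2000            # read once at consumer creation; restart Logstash
                                        # for a changed value to apply
    fetch_message_max_bytes => 2097152  # 2 MB per fetched chunk
    # Assumed bound: each thread can buffer roughly queue_size * fetch_message_max_bytes
    # (about 2000 * 2 MB = 4 GB here), so heap limits may mask the effect of a larger queue_size.
  }
}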
It's fine now. Thanks a lot.
For those who have run into the same error: in our case what helped was deleting the ZK data for the Logstash Kafka consumer (by default a ZK path like /consumers/<topic_id>).
I saw the same issue, but oddly I had a completely different root cause. Before the enormous deluge of repeated IllegalStateExceptions, there was this buried:
java.io.FileNotFoundException: /root/snappy-unknown-e20ddfaa-bcae-4e5f-bd5c-ba58a8bd07e3-libsnappyjava.so (Permission denied)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(Unknown Source)
at java.io.FileOutputStream.<init>(Unknown Source)
at java.io.FileOutputStream.<init>(Unknown Source)
at org.xerial.snappy.SnappyLoader.extractLibraryFile(SnappyLoader.java:212)
at org.xerial.snappy.SnappyLoader.findNativeLibrary(SnappyLoader.java:307)
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:163)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:145)
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
at java.io.InputStream.read(Unknown Source)
at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:67)
at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:67)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
[...]
at Kafka$$Consumer_1733551146.run(Kafka$$Consumer_1733551146.gen:13)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
This was caused by running Logstash as sudo -u logstash /bin/logstash ... from the root account: the Snappy native library is extracted under /root, which the logstash user cannot write to, hence the Permission denied above.
So it looks like there may still be an issue whereby, if the KafkaConsumer runs into a problem for whatever reason, it triggers this behaviour in the plugin rather than dying cleanly.
Hi @joekiller, I saw the same issue recently.
Kafka 2.10-0.8.2.2, logstash-2.0.0

input {
  kafka {
    zk_connect => "***:2181"
    group_id => "grouplog"
    topic_id => "topiclog"
    codec => json
    consumer_threads => 3
    rebalance_max_retries => 8
    queue_size => 2048
  }
}
The topic has 3 partitions in Kafka.
Started with --debug; the log file shows:
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: updated fetch offset of (topiclog1:1: fetched offset = 2557895: consumed offset = 2557892) to 2557895
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:1: fetched offset = 2557895: consumed offset = 2557893 to 2557893
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:2: fetched offset = 2557891: consumed offset = 2557892 to 2557892
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:2: fetched offset = 2557891: consumed offset = 2557893 to 2557893
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:2: fetched offset = 2557891: consumed offset = 2557894 to 2557894
log4j, [2015-11-09T14:20:33.703] DEBUG: kafka.consumer.PartitionTopicInfo: updated fetch offset of (topiclog1:2: fetched offset = 2557894: consumed offset = 2557894) to 2557894
log4j, [2015-11-09T14:20:33.705] DEBUG: kafka.consumer.PartitionTopicInfo: reset consume offset of topiclog1:0: fetched offset = 2557897: consumed offset = 2557898 to 2557898
log4j, [2015-11-09T14:20:33.705] DEBUG: kafka.consumer.PartitionTopicInfo: updated fetch offset of (topiclog1:0: fetched offset = 2557898: consumed offset = 2557898) to 2557898
Kafka::Consumer caught exception: Java::KafkaMessage::InvalidMessageException Message is corrupt (stored crc = 1177125894, computed crc = 2177451258)
Kafka::Consumer caught exception: Java::JavaLang::IllegalStateException Iterator is in failed state
Kafka::Consumer caught exception: Java::JavaLang::IllegalStateException Iterator is in failed state
Best regards, Eamon
+1 I'm looking at the same issue right now with logstash version 2.4.0.
Hello, I get the following exception when using logstash-kafka. What could cause the iterator to be in a FAILED state? Please let me know how I can fix this issue. Thanks in advance.

Kafka::Consumer caught exception: Java::JavaLang::IllegalStateException Iterator is in failed state