logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0
139 stars 122 forks source link

Kafka logstash input do not continue from where it left off on restart #68

Closed robin-anil closed 8 years ago

robin-anil commented 8 years ago

This is highly undesirable, the reason we are publishing data to kafka is to ensure the consumers can be taken down and can come back up in an asynchronous fashion. Is this a limitation of this plugin? or simply a configuration issue.

Also where does the kafka input save the kafka position per partition ?

I am using the latest version of the plugin with the following kafka config

input {
    kafka {
        zk_connect => "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181"
        topic_id => "logs"
        consumer_threads => 1
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        decorate_events => true
    type => "logs"
    }
}
suyograo commented 8 years ago

@r-tock by default kafka stores its offset metadata in ZK. We use the high level consumer described here, which automatically stores offsets every 60s.

Do you have a reproducible case?

suyograo commented 8 years ago

Also, was this a hard restart, like kill -9 ?

robin-anil commented 8 years ago

this was the default restart i.e. sudo service logstash restart Maybe the shutdown bug is affecting saving off offsets ? https://github.com/logstash-plugins/logstash-input-kafka/issues/49

robin-anil commented 8 years ago

I am able to reproduce this any time. just stop logstash and start again, all the logs in between are lost.

joekiller commented 8 years ago

Can you please check your consumer offsets?

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker

Also is this the only Logstash/Kafka consumer you are using with this topic?

joekiller commented 8 years ago

This discussion is probably better for the logstash discussion group vs an issue here as issues generally have steps to reproduce a problem and I'm unsure if this is really a bug.

robin-anil commented 8 years ago

I have currently a case where the logstash kafka consumer is lagging behind. Now this happend twice this week, and everytime I restarted logstash kafka input the plugin continued from the latest point losing all the logs. Since I have the problem in my grip at the moment, I would like the folks here to tell me how I can check the logstash (without restarting) to help you debug what is going on.

@joekiller @suyograo

screen shot 2016-02-07 at 10 04 03 pm
robin-anil commented 8 years ago

Ping. I would like to restart the logstash instance. But I am waiting from the devs to tell me how to debug this, we are losing logs because of this bug.

talevy commented 8 years ago

can you check that zookeeper has offsets written for your group id? by default this group is called logstash. and is the original config your config that you are using now? are you setting reset_beginning?

suyograo commented 8 years ago

@r-tock are the LSs with 0 lag experiencing the loss of messages? Also, can you check your ZK -- you'd have to use zkCli.sh and navigate to the consumer group and partition -- and see if the offsets are written

suyograo commented 8 years ago

@r-tock Also if you can paste some of the LS logs when you restarted, we'd like to look

robin-anil commented 8 years ago

@talevy The picture above are offsets from zookeeper for group logstash as viewed on kafka-manager

robin-anil commented 8 years ago

My logstash configuration is noted above in the very first post.

robin-anil commented 8 years ago

This is the logstash log from the last restart 4 days ago. If I restart know that logstash will continue from current position, that happened last two times. But it will lose all the logs in the process. At some point in the life-time of logstash, some of the partition readers seem wedged. Logs below don't show any issue.

{:timestamp=>"2016-02-03T23:03:10.959000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
{:timestamp=>"2016-02-03T23:03:15.997000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_logs"}]=>[{"thread_id"=>16, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_ex_logs"}]=>[{"thread_id"=>17, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"local_exchange_logs", "reset_beginning"=>"true", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"local_ex_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>22, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>24, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-03T23:03:16.010000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-02-03T23:03:20.973000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_logs"}]=>[{"thread_id"=>16, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_ex_logs"}]=>[{"thread_id"=>17, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"local_exchange_logs", "reset_beginning"=>"true", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"local_ex_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>22, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>24, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-03T23:03:25.982000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_logs"}]=>[{"thread_id"=>16, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_ex_logs"}]=>[{"thread_id"=>17, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"local_exchange_logs", "reset_beginning"=>"true", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"local_ex_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>22, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>24, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-03T23:03:30.982000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_logs"}]=>[{"thread_id"=>16, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"demo_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"demo_ex_logs"}]=>[{"thread_id"=>17, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"local_exchange_logs", "reset_beginning"=>"true", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"local_ex_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>22, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.3/lib/logstash/inputs/kafka.rb:139:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>24, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-03T23:03:32.621000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.335000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628335-cc55ed4a can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.334000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628334-80251e75 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.359000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628410-23644bc2 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.380000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628336-ed1539b2 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.382000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628330-fdd175c0 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.392000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628336-f50f699f can't rebalance after 4 retries, :level=>:warn}
robin-anil commented 8 years ago

This is the current state of all our consumers for group logstash. We have only one logstash consumer instance btw and all of them are reading from 20 partitions over about 10ish topics

screen shot 2016-02-08 at 5 30 46 pm
suyograo commented 8 years ago

@r-tock can you remove consumer_restart_on_error. Not sure if this is related to the issue, want to eliminate the possibility

suyograo commented 8 years ago

I say this because of logs:

{:timestamp=>"2016-02-03T23:03:58.335000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628335-cc55ed4a can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.334000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628334-80251e75 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.359000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628410-23644bc2 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.380000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628336-ed1539b2 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.382000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628330-fdd175c0 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-03T23:03:58.392000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1454540628336-f50f699f can't rebalance after 4 retries, :level=>:warn}
robin-anil commented 8 years ago

Ok. Do you want me to restart the instance now after removing consumer_restart_on_error, is there anything else I can give help on before the instance is restarted. The problem usually do not surface for few days so I want to take advantage while I have it on a tight leash

robin-anil commented 8 years ago

Please confirm you want me to restart logstash with

input {
    kafka {
        zk_connect => "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181"
        topic_id => "logs"
        consumer_threads => 1
        consumer_restart_sleep_ms => 100
        decorate_events => true
    type => "logs"
    }
}
suyograo commented 8 years ago

@r-tock hang on, debugging

talevy commented 8 years ago

regarding the unclean shutdown logs that you are seeing, we have fixed this in the latest version of the plugin, and you should not have to force shutdown as your are anymore. the pipeline shouldn't stall.

bin/plugin update logstash-input-kafka to version 2.0.4

robin-anil commented 8 years ago

@talevy I tried that before 2 weeks ago I believe I got a ruby gem install error. So I rolled it back to 2.0.3. In hindsight I should have reported that, I just didn't have the time to follow up on that.

suyograo commented 8 years ago

@r-tock Can you try bin/plugin install --version 2.0.4 logstash-input-kafka

robin-anil commented 8 years ago

Here is what I get

Validating logstash-input-kafka-2.0.4
Installing logstash-input-kafka
Plugin version conflict, aborting
ERROR: Installation Aborted, message: Bundler could not find compatible versions for gem "jruby-kafka":
  In snapshot (Gemfile.lock):
    jruby-kafka (= 1.4.0)

  In Gemfile:
    logstash-input-kafka (= 2.0.4) java depends on
      jruby-kafka (= 1.5.0) java

Running `bundle update` will rebuild your snapshot from scratch, using only
the gems in your Gemfile, which may resolve the conflict.
robin-anil commented 8 years ago

It seems I cannot bundle update because this is within the binary

suyograo commented 8 years ago

@r-tock can you live debug with me and @talevy ?

robin-anil commented 8 years ago

https://gist.github.com/r-tock/f97b695d21574b4a69cf Please find the jstack output for the process in the gist above

robin-anil commented 8 years ago

@suyograo @talevy Did you guys make any progress? Was the stack trace helpful. Let me know if there is anything else I can do before I upgrade and restart the instance with consumer_restart_on_error removed.

robin-anil commented 8 years ago

Alright, at this point I am going to restart the server with upgrade and removing the consumer_restart_on_error

talevy commented 8 years ago

I can't see anything specific to the issue via the jstack. that sounds good.

robin-anil commented 8 years ago

post upgrade the restart is working better logstash does not stall as long but it still does.

However it is dropping events post a restart See the picture. The first big empty block is the upgrade and restart and the second one is a restart with the new instance.

screen shot 2016-02-10 at 12 17 55 pm

Logs are warning about stalling threads and still have consumer rebalance exception. I also issued a partition reassignment request on kafka yesterday, so I cannot see anything weird on the kafka side.

Also note that the rebalance exception is for consumers that no longer exist. When I look at the consumer info on kafka, these instance owner ids don't match what is running currently.

{:timestamp=>"2016-02-10T18:16:02.928000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
{:timestamp=>"2016-02-10T18:16:07.953000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T18:16:07.958000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-02-10T18:16:12.934000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T18:16:17.935000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T18:16:17.937000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-02-10T18:16:22.934000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T18:16:27.936000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"input_to_filter"=>1, "total"=>1}, "STALLING_THREADS"=>{"other"=>[{"thread_id"=>80, "name"=>">output", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.4.1-java/lib/logstash/outputs/elasticsearch/buffer.rb:51:in `join'"}]}}
{:timestamp=>"2016-02-10T18:17:30.465000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240517-d3980226 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T18:17:30.488000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240515-62c925a7 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T18:17:30.498000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240517-a4fec7e7 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T18:17:30.501000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240517-183843e1 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T18:17:30.601000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240598-6c90460f can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T18:17:30.603000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240492-69a5fb05 can't rebalance after 4 retries, :level=>:warn}
joekiller commented 8 years ago

Depending on the number of partitions you have and where the data resides in the Kafka queue you may see different log times appear in elasticsearch has the processes catch up. Check your group offsets. On Feb 10, 2016 1:22 PM, "Robin Anil" notifications@github.com wrote:

post upgrade the restart is working better logstash does not stall as long but it still does.

However it is dropping events post a restart See the picture. The first big empty block is the upgrade and restart and the second one is a restart with the new instance. [image: screen shot 2016-02-10 at 12 17 55 pm] https://cloud.githubusercontent.com/assets/11711723/12957019/6fa65344-cff0-11e5-9abb-17c8d454cd15.png

Logs are warning about stalling threads and still have consumer rebalance exception. I also issued a partition reassignment request on kafka yesterday, so I cannot see anything weird on the kafka side.

Also note that the rebalance exception is for consumers that no longer exist. When I look at the consumer info on kafka, these instance owner ids don't match what is running currently.

{:timestamp=>"2016-02-10T18:16:02.928000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn} {:timestamp=>"2016-02-10T18:16:07.953000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in pop'"}] , ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:inpop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<sys log", "c urrent_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in join'"}]}} {:timestamp=>"2016-02-10T18:16:07.958000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error} {:timestamp=>"2016-02-10T18:16:12.934000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:inpop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.r b:144:in pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:injoin'"}]}} {:timestamp=>"2016-02-10T18:16:17.935000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.r b:144:in pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in join'"}]}} {:timestamp=>"2016-02-10T18:16:17.937000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error} {:timestamp=>"2016-02-10T18:16:22.934000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:injoin'"}]}} {:timestamp=>"2016-02-10T18:16:27.936000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"input_to_filter"=>1, "total"=>1}, "STALLING_THREADS"=>{"other"=>[{"thread_id"=>80, "name"=>">output", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.4.1-java/lib/logstash/outputs/elasticsearch/buffer.rb:51:in `join'"}]}} {:timestamp=>"2016-02-10T18:17:30.465000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240517-d3980226 can't rebalance after 4 retries, :level=>:warn} {:timestamp=>"2016-02-10T18:17:30.488000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240515-62c925a7 can't rebalance after 4 retries, :level=>:warn} {:timestamp=>"2016-02-10T18:17:30.498000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240517-a4fec7e7 can't rebalance after 4 retries, :level=>:warn} {:timestamp=>"2016-02-10T18:17:30.501000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240517-183843e1 can't rebalance after 4 retries, :level=>:warn} {:timestamp=>"2016-02-10T18:17:30.601000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240598-6c90460f can't rebalance after 4 retries, :level=>:warn} {:timestamp=>"2016-02-10T18:17:30.603000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455128240492-69a5fb05 can't rebalance after 4 retries, :level=>:warn}

— Reply to this email directly or view it on GitHub https://github.com/logstash-plugins/logstash-input-kafka/issues/68#issuecomment-182514513 .

robin-anil commented 8 years ago

Log times are actually in the json payload. So when a catch up happens we will create insertions with old timestamp, I have done that to ensure I can re-index and re-process any time. I can assure you that this used to work previously while I had logstash 1.5

filter {
  date {
    match => [ "timestampMs", "UNIX_MS" ]
  }
}
joekiller commented 8 years ago

Looking at your consumer offsets from above, it looks like you have two different logstash-input-kafka inputs running on the same group.

0f3 appears to be fine, it has no offset lag. 9b2 is the non-working one. Now to explain your gap, or non-resumption problem, it appears that you have two independent consumers running on the same system (check your processes) each runs with the default id of logstash. One of the processes is consuming and may be doing so without you realizing it thus progressing the last offset so when you stop and start it, it appears like stuff is missing. the stuck process has the logs in their lag, so once you kill it, you should see the rest of the logs. The other was working on it's half so there is a chance that it just had the up to date logs in that partition. Either way, kill those other consumers and you should see the right behavior. BTW I think restart on errors is fine to use.

robin-anil commented 8 years ago

I have exactly one logstash process and when I restart it I have exactly one running.

robin-anil commented 8 years ago

The occam's razor explanation is that the logstash kafka input is opening up two inputs for the same group.

joekiller commented 8 years ago

Could you check your consumer instance owners again? If just one group is acting on the topic you should just see just one owner listed. If there are still two, you might have a hung or back ground process. Try at ps -ef | grep java to look for others.

robin-anil commented 8 years ago
# ps -Aef |grep java
logstash  5487     1  4 18:17 ?        00:05:56 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -Djava.io.tmpdir=/var/lib/logstash -Xmx2g -Xss2048k -Djffi.boot.library.path=/opt/logstash/vendor/jruby/lib/jni -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -Djava.io.tmpdir=/var/lib/logstash -XX:HeapDumpPath=/opt/logstash/heapdump.hprof -Xbootclasspath/a:/opt/logstash/vendor/jruby/lib/jruby.jar -classpath : -Djruby.home=/opt/logstash/vendor/jruby -Djruby.lib=/opt/logstash/vendor/jruby/lib -Djruby.script=jruby -Djruby.shell=/bin/sh org.jruby.Main --1.9 /opt/logstash/lib/bootstrap/environment.rb logstash/runner.rb agent -f /etc/logstash/conf.d -l /var/log/logstash/logstash.log
root      8600  4043  0 20:18 pts/0    00:00:00 grep java
joekiller commented 8 years ago

I would still check the consumer instance owners again. There isn't much else to explain what you are seeing. If there is just one owner now, you should see the lag count reducing and logs back filling.

robin-anil commented 8 years ago

Here are the instance owners for two different topic read by the same logstash

Are you expecting instance owners to be different across topics?

screen shot 2016-02-10 at 3 03 47 pm screen shot 2016-02-10 at 3 03 36 pm
joekiller commented 8 years ago

that looks better than the first one and is what I would expect. because you have two different kafka inputs for each topic, the owner thread will be named differently. You could use a whitelist filter to have one thread do them all .*_logs would catch both.

To ensure that everything is working as you expect (ie, resuming correctly), I'd shutdown logstash, check the offsets again and make sure lag is accumulating and then start logstash back up. You should see it drain the accumulated lag.

robin-anil commented 8 years ago

Alright I am going to do that now.

robin-anil commented 8 years ago

sudo service logstash stop

{:timestamp=>"2016-02-10T21:20:47.316000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
{:timestamp=>"2016-02-10T21:20:52.333000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"prod_logs"}]=>[{"thread_id"=>18, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"prod_exchange_logs", "consumer_threads"=>1, "consumer_restart_on_error"=>"true", "decorate_events"=>"true", "type"=>"prod_ex_logs"}]=>[{"thread_id"=>19, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T21:20:52.339000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-02-10T21:20:57.328000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T21:21:02.328000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_logs"}]=>[{"thread_id"=>20, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
{:timestamp=>"2016-02-10T21:21:02.338000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-02-10T21:21:07.326000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}
#ps -Aef |grep java
root     10282  4043  0 21:21 pts/0    00:00:00 grep java
robin-anil commented 8 years ago

Verified lag was accumulated

screen shot 2016-02-10 at 3 22 34 pm screen shot 2016-02-10 at 3 22 50 pm
robin-anil commented 8 years ago

Now when I restart the instance the lag is increasing (this is new behavior). and I can only see 5 of the topics

sudo service logstash start

ps -Aef |grep java
logstash 10431     1 65 21:25 ?        00:00:20 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -Djava.io.tmpdir=/var/lib/logstash -Xmx2g -Xss2048k -Djffi.boot.library.path=/opt/logstash/vendor/jruby/lib/jni -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -Djava.io.tmpdir=/var/lib/logstash -XX:HeapDumpPath=/opt/logstash/heapdump.hprof -Xbootclasspath/a:/opt/logstash/vendor/jruby/lib/jruby.jar -classpath : -Djruby.home=/opt/logstash/vendor/jruby -Djruby.lib=/opt/logstash/vendor/jruby/lib -Djruby.script=jruby -Djruby.shell=/bin/sh org.jruby.Main --1.9 /opt/logstash/lib/bootstrap/environment.rb logstash/runner.rb agent -f /etc/logstash/conf.d -l /var/log/logstash/logstash.log
root     10568  4043  0 21:26 pts/0    00:00:00 grep java
screen shot 2016-02-10 at 3 27 10 pm screen shot 2016-02-10 at 3 29 45 pm
robin-anil commented 8 years ago
screen shot 2016-02-10 at 3 31 04 pm
robin-anil commented 8 years ago

Now I did a

sudo service logstash restart Logstash exited but didn't come up

{:timestamp=>"2016-02-10T21:31:49.546000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
{:timestamp=>"2016-02-10T21:31:54.562000+0000", :level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"total"=>0}, "STALLING_THREADS"=>{["LogStash::Inputs::Kafka", {"zk_connect"=>"zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181", "topic_id"=>"staging_exchange_logs", "consumer_threads"=>1, "consumer_restart_sleep_ms"=>100, "decorate_events"=>"true", "type"=>"staging_ex_logs"}]=>[{"thread_id"=>21, "name"=>"<kafka", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:144:in `pop'"}], ["LogStash::Inputs::Syslog", {"type"=>"syslog", "port"=>10000}]=>[{"thread_id"=>22, "name"=>"<syslog", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-2.0.2/lib/logstash/inputs/syslog.rb:106:in `join'"}]}}

I ran it again sudo service logstash restart

{:timestamp=>"2016-02-10T21:32:55.478000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455139966180-37df52d4 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T21:32:55.481000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455139966137-3db7c519 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T21:32:55.485000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455139966117-f0fef61f can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T21:32:55.497000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455139966128-1058a8de can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T21:32:55.508000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455139966127-29be3184 can't rebalance after 4 retries, :level=>:warn}
{:timestamp=>"2016-02-10T21:32:55.501000+0000", :message=>"kafka client threw exception, restarting", :exception=>kafka.common.ConsumerRebalanceFailedException: logstash_logstash.c.rapid-depot-817.internal-1455139966137-ac6d391f can't rebalance after 4 retries, :level=>:warn}

Its back up but it has lost all logs

screen shot 2016-02-10 at 3 34 22 pm screen shot 2016-02-10 at 3 34 28 pm
robin-anil commented 8 years ago
screen shot 2016-02-10 at 3 35 10 pm

I guess there is some behavior issue with stop & start v/s restart and stop and start don't seem to allow the consumer to continue.

joekiller commented 8 years ago

Comparing the offsets, the logs are being consumed as you can see all the consumer offsets have increased. "Lag" even in the low 100's isn't that bad and you will generally always see some lag because it is a thread working across many partitions and you are always adding logs.

If you aren't seeing the logs in Elasticsearch in the timeline where you expect them, I'd guess the date filter isn't firing correctly. Take a close look at those logs because that big bump in logs on your timeseries sure looks like a back filling bump where all the logs are being stamped as they are hit in logstash, hence them only appearing when you run logstash.

You could try tag_on_failure => true to see if the match is failing.

That would be my guess because the consumer is sucking down the logs per what your metrics report.

robin-anil commented 8 years ago

I looked at the logs, the date matches the timestampMs field in the json, that bump is part of exchange log traffic noise so that bump is not actually the catchup.

I can assure you that all the logs in between have been lost. We have a backup on docker container and I can see the logs with those timestampss there and not in elastic search.

If there is an indexing failure the logstash logs usually have that info and I cannot see anything there.