logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0

Setting session_timeout_ms causes plugin to fail with unrecoverable error #89

Open ppf2 opened 8 years ago

ppf2 commented 8 years ago

Running LS 2.3.2 with kafka input plugin 3.0.2.

When attempting to set session_timeout_ms in the input configuration:

input {
  kafka {
    topics => ["test"]
    session_timeout_ms => "60000"
  }
}

Logstash fails with the error:

Starting pipeline {:id=>"main", :pipeline_workers=>4, :batch_size=>125, :batch_delay=>5, :max_inflight=>500, :level=>:info, :file=>"logstash/pipeline.rb", :line=>"188", :method=>"start_workers"}
Pipeline main started {:file=>"logstash/agent.rb", :line=>"465", :method=>"start_pipeline"}
Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"213", :method=>"create_consumer"}
A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Kafka topics=>["test"], session_timeout_ms=>"60000", codec=><LogStash::Codecs::Plain charset=>"UTF-8">, auto_commit_interval_ms=>"10", bootstrap_servers=>"localhost:9092", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>"true", group_id=>"logstash", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl=>false>
  Error: uncaught throw Failed to construct kafka consumer in thread 0x2ffc
  Exception: ArgumentError
  Stack: org/jruby/RubyKernel.java:1283:in `throw'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.2/lib/logstash/inputs/kafka.rb:214:in `create_consumer'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.2/lib/logstash/inputs/kafka.rb:142:in `run'
org/jruby/RubyFixnum.java:275:in `times'
org/jruby/RubyEnumerator.java:274:in `each'
org/jruby/RubyEnumerable.java:757:in `map'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.2/lib/logstash/inputs/kafka.rb:142:in `run'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:342:in `inputworker'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:336:in `start_input' {:level=>:error, :file=>"logstash/pipeline.rb", :line=>"353", :method=>"inputworker"}
Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"213", :method=>"create_consumer"}
A plugin had an unrecoverable error. Will restart this plugin.

I also tried setting it to a number instead of a string (in case the documentation is incorrect); that doesn't work either:

Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.config.ConfigException: Invalid value 60000 for configuration session.timeout.ms: Expected value to be an number., :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"213", :method=>"create_consumer"}
A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Kafka topics=>["test"], session_timeout_ms=>60000, codec=><LogStash::Codecs::Plain charset=>"UTF-8">, auto_commit_interval_ms=>"10", bootstrap_servers=>"localhost:9092", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>"true", group_id=>"logstash", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl=>false>
  Error: uncaught throw Invalid value 60000 for configuration session.timeout.ms: Expected value to be an number. in thread 0x2ffc
  Exception: ArgumentError
  Stack: org/jruby/RubyKernel.java:1283:in `throw'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.2/lib/logstash/inputs/kafka.rb:214:in `create_consumer'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.2/lib/logstash/inputs/kafka.rb:142:in `run'
org/jruby/RubyFixnum.java:275:in `times'
org/jruby/RubyEnumerator.java:274:in `each'
org/jruby/RubyEnumerable.java:757:in `map'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.2/lib/logstash/inputs/kafka.rb:142:in `run'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:342:in `inputworker'
/Users/User_Name/ELK/ElasticStack_2_0/logstash-2.3.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:336:in `start_input' {:level=>:error, :file=>"logstash/pipeline.rb", :line=>"353", :method=>"inputworker"}
Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.config.ConfigException: Invalid value 60000 for configuration session.timeout.ms: Expected value to be an number., :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"213", :method=>"create_consumer"}
ppf2 commented 8 years ago

I wrote a client directly against the Kafka Java API and reproduced this outside of Logstash:

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:648)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
    at test.main(test.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.kafka.common.config.ConfigException: request.timeout.ms should be greater than session.timeout.ms and fetch.max.wait.ms
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    ... 8 more
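
For reference, the standalone client is essentially just a consumer constructor call with session.timeout.ms raised above the client's default request.timeout.ms. A minimal sketch along these lines (class name and property values are illustrative, mirroring the Logstash defaults above; no subscribe/poll is needed to hit the failure):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class test {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "logstash");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 60000 ms exceeds the consumer's default request.timeout.ms,
        // so the constructor itself throws the ConfigException shown above.
        props.put("session.timeout.ms", "60000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}

Constructing the consumer is enough to trigger the exception; the configuration check fails before any connection to a broker is attempted.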

The exception returned from the Kafka client is very clear about what the problem is. The issue here is that we do not report the full exception stack at all (even at --debug) and instead throw back something generic and unintuitive:

Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"213", :method=>"create_consumer"}
A plugin had an unrecoverable error. Will restart this plugin.

We will need to improve our error handling and report the full exception stack.
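
As a workaround in the meantime, keeping session_timeout_ms below the consumer's request.timeout.ms (which appears to default to 40000 ms in the 0.9 client bundled with this plugin version) allows the consumer to be created, e.g.:

input {
  kafka {
    topics => ["test"]
    # any value below the client's request.timeout.ms works here
    session_timeout_ms => "30000"
  }
}

This only avoids the constructor failure; the underlying error-reporting problem described above still needs to be fixed.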