elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

logstash 8.4.x - kafka input plugin error: Unable to create Kafka consumer from given configuration #14671

Open voriol opened 2 years ago

voriol commented 2 years ago

Logstash information:

Please include the following information:

  1. Logstash version: 8.4.x
  2. Logstash installation source: tar.gz
  3. How is Logstash being run: /opt/logstash/bin/logstash -f /opt/logstash/logstash.conf

Plugins installed: (bin/logstash-plugin list --verbose)

Using bundled JDK: /opt/logstash/logstash-8.4.0/jdk
logstash-codec-avro (3.4.0)
logstash-codec-cef (6.2.5)
logstash-codec-collectd (3.1.0)
logstash-codec-dots (3.0.6)
logstash-codec-edn (3.1.0)
logstash-codec-edn_lines (3.1.0)
logstash-codec-es_bulk (3.1.0)
logstash-codec-fluent (3.4.1)
logstash-codec-graphite (3.0.6)
logstash-codec-json (3.1.0)
logstash-codec-json_lines (3.1.0)
logstash-codec-line (3.1.1)
logstash-codec-msgpack (3.1.0)
logstash-codec-multiline (3.1.1)
logstash-codec-netflow (4.2.2)
logstash-codec-plain (3.1.0)
logstash-codec-rubydebug (3.1.0)
logstash-filter-aggregate (2.10.0)
logstash-filter-anonymize (3.0.6)
logstash-filter-cidr (3.1.3)
logstash-filter-clone (4.2.0)
logstash-filter-csv (3.1.1)
logstash-filter-date (3.1.15)
logstash-filter-de_dot (1.0.4)
logstash-filter-dissect (1.2.5)
logstash-filter-dns (3.1.5)
logstash-filter-drop (3.0.5)
logstash-filter-elasticsearch (3.12.0)
logstash-filter-fingerprint (3.4.1)
logstash-filter-geoip (7.2.12)
logstash-filter-grok (4.4.2)
logstash-filter-http (1.4.1)
logstash-filter-json (3.2.0)
logstash-filter-kv (4.7.0)
logstash-filter-memcached (1.1.0)
logstash-filter-metrics (4.0.7)
logstash-filter-mutate (3.5.6)
logstash-filter-prune (3.0.4)
logstash-filter-ruby (3.1.8)
logstash-filter-sleep (3.0.7)
logstash-filter-split (3.1.8)
logstash-filter-syslog_pri (3.1.1)
logstash-filter-throttle (4.0.4)
logstash-filter-translate (3.3.1)
logstash-filter-truncate (1.0.5)
logstash-filter-urldecode (3.0.6)
logstash-filter-useragent (3.3.3)
logstash-filter-uuid (3.0.5)
logstash-filter-xml (4.2.0)
logstash-input-azure_event_hubs (1.4.4)
logstash-input-beats (6.4.0)
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes (3.1.6)
logstash-input-dead_letter_queue (2.0.0)
logstash-input-elasticsearch (4.14.0)
logstash-input-exec (3.4.0)
logstash-input-file (4.4.3)
logstash-input-ganglia (3.1.4)
logstash-input-gelf (3.3.1)
logstash-input-generator (3.1.0)
logstash-input-graphite (3.0.6)
logstash-input-heartbeat (3.1.1)
logstash-input-http (3.6.0)
logstash-input-http_poller (5.3.1)
logstash-input-imap (3.2.0)
logstash-input-jms (3.2.2)
logstash-input-pipe (3.1.0)
logstash-input-redis (3.7.0)
logstash-input-snmp (1.3.1)
logstash-input-snmptrap (3.1.0)
logstash-input-stdin (3.4.0)
logstash-input-syslog (3.6.0)
logstash-input-tcp (6.3.0)
logstash-input-twitter (4.1.0)
logstash-input-udp (3.5.0)
logstash-input-unix (3.1.1)
logstash-integration-aws (7.0.0)
 ├── logstash-codec-cloudfront
 ├── logstash-codec-cloudtrail
 ├── logstash-input-cloudwatch
 ├── logstash-input-s3
 ├── logstash-input-sqs
 ├── logstash-output-cloudwatch
 ├── logstash-output-s3
 ├── logstash-output-sns
 └── logstash-output-sqs
logstash-integration-elastic_enterprise_search (2.2.1)
 ├── logstash-output-elastic_app_search
 └── logstash-output-elastic_workplace_search
logstash-integration-jdbc (5.3.0)
 ├── logstash-input-jdbc
 ├── logstash-filter-jdbc_streaming
 └── logstash-filter-jdbc_static
logstash-integration-kafka (10.12.0)
 ├── logstash-input-kafka
 └── logstash-output-kafka
logstash-integration-rabbitmq (7.3.0)
 ├── logstash-input-rabbitmq
 └── logstash-output-rabbitmq
logstash-output-csv (3.0.8)
logstash-output-elasticsearch (11.6.0)
logstash-output-email (4.1.1)
logstash-output-file (4.3.0)
logstash-output-graphite (3.1.6)
logstash-output-http (5.5.0)
logstash-output-lumberjack (3.1.9)
logstash-output-nagios (3.0.6)
logstash-output-null (3.0.5)
logstash-output-pipe (3.0.6)
logstash-output-redis (5.0.0)
logstash-output-stdout (3.1.4)
logstash-output-tcp (6.1.0)
logstash-output-udp (3.2.0)
logstash-output-webhdfs (3.0.6)
logstash-patterns-core (4.3.4)

JVM (e.g. java -version):

openjdk version "11.0.16" 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu122.04, mixed mode, sharing)

If the affected version of Logstash is 7.9 (or earlier), or if it is NOT using the bundled JDK or using the 'no-jdk' version in 7.10 (or higher), please provide the following information:

  1. JVM version (java -version)
  2. JVM installation source (e.g. from the Operating System's package manager, from source, etc).
  3. Value of the LS_JAVA_HOME environment variable if set.

OS version (uname -a if on a Unix-like system):

Linux server1 5.15.0-50-generic #56-Ubuntu SMP Tue Sep 20 13:23:26 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

Steps to reproduce:

Please include a minimal but complete recreation of the problem, including (e.g.) pipeline definition(s), settings, locale, etc. The easier you make it for us to reproduce, the more likely somebody will take the time to look at it.

  1. kafka input plugin error: Unable to create Kafka consumer from given configuration
  2. The same "logstash.conf" works with versions <= 8.3.3

Provide logs (if relevant):

[2022-10-19T11:44:36,243][ERROR][logstash.inputs.kafka    ][main][c474ee0a252c935096ef4baa88631715b80e466ea26a4b02e464cefb9627dfdc] Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user>}
[2022-10-19T11:44:36,245][ERROR][logstash.javapipeline    ][main][c474ee0a252c935096ef4baa88631715b80e466ea26a4b02e464cefb9627dfdc] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:main
  Plugin: <LogStash::Inputs::Kafka topics=>["topic1", "topic2", "topic3"], ssl_truststore_location=>"/opt/logstash/kafka/truststore.jks", ssl_truststore_password=><password>, kerberos_config=>"/opt/logstash/kafka/krb5.conf", jaas_path=>"/opt/logstash/kafka/KafkaClient_jaas.conf", bootstrap_servers=>"server1:9093,server2:9093,server3:9093", client_id=>"logstash-i1", decorate_events=>"extended", codec=><LogStash::Codecs::Plain id=>"plain_101e4499-6300-46ff-8740-84f845a66fc4", enable_metric=>true, charset=>"UTF-8">, group_id=>"consumer1", security_protocol=>"SASL_SSL", id=>"c474ee0a252c935096ef4baa88631715b80e466ea26a4b02e464cefb9627dfdc", sasl_kerberos_service_name=>"kafka", enable_metric=>true, connections_max_idle_ms=>540000, metadata_max_age_ms=>300000, request_timeout_ms=>40000, schema_registry_validation=>"auto", auto_commit_interval_ms=>5000, check_crcs=>true, client_dns_lookup=>"default", consumer_threads=>1, enable_auto_commit=>true, fetch_max_bytes=>52428800, fetch_max_wait_ms=>500, heartbeat_interval_ms=>3000, isolation_level=>"read_uncommitted", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", max_poll_interval_ms=>300000, max_partition_fetch_bytes=>1048576, max_poll_records=>500, receive_buffer_bytes=>32768, reconnect_backoff_ms=>50, retry_backoff_ms=>100, send_buffer_bytes=>131072, session_timeout_ms=>10000, value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", sasl_mechanism=>"GSSAPI">
  Error: Failed to construct kafka consumer
  Exception: Java::OrgApacheKafkaCommon::KafkaException
  Stack: org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:823)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:665)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:646)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:626)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:77)
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:499)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:480)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:269)
org.jruby.RubyClass.new(org/jruby/RubyClass.java:888)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
opt.logstash.logstash_minus_8_dot_4_dot_0.vendor.bundle.jruby.$2_dot_6_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_12_dot_0_minus_java.lib.logstash.inputs.kafka.create_consumer(/opt/logstash/logstash-8.4.0/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-10.12.0-java/lib/logstash/inputs/kafka.rb:450)
opt.logstash.logstash_minus_8_dot_4_dot_0.vendor.bundle.jruby.$2_dot_6_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_12_dot_0_minus_java.lib.logstash.inputs.kafka.run(/opt/logstash/logstash-8.4.0/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-10.12.0-java/lib/logstash/inputs/kafka.rb:288)
org.jruby.RubyEnumerable$18.call(org/jruby/RubyEnumerable.java:815)
org.jruby.RubyEnumerator$1.call(org/jruby/RubyEnumerator.java:400)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:308)
org.jruby.RubyInteger$INVOKER$i$0$0$times.call(org/jruby/RubyInteger$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:512)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:500)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:387)
org.jruby.RubyEnumerator.__each__(org/jruby/RubyEnumerator.java:396)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:392)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:807)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:799)
opt.logstash.logstash_minus_8_dot_4_dot_0.vendor.bundle.jruby.$2_dot_6_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_12_dot_0_minus_java.lib.logstash.inputs.kafka.run(/opt/logstash/logstash-8.4.0/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-10.12.0-java/lib/logstash/inputs/kafka.rb:288)
opt.logstash.logstash_minus_8_dot_4_dot_0.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/opt/logstash/logstash-8.4.0/logstash-core/lib/logstash/java_pipeline.rb:410)
opt.logstash.logstash_minus_8_dot_4_dot_0.logstash_minus_core.lib.logstash.java_pipeline.start_input(/opt/logstash/logstash-8.4.0/logstash-core/lib/logstash/java_pipeline.rb:401)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:309)
java.lang.Thread.run(java/lang/Thread.java:833)
yaauie commented 2 years ago

javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user

The logs indicate a plugin configured like so (newlines added for readability):

<LogStash::Inputs::Kafka
  topics=>["topic1", "topic2", "topic3"],
  ssl_truststore_location=>"/opt/logstash/kafka/truststore.jks",
  ssl_truststore_password=><password>,
  kerberos_config=>"/opt/logstash/kafka/krb5.conf",
  jaas_path=>"/opt/logstash/kafka/KafkaClient_jaas.conf",
  bootstrap_servers=>"server1:9093,server2:9093,server3:9093",
  client_id=>"logstash-i1",
  decorate_events=>"extended",
  codec=> <LogStash::Codecs::Plain id=>"plain_101e4499-6300-46ff-8740-84f845a66fc4", enable_metric=>true, charset=>"UTF-8">,
  group_id=>"consumer1",
  security_protocol=>"SASL_SSL",
  id=>"c474ee0a252c935096ef4baa88631715b80e466ea26a4b02e464cefb9627dfdc",
  sasl_kerberos_service_name=>"kafka",
  enable_metric=>true,
  connections_max_idle_ms=>540000,
  metadata_max_age_ms=>300000,
  request_timeout_ms=>40000,
  schema_registry_validation=>"auto",
  auto_commit_interval_ms=>5000,
  check_crcs=>true,
  client_dns_lookup=>"default",
  consumer_threads=>1,
  enable_auto_commit=>true,
  fetch_max_bytes=>52428800,
  fetch_max_wait_ms=>500,
  heartbeat_interval_ms=>3000,
  isolation_level=>"read_uncommitted",
  key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer",
  max_poll_interval_ms=>300000,
  max_partition_fetch_bytes=>1048576,
  max_poll_records=>500,
  receive_buffer_bytes=>32768,
  reconnect_backoff_ms=>50,
  retry_backoff_ms=>100,
  send_buffer_bytes=>131072,
  session_timeout_ms=>10000,
  value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer",
  poll_timeout_ms=>100,
  ssl_endpoint_identification_algorithm=>"https",
  sasl_mechanism=>"GSSAPI"
>

As the logs state, the Kafka client was asked for a password while establishing the connection, but was unable to obtain one. Is it possible that the JAAS file you have provided does not contain the credentials, or is not readable by the user running the Logstash process?
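
A quick way to rule out the readability question (a sketch, assuming the paths from the plugin configuration above and a service user named logstash, which may not match your setup):

# check that the JAAS file and the kafka/ directory are readable by the Logstash user
sudo -u logstash cat /opt/logstash/kafka/KafkaClient_jaas.conf
sudo -u logstash ls -l /opt/logstash/kafka/

If either command fails with a permission error, the Krb5LoginModule cannot use the keytab and will typically fall back to prompting for a password, which matches the exception above.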

voriol commented 2 years ago

Hi @yaauie, thanks for your response.

The thing is: same config files, different logstash version, different behavior.

I expect: same config files, different logstash version, same behavior.

Thanks.

voriol commented 1 year ago

I.e., this works:

docker run --rm -it \
    -v /tmp/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
    -v /tmp/kafka:/usr/share/logstash/kafka/ \
    docker.elastic.co/logstash/logstash:8.3.3

This does not:

docker run --rm -it \
    -v /tmp/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
    -v /tmp/kafka:/usr/share/logstash/kafka/ \
    docker.elastic.co/logstash/logstash:8.5.2
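
One way to get more detail out of the failing image is to turn on the JVM's Kerberos debug output via LS_JAVA_OPTS (sun.security.krb5.debug is a standard JVM system property; this only adds logging, it is not a fix):

# same failing invocation as above, with Kerberos login debugging enabled
docker run --rm -it \
    -e LS_JAVA_OPTS="-Dsun.security.krb5.debug=true" \
    -v /tmp/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
    -v /tmp/kafka:/usr/share/logstash/kafka/ \
    docker.elastic.co/logstash/logstash:8.5.2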

The content of /tmp/logstash.conf:

input {
    kafka {
        codec => json
        topics => ["topic_name"]
        group_id => "consumer_name"
        bootstrap_servers => "server1:9093,server2:9093,server3:9093"
        security_protocol => "SASL_SSL"
        sasl_kerberos_service_name => "kafka"
        jaas_path => "kafka/KafkaClient.conf"
        ssl_truststore_location => "kafka/truststore.jks"
        ssl_truststore_password => "xxxxxxxx"
    }
}

output {
    stdout { codec => rubydebug { metadata => true } }
}

The content of "/tmp/kafka":

/tmp/kafka/truststore.jks
/tmp/kafka/user.keytab
/tmp/kafka/KafkaClient.conf

The content of /tmp/kafka/KafkaClient.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="kafka/user.keytab"
  principal="user@domain";
};
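
Note that jaas_path, ssl_truststore_location, and keyTab are all given as relative paths here. They are typically resolved against the working directory of the Logstash process (the Krb5LoginModule does the same for keyTab), so their resolution is not guaranteed to be identical across versions or packagings. An illustrative variant of the JAAS entry with an absolute path (this assumes the container layout from the -v /tmp/kafka:/usr/share/logstash/kafka/ mount in the docker invocations above; adjust to your layout):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/usr/share/logstash/kafka/user.keytab"
  principal="user@domain";
};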