logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins

Unable to use MSK IAM Auth jar file #165

Open tkaynejc opened 4 months ago

tkaynejc commented 4 months ago

Logstash information:

Please include the following information:

  1. Logstash version - 8.13.3
  2. Logstash installation source - docker
  3. How is Logstash being run - docker/kubernetes
  4. How was the Logstash Plugin installed - bin/logstash-plugin install --version 10.12.0 logstash-integration-kafka

JVM (e.g. java -version):

/usr/share/logstash/jdk/bin/java --version
openjdk 17.0.11 2024-04-16
OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9)
OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode, sharing)

Description of the problem including expected versus actual behavior:

I want my Logstash pod to authenticate to and connect with an MSK cluster using IAM auth. The pod already has a service account and policy set up and ready to go; it just needs AWS's library and Java classes to provide the authorization token. I was able to set the required sasl.mechanism, security.protocol, and sasl.jaas.config with the existing Logstash configuration options without a problem, but when I run my Logstash pipeline I get this error:

    :exception=>#<Java::OrgApacheKafkaCommonConfig::ConfigException: Invalid value software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class: Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.>

I added the jar file to the same directory as the kafka-clients-2.8.1 jar, at /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-10.12.0-java/vendor/jar-dependencies/org/apache/kafka/kafka-clients/2.8.1/. I believe this happens because the Java classpath is isolated within the plugin and the new jar is never registered.
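
For context, here is a minimal sketch of the kind of kafka output configuration described above, using the plugin's existing SASL-related options. The broker endpoint and topic name are placeholders, not values from the actual pipeline:

    output {
      kafka {
        # placeholder endpoint and topic for illustration only
        bootstrap_servers => "b-1.example-cluster.kafka.us-east-1.amazonaws.com:9098"
        topic_id => "example-topic"
        # existing options used to request MSK IAM auth
        security_protocol => "SASL_SSL"
        sasl_mechanism => "AWS_MSK_IAM"
        sasl_jaas_config => "software.amazon.msk.auth.iam.IAMLoginModule required;"
        # there appears to be no dedicated option for sasl.client.callback.handler.class here,
        # which is why kafka.rb is patched in the reproduction steps below
      }
    }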

Steps to reproduce:

  1. Install plugin
    bin/logstash-plugin install --version 10.12.0 logstash-integration-kafka
  2. Download the jar and place it in the plugin's jar dependency path (see the classpath-loading sketch after this list)
    curl -L -o /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-10.12.0-java/vendor/jar-dependencies/org/apache/kafka/kafka-clients/2.8.1/aws-msk-iam-auth-10.12.0-all.jar \
    "https://github.com/aws/aws-msk-iam-auth/releases/download/v10.12.0/aws-msk-iam-auth-10.12.0-all.jar"
  3. Edit vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-10.12.0-java/lib/logstash/outputs/kafka.rb by adding this property:
    if props.get("sasl.mechanism") == "AWS_MSK_IAM"
      props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
    end
  4. Create a Logstash config with a kafka output and run it
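
If the root cause is, as suspected above, that the plugin's isolated classpath never picks up the extra jar, one thing worth trying is loading the jar explicitly from JRuby, for example near the top of the patched kafka.rb from step 3. This is only a sketch under that assumption: the path and filename are illustrative, and whether it is sufficient depends on which classloader the Kafka client uses to resolve the callback handler class.

    # Hypothetical workaround sketch -- not part of the plugin's documented API.
    # The jar location and filename below are assumptions based on step 2.
    require 'java'

    msk_iam_jar = ::File.join(
      '/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems',
      'logstash-integration-kafka-10.12.0-java/vendor/jar-dependencies',
      'aws-msk-iam-auth-all.jar'
    )

    # In JRuby, require-ing a .jar file adds it to the runtime classpath, which is
    # what Kafka's ConfigDef needs in order to resolve IAMClientCallbackHandler by name.
    require msk_iam_jar if ::File.exist?(msk_iam_jar)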

Full stack trace Pipeline error {:pipeline_id=>"login-insights-ods", :exception=>#<Java::OrgApacheKafkaCommonConfig::ConfigException: Invalid value software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class: Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.>, :backtrace=>["org.apache.kafka.common.config.ConfigDef.parseType(org/apache/kafka/common/config/ConfigDef.java:744)", "org.apache.kafka.common.config.ConfigDef.parseValue(org/apache/kafka/common/config/ConfigDef.java:490)", "org.apache.kafka.common.config.ConfigDef.parse(org/apache/kafka/common/config/ConfigDef.java:483)", "org.apache.kafka.common.config.AbstractConfig.<init>(org/apache/kafka/common/config/AbstractConfig.java:108)", "org.apache.kafka.common.config.AbstractConfig.<init>(org/apache/kafka/common/config/AbstractConfig.java:129)", "org.apache.kafka.clients.producer.ProducerConfig.<init>(org/apache/kafka/clients/producer/ProducerConfig.java:536)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:291)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:318)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:303)", "jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)", "jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:77)", "jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)", "java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:499)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:480)", "org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:165)", "org.jruby.RubyClass.new(org/jruby/RubyClass.java:904)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)", "usr.share.logstash.vendor.bundle.jruby.$3_dot_1_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_12_dot_0_minus_java.lib.logstash.outputs.kafka.create_producer(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-10.12.0-java/lib/logstash/outputs/kafka.rb:366)", "usr.share.logstash.vendor.bundle.jruby.$3_dot_1_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_12_dot_0_minus_java.lib.logstash.outputs.kafka.register(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-10.12.0-java/lib/logstash/outputs/kafka.rb:194)", "org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:580)", "org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:349)", "org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.reg(org/logstash/config/ir/compiler/OutputStrategyExt.java:275)", "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.register(org/logstash/config/ir/compiler/OutputStrategyExt.java:131)", "org.logstash.config.ir.compiler.OutputDelegatorExt.doRegister(org/logstash/config/ir/compiler/OutputDelegatorExt.java:126)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.register(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:69)", 
"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:237)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1989)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:236)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.maybe_setup_out_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:610)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:249)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.run(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:194)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:146)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:352)", "java.lang.Thread.run(java/lang/Thread.java:840)"], "pipeline.sources"=>["/usr/share/logstash/pipeline/conf.d/login-insights-ods.conf"], :thread=>"#<Thread:0x7144698f /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}