logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins
Apache License 2.0

Issues configuring Kafka input plugin with TLS #146

Open girishms-sentient opened 1 year ago

girishms-sentient commented 1 year ago

Logstash information:

  1. Logstash version - 8.8.1
  2. Logstash installation source - Bitnami Logstash helm chart
  3. How is Logstash being run - Kubernetes stateful-set
  4. Kafka Input plugin - https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

Description of the problem including expected versus actual behavior:

I'm having trouble connecting Logstash to a Kafka cluster that is configured with TLS. When I supply PKCS12-format TLS certificates to the Kafka input plugin, the consumer fails to start with the following exception:

Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/config/certs/ample-user/p12 of type PKCS12>}
[2023-07-05T16:16:33,533][ERROR][logstash.javapipeline ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] A plugin had an unrecoverable error. Will restart this plugin.

The detailed logs are included at the end of this report.

Helm chart values:

    extraEnvVars:
      - name: KAFKA_HOST
        value: "kafka-kafka-bootstrap"
      - name: KAFKA_PORT
        value: "9093"

    extraVolumeMounts:
      - name: kafka-cluster-certs
        mountPath: /usr/share/logstash/config/certs/kafka-cluster
        readOnly: true
      - name: ample-user
        mountPath: /usr/share/logstash/config/certs/ample-user
        readOnly: true

    extraVolumes: 
      - name: kafka-cluster-certs
        secret:
          secretName: kafka-cluster-ca-cert
          items:
          - key: ca.p12
            path: p12
          - key: ca.password
            path: password
      - name: ample-user
        secret:
          secretName: ample-user
          items:
          - key: user.p12
            path: p12
          - key: user.password
            path: password

    input: |-
      kafka {
        bootstrap_servers => ["${KAFKA_HOST}:${KAFKA_PORT}"]
        topics => ["fluent-log-vault"]
        type => "vault"
        security_protocol => "SSL"
        ssl_truststore_type => "PKCS12"
        ssl_keystore_type => "PKCS12"
        ssl_truststore_location => "/usr/share/logstash/config/certs/kafka-cluster/p12"
        ssl_truststore_password => "/usr/share/logstash/config/certs/password"
        ssl_keystore_location => "/usr/share/logstash/config/certs/ample-user/p12"
        ssl_keystore_password => "/usr/share/logstash/config/certs/ample-user/password"
      }

Detailed Logs:

[2023-07-05T16:16:33,529][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Metrics scheduler closed
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Closing reporter org.apache.kafka.common.metrics.JmxReporter
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Metrics reporters closed
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] App info kafka.consumer for logstash-0 unregistered
[2023-07-05T16:16:33,531][ERROR][logstash.inputs.kafka    ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/config/certs/ample-user/p12 of type PKCS12>}
[2023-07-05T16:16:33,533][ERROR][logstash.javapipeline    ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:main
  Plugin: <LogStash::Inputs::Kafka ssl_keystore_location=>"/usr/share/logstash/config/certs/ample-user/p12", ssl_keystore_password=><password>, topics=>["fluent-log-vault"], ssl_truststore_location=>"/usr/share/logstash/config/certs/kafka-cluster/p12", ssl_truststore_password=><password>, security_protocol=>"SSL", id=>"0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef", type=>"vault", ssl_truststore_type=>"PKCS12", bootstrap_servers=>"kafka-kafka-bootstrap:9093", ssl_keystore_type=>"PKCS12", codec=><LogStash::Codecs::Plain id=>"plain_c97407f8-f507-4c16-a8a6-8cc822b8e0eb", enable_metric=>true, charset=>"UTF-8">, enable_metric=>true, connections_max_idle_ms=>540000, metadata_max_age_ms=>300000, request_timeout_ms=>40000, schema_registry_ssl_keystore_type=>"jks", schema_registry_ssl_truststore_type=>"jks", schema_registry_validation=>"auto", auto_commit_interval_ms=>5000, check_crcs=>true, client_dns_lookup=>"use_all_dns_ips", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>true, fetch_max_bytes=>52428800, fetch_max_wait_ms=>500, group_id=>"logstash", heartbeat_interval_ms=>3000, isolation_level=>"read_uncommitted", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", max_poll_interval_ms=>300000, max_partition_fetch_bytes=>1048576, max_poll_records=>500, receive_buffer_bytes=>32768, reconnect_backoff_ms=>50, retry_backoff_ms=>100, send_buffer_bytes=>131072, session_timeout_ms=>10000, value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", sasl_mechanism=>"GSSAPI", decorate_events=>"none">
Error: Failed to construct kafka consumer
  Exception: Java::OrgApacheKafkaCommon::KafkaException
  Stack: org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:830)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:666)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:647)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:627)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:77)
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:499)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:480)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:269)
org.jruby.RubyClass.new(org/jruby/RubyClass.java:890)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
RUBY.create_consumer(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:477)
RUBY.run(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:297)
org.jruby.RubyEnumerable$18.call(org/jruby/RubyEnumerable.java:815)
org.jruby.RubyEnumerator$1.call(org/jruby/RubyEnumerator.java:400)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:308)
org.jruby.RubyInteger$INVOKER$i$0$0$times.call(org/jruby/RubyInteger$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:514)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:387)
org.jruby.RubyEnumerator.__each__(org/jruby/RubyEnumerator.java:396)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:392)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:807)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:799)
org.jruby.RubyEnumerable$INVOKER$s$0$0$map.call(org/jruby/RubyEnumerable$INVOKER$s$0$0$map.gen)
RUBY.run(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:295)
RUBY.inputworker(/opt/bitnami/logstash/logstash-core/lib/logstash/java_pipeline.rb:414)
RUBY.start_input(/opt/bitnami/logstash/logstash-core/lib/logstash/java_pipeline.rb:405)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:309)
java.lang.Thread.run(java/lang/Thread.java:833)

Is there any way to solve this problem?
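
One thing that may be worth checking in the configuration above, offered as a sketch rather than a confirmed fix: as far as the Kafka input plugin documentation describes them, `ssl_truststore_password` and `ssl_keystore_password` hold the password itself, not the path of a file that contains it. If that is what is happening here, the file path would be used as the store password and the keystore would fail to load. The values below assume the chart passes `extraEnvVars` entries through to the container unchanged (standard Kubernetes `secretKeyRef` syntax); the variable names `KAFKA_TRUSTSTORE_PASSWORD` and `KAFKA_KEYSTORE_PASSWORD` are hypothetical, while the secret names and keys are the ones already referenced under `extraVolumes`.

```yaml
extraEnvVars:
  # Added alongside the existing KAFKA_HOST and KAFKA_PORT entries.
  # Variable names are hypothetical; secret names and keys come from extraVolumes.
  - name: KAFKA_TRUSTSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: kafka-cluster-ca-cert
        key: ca.password
  - name: KAFKA_KEYSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: ample-user
        key: user.password

input: |-
  kafka {
    bootstrap_servers => ["${KAFKA_HOST}:${KAFKA_PORT}"]
    topics => ["fluent-log-vault"]
    type => "vault"
    security_protocol => "SSL"
    ssl_truststore_type => "PKCS12"
    ssl_keystore_type => "PKCS12"
    ssl_truststore_location => "/usr/share/logstash/config/certs/kafka-cluster/p12"
    # Password values resolved from the environment instead of file paths.
    ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
    ssl_keystore_location => "/usr/share/logstash/config/certs/ample-user/p12"
    ssl_keystore_password => "${KAFKA_KEYSTORE_PASSWORD}"
  }
```

The `${...}` references rely on the same environment-variable substitution the pipeline already uses for `${KAFKA_HOST}` and `${KAFKA_PORT}`. Separately, the original `ssl_truststore_password` path (`/usr/share/logstash/config/certs/password`) does not match either mount path, so it would not point at a mounted file in any case.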