logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins
Apache License 2.0
32 stars 60 forks source link

kafka input throws TypeError when record return empty headers #141

Closed kaisecheng closed 1 year ago

kaisecheng commented 1 year ago

Logstash information:

Please include the following information:

  1. Logstash version (e.g. bin/logstash --version) Logstash 8.6.2
  2. Logstash installation source (e.g. built from source, with a package manager: DEB/RPM, expanded from tar or zip archive, docker)
  3. How is Logstash being run (e.g. as a service/service manager: systemd, upstart, etc. Via command line, docker/kubernetes)
  4. How was the Logstash Plugin installed

JVM (e.g. java -version):

If the affected version of Logstash is 7.9 (or earlier), or if it is NOT using the bundled JDK or using the 'no-jdk' version in 7.10 (or higher), please provide the following information:

  1. JVM version (java -version) OpenJDK Runtime Environment Temurin-11.0.15+10 (build 11.0.15+10)
  2. JVM installation source (e.g. from the Operating System's package manager, from source, etc).
  3. Value of the JAVA_HOME environment variable if set.

OS version (uname -a if on a Unix-like system):

Description of the problem including expected versus actual behavior:

Steps to reproduce:

Please include a minimal but complete recreation of the problem, including (e.g.) pipeline definition(s), settings, locale, etc. The easier you make for us to reproduce it, the more likely that somebody will take the time to look at it.

  1. When Kafka broker return The committing offset data size is not valid. for sometimes, kafka-input consumer gets records with empty headers
  2. Logstash throws exception

Provide logs (if relevant):

[2023-02-13T08:34:38,430][ERROR][org.logstash.Logstash    ] uncaught exception (in thread kafka-input-worker-logstash-0)
org.jruby.exceptions.TypeError: (TypeError) wrong argument type NilClass (expected byte[])
    at org.jruby.javasupport.JavaArrayUtilities.bytes_to_ruby_string(org/jruby/javasupport/JavaArrayUtilities.java:77) ~[jruby-complete-9.2.20.1.jar:?]
    at org.jruby.javasupport.JavaArrayUtilities.bytes_to_ruby_string(org/jruby/javasupport/JavaArrayUtilities.java:58) ~[jruby-complete-9.2.20.1.jar:?]
    at org.jruby.java.addons.StringJavaAddons.from_java_bytes(org/jruby/java/addons/StringJavaAddons.java:16) ~[jruby-complete-9.2.20.1.jar:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.maybe_set_metadata(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:374) ~[?:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.maybe_set_metadata(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:373) ~[?:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.handle_record(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:358) ~[?:?]
    at usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.decode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:64) ~[?:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_plain_minus_3_dot_1_dot_0.lib.logstash.codecs.plain.decode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-plain-3.1.0/lib/logstash/codecs/plain.rb:54) ~[?:?]
    at usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.decode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:62) ~[?:?]
    at org.logstash.instrument.metrics.AbstractSimpleMetricExt.time(org/logstash/instrument/metrics/AbstractSimpleMetricExt.java:65) ~[logstash-core.jar:?]
    at org.logstash.instrument.metrics.AbstractNamespacedMetricExt.time(org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java:64) ~[logstash-core.jar:?]
    at usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.decode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:61) ~[?:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.handle_record(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:356) ~[?:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
    at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
kaisecheng commented 1 year ago

The issue is fixed in 11.2.1