revpoint / logstash-codec-avro_schema_registry

A logstash codec plugin for decoding and encoding Avro records
Other
26 stars 23 forks source link

exception when decorate_events is set to true for kafka input plugin #23

Open manohar3 opened 3 years ago

manohar3 commented 3 years ago

This is my input kafka plugin configuration

input {
    kafka {
        bootstrap_servers => "th-kafka-svc:9093" # example: dc-cdf-master.ad.interset.com:9092
        topics => ["avro-event"]
        ...
        decorate_events => true
        key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        codec => avro_schema_registry {
            endpoint => "https://th-schema-svc:8081"
            client_key => "/vault-crt/logstash.key"
            client_certificate => "/vault-crt/logstash.crt"
            ca_certificate => "/vault-crt/trustedCAs/RE_ca.crt"
            verify_mode => "verify_peer"
        }
    }
}

so do decorate_events is set to true, i see schema registry codec throws exception as below,

org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[] at org.logstash.Valuefier.fallbackConvert(Valuefier.java:118) at org.logstash.Valuefier.convert(Valuefier.java:96) at org.logstash.Valuefier.lambda$static$3(Valuefier.java:72) at org.logstash.Valuefier.convert(Valuefier.java:94) at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.safeValueifierConvert(JrubyEventExtLibrary.java:355) at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:121) at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen) at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:835) at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207) at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$3(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:286) at org.jruby.ir.targets.YieldSite.yield(YieldSite.java:110) at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_2_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:233) at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_2_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0$VARARGS(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb) at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80) at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70) er$2(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:279) at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:148) at org.jruby.runtime.BlockBody.yield(BlockBody.java:106) at org.jruby.runtime.Block.yield(Block.java:184) at org.jruby.javasupport.ext.JavaLang$Iterable.each(JavaLang.java:98) at org.jruby.javasupport.ext.JavaLang$Iterable$INVOKER$s$0$0$each.call(JavaLang$Iterable$INVOKER$s$0$0$each.gen) at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:555) at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:197) at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$1(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:278) at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138) at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52) at org.jruby.runtime.Block.call(Block.java:139) at org.jruby.RubyProc.call(RubyProc.java:318) at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105) at java.lang.Thread.run(Thread.java:748)

Can you help to fix this issue.