logstash-plugins / logstash-input-rabbitmq

Apache License 2.0

Support metadata Date class conversion #112

Closed kostasb closed 5 years ago

kostasb commented 6 years ago

Description:

When the rabbitmq input encounters a Date field in the metadata, the pipeline logs an unrecoverable error and restarts the plugin.

It seems that the header normalization method in the rabbitmq client does not convert the Date class.
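To illustrate the kind of conversion that appears to be missing, here is a minimal sketch in plain Ruby. It is not the plugin's or the RabbitMQ client's actual code: `normalize_headers` is a hypothetical helper, and Ruby's `Time` stands in for `java.util.Date` so the example runs outside JRuby. It walks nested header hashes and arrays and turns timestamp values into ISO 8601 strings, which an event can store.

```ruby
require "time" # provides Time#iso8601

# Hypothetical helper (an assumption for illustration, not the plugin's API).
# Recursively copies a headers structure, replacing Time values with
# ISO 8601 strings and leaving all other values untouched.
def normalize_headers(value)
  case value
  when Hash
    value.each_with_object({}) { |(k, v), out| out[k.to_s] = normalize_headers(v) }
  when Array
    value.map { |v| normalize_headers(v) }
  when Time
    # Under JRuby the equivalent check would be against java.util.Date (assumption).
    value.utc.iso8601(3)
  else
    value
  end
end

headers = { "sent-at" => Time.at(0), "retries" => 2 }
normalized = normalize_headers(headers)
puts normalized.inspect
# normalized["sent-at"] is "1970-01-01T00:00:00.000Z", "retries" stays 2
```

Whether a string, an epoch number, or a Logstash timestamp is the better target type is a design choice this sketch does not settle.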

Stack trace:

[2018-04-19T10:10:04,143][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::RabbitMQ host=>["im-ea04.wintech.local"], user=>"ES", password=><password>, vhost=>"/", queue=>"Message.Audit.Error.LogToElasticsearch", exclusive=>false, durable=>true, auto_delete=>false, metadata_enabled=>true, codec=><LogStash::Codecs::Plain charset=>"Windows-1252", id=>"a86247b0-5f62-4d1a-a6f1-622644fd3059", enable_metric=>true>, type=>"alphaaudit", tags=>["Error"], heartbeat=>60, id=>"05eaad8179451a08770c837becdb6169f412b44e1232310f1c6a60732ee36977", enable_metric=>true, threads=>1, port=>5672, ssl_version=>"TLSv1.2", automatic_recovery=>true, connect_retry_interval=>1, passive=>false, prefetch_count=>256, ack=>true, key=>"logstash", subscription_retry_interval_seconds=>5>
Error: Missing Converter handling for full class name=java.util.Date, simple name=Date
Exception: Java::OrgLogstash::MissingConverterException
Stack: org.logstash.Valuefier.fallbackConvert(Valuefier.java:97)
org.logstash.Valuefier.convert(Valuefier.java:75)
org.logstash.Valuefier.lambda$static$3(Valuefier.java:51)
org.logstash.Valuefier.convert(Valuefier.java:73)
org.logstash.ConvertedMap$1.visit(ConvertedMap.java:34)
org.logstash.ConvertedMap$1.visit(ConvertedMap.java:28)
org.jruby.RubyHash.visitLimited(RubyHash.java:662)
org.jruby.RubyHash.visitAll(RubyHash.java:647)
org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:68)
org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:63)
org.logstash.Valuefier.lambda$initConverters$11(Valuefier.java:142)
org.logstash.Valuefier.convert(Valuefier.java:73)
org.logstash.ConvertedList.newFromRubyArray(ConvertedList.java:44)
org.logstash.Valuefier.lambda$initConverters$15(Valuefier.java:154)
org.logstash.Valuefier.convert(Valuefier.java:73)
org.logstash.ConvertedMap$1.visit(ConvertedMap.java:34)
org.logstash.ConvertedMap$1.visit(ConvertedMap.java:28)
org.jruby.RubyHash.visitLimited(RubyHash.java:662)
org.jruby.RubyHash.visitAll(RubyHash.java:647)
org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:68)
org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:63)
org.logstash.Valuefier.lambda$initConverters$11(Valuefier.java:142)
org.logstash.Valuefier.convert(Valuefier.java:73)
org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:95)
org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:193)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.invokeOther4:set(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:256)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.RUBY$block$internal_queue_consume!$1(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:256)
org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)
org.jruby.runtime.BlockBody.yield(BlockBody.java:114)
org.jruby.runtime.Block.yield(Block.java:165)
org.jruby.ir.runtime.IRRuntimeHelpers.yield(IRRuntimeHelpers.java:415)
org.jruby.ir.targets.YieldSite.yield(YieldSite.java:87)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_codec_minus_plain_minus_3_dot_0_dot_6.lib.logstash.codecs.plain.RUBY$method$decode$0(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-plain-3.0.6/lib/logstash/codecs/plain.rb:35)
org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:103)
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:163)
org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:171)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:177)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.invokeOther22:decode(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:253)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.RUBY$method$internal_queue_consume!$0(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:253)
org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:90)
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:128)
org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:192)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:129)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.invokeOther19:internal_queue_consume!(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:233)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.RUBY$method$consume!$0(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:233)
org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:90)
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:128)
org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:192)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:129)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.invokeOther2:consume!(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:179)
usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_rabbitmq_minus_6_dot_0_dot_2.lib.logstash.inputs.rabbitmq.RUBY$method$run$0(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-rabbitmq-6.0.2/lib/logstash/inputs/rabbitmq.rb:179)
org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:103)
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:163)
org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:161)
org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314)
org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:73)
org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:83)
org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:179)
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:165)
org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:161)
org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314)
org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:73)
org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:132)
org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:148)
org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:73)
org.jruby.runtime.Block.call(Block.java:124)
org.jruby.RubyProc.call(RubyProc.java:289)
org.jruby.RubyProc.call(RubyProc.java:246)
org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104)
java.lang.Thread.run(Thread.java:748)

pedrotcm commented 6 years ago

The same error occurs in Logstash v6.2.4 with the JDBC input (logstash-input-jdbc) using a MongoDB connection:

[2018-04-26T21:11:40,393][WARN][logstash.inputs.jdbc] Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=java.util.Date, simple name=Date>}

dprada1979 commented 6 years ago

Description: We see exactly the same error as in the first comment, on the versions described.

We consume from 5 queues on RabbitMQ with essentially the same configuration for all of them, and the messages are similar (same JSON payload). The error happens on only one of the queues, where it also creates a bunch of consumers (one per message, plus one). That queue is the only one we use for dead letters on RabbitMQ (https://www.rabbitmq.com/dlx.html), so the issue is probably related to this. It worked with previous ELK versions (v5.x.x).

We moved the messages from the queue that we cannot read to another one (new and different queue, consumer, vhost...) and ended up in exactly the same situation. It looks like messages with the x-death header are causing the issue with the plugin.
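One way to check which header carries the problematic value is to list every timestamp found in the nested headers. The sketch below is an illustration under assumptions: `timestamp_paths` is a hypothetical helper, Ruby's `Time` stands in for `java.util.Date`, and the sample hash only imitates the shape of an x-death entry with made-up values.

```ruby
# Hypothetical diagnostic helper (not part of the plugin).
# Returns the dotted paths of all Time values inside nested hashes and arrays.
def timestamp_paths(value, path = [])
  case value
  when Hash
    value.flat_map { |k, v| timestamp_paths(v, path + [k.to_s]) }
  when Array
    value.each_with_index.flat_map { |v, i| timestamp_paths(v, path + [i.to_s]) }
  when Time
    [path.join(".")]
  else
    []
  end
end

# Made-up sample imitating dead-letter headers; the values are placeholders.
sample_headers = {
  "x-death" => [
    { "count" => 1, "reason" => "rejected", "queue" => "example.queue", "time" => Time.at(0) }
  ]
}

paths = timestamp_paths(sample_headers)
puts paths.inspect # prints ["x-death.0.time"]
```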

Stack trace:

[2018-08-20T16:41:36,337][ERROR][logstash.pipeline] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::RabbitMQ host=>[""], port=>, ssl=>true, vhost=>"", queue=>"", passive=>true, user=>"", password=><password>, metadata_enabled=>true, tags=>["], add_field=>{"indexType"=>""}, id=>"", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_8eb64acd-7840-4a31-ad48-95b45e784451", enable_metric=>true, charset=>"UTF-8">, threads=>1, ssl_version=>"TLSv1.2", automatic_recovery=>true, connect_retry_interval=>1, durable=>false, auto_delete=>false, exclusive=>false, prefetch_count=>256, ack=>true, key=>"logstash", subscription_retry_interval_seconds=>5>
Error: Missing Converter handling for full class name=java.util.Date, simple name=Date
...
org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71)
org.jruby.runtime.Block.call(Block.java:124)
org.jruby.RubyProc.call(RubyProc.java:289)
org.jruby.RubyProc.call(RubyProc.java:246)
org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104)
java.lang.Thread.run(Thread.java:748)

Logstash configurations:

input {
  rabbitmq {
    host => "098765432"
    port => 5671
    ssl => true
    vhost => "098765432"
    queue => "098765432"
    passive => true
    user => "98765432"
    password => "0987654321"
    metadata_enabled => true
    tags => [ "rabbitmq", "deadletter" ]
    add_field => {
      "indexType" => "my-index"
    }
  }
}

filter {
  if "rabbitmq" in [tags] {
    mutate {
      add_field => { "[@metadata][indexType]" => "%{indexType}" }
      remove_field => [ "indexType" ]
    }
    if [@metadata][rabbitmq_properties][timestamp] {
      date {
        match => ["[@metadata][rabbitmq_properties][timestamp]", "UNIX"]
      }
    }
    if "deadletter" in [tags] {
      if [@metadata][rabbitmq_properties][x-death] {
        mutate {
          add_field => { "rabbitmq_x_death" => "%{[@metadata][rabbitmq_properties][x-death]}" }
        }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["098765432"]
    index => "%{[@metadata][indexType]}-%{+YYYY.MM.dd}"
  }
}

dprada1979 commented 6 years ago

As a temporary workaround, we have disabled metadata and we add a "handled" field set to "false":


input {
  rabbitmq {
    host => "098765432"
    port => 5671
    ssl => true
    vhost => "098765432"
    queue => "098765432"
    passive => true
    user => "98765432"
    password => "0987654321"
    # Metadata is disabled so the Date-valued headers are never copied into the event.
    metadata_enabled => false
    tags => [ "rabbitmq", "deadletter" ]
    add_field => {
      "indexType" => "my-index"
      "handled" => "false"
    }
  }
}

msvticket commented 5 years ago

My fix for this issue in Logstash is released with Logstash version 7. Version 7.0.0-alpha2 is now available, so the fix is easily accessible. I think this issue can be closed.

kostasb commented 5 years ago

Resolved in Logstash v7.0 - thank you @msvticket