logstash-plugins / logstash-filter-geoip

Apache License 2.0
64 stars 82 forks source link

Logstash 6.5.4 exception in pipeline worker #148

Open Grunticus03 opened 5 years ago

Grunticus03 commented 5 years ago

This may be related to Issue #145, however my source field is never empty and I have consistently reproduced the issue on data with known source field values. This issue began after upgrading from 6.4.1, commenting out the geoip filter in my pipeline resolves the pipeline exception. The source field, prior to the issue occuring contained a single value in an array format. I have since modified the field to be formatted as a string, but the issue still occurs.

Error [2019-01-07T17:23:00,375][ERROR][logstash.pipeline ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"DMARC", "exception"=>"Index: 0, Size: 0", "backtrace"=>["java.util.ArrayList.rangeCheck(java/util/ArrayList.java:657)", "java.util.ArrayList.get(java/util/ArrayList.java:433)", "org.logstash.filters.GeoIPFilter.handleEvent(org/logstash/filters/GeoIPFilter.java:120)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.filter(D:/ElasticStack/Apps/Logstash/6.5.4/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb:111)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.filters.base.do_filter(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/filters/base.rb:143)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.filters.base.block in multi_filter(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/filters/base.rb:162)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1734)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.filters.base.multi_filter(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/filters/base.rb:159)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.filter_delegator.multi_filter(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/filter_delegator.rb:44)", "RUBY.block in filter_func((eval):125)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.pipeline.filter_batch(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/pipeline.rb:341)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$method$filter_batch$0$__VARARGS__(D_3a_/ElasticStack/Apps/Logstash/$6_dot_5_dot_4/logstash_minus_core/lib/logstash/D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/pipeline.rb)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.pipeline.worker_loop(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/pipeline.rb:320)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0$__VARARGS__(D_3a_/ElasticStack/Apps/Logstash/$6_dot_5_dot_4/logstash_minus_core/lib/logstash/D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/pipeline.rb)", "D_3a_.ElasticStack.Apps.Logstash.$6_dot_5_dot_4.logstash_minus_core.lib.logstash.pipeline.block in start_workers(D:/ElasticStack/Apps/Logstash/6.5.4/logstash-core/lib/logstash/pipeline.rb:286)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)", "java.lang.Thread.run(java/lang/Thread.java:748)"], :thread=>"#<Thread:0x591fefa3 sleep>"}

Pipeline config

input {
  file {
    id => "Ingest\DMARC\*.xml"
    path => ["/ElasticStack/Ingest/DMARC/*.xml"]
    discover_interval => 5
    mode => "read"
    codec => multiline {
      negate => true
      pattern => "<record>"
      what => "previous"
      multiline_tag => ""
    }
  }
}
filter {
  mutate {
    gsub => [
      "message", " *<", "<"
    ]
  }
  xml {
    id => "Field Extraction"
    store_xml => false
    force_array => false
    source => "message"
    xpath => [
      "record/row/source_ip/text()", "email.source_ip",
      "record/auth_results/spf/result/text()", "authresult.spf_result"
    ]
  }
  mutate {
    id => "Field Normalize"
    strip => [
      "report.end",
      "report.start"
    ]
    lowercase => [
      "email.dkim_evaluation",
      "email.dmarc_action",
      "email.spf_evaluation",
      "policy.dmarc.domain_action",
      "policy.dmarc.subdomain_action",
      "policy.dkim_mode",
      "policy.spf_mode"
    ]
    gsub => [
      "policy.dkim_mode", "r", "Relaxed",
      "policy.dkim_mode", "s", "Strict",
      "policy.spf_mode", "r", "Relaxed",
      "policy.spf_mode", "s", "Strict",
      "policy.forensic_reporting", "0", "All Fail",
      "policy.forensic_reporting", "1", "Any Fail",
      "policy.forensic_reporting", "d", "DKIM Fail",
      "policy.forensic_reporting", "s", "SPF Fail"
    ]
  }
#  geoip {
#    source => "email.source_ip"
#  }
  # mutate {
    # add_field => {
      # "[geoip][location][coordinates]" => "%{[geoip][location][lat]}, %{[geoip][location][lon]}"
    # }
    # remove_field => ["host"]
  # }
#  if "_geoip_lookup_failure" in [tags] {
#    drop { }
#  }
  fingerprint {
    source => "message"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}
output {
  elasticsearch {
    id => "Send to Elasticsearch"
    hosts => [""]
    document_id => "%{[@metadata][fingerprint]}"
    template => "d:/ElasticStack/Apps/Logstash/6.5.4/templates/dmarcxmltemplate.json"
    template_name => "dmarcxml"
    index => "dmarcxml-%{+YYYY.MM.dd}"
  }
}
inammathe commented 5 years ago

give this a whirl in your filter.

ruby {
    code => "event.remove('your_field') if event.get('[your_field]').empty?"
}

if [your_field]
{
    geoip { source => "your_field" }
}

I know you said your field is never empty but I reckon something else is messing with it. I wonder if it has something to do with the xml plugin.. I had the same issue and was grabbing the ip field out of xml just like you. That above work around got me going.

Either way the geoip filter plugin should probably be doing it's own isEmpty() check anyway.

inammathe commented 5 years ago

Might have tracked down the culprit - https://github.com/logstash-plugins/logstash-filter-xml/commit/1620c37c3f2adbaa482bccb9c3f6ccdb21b57dde#diff-46633c67f32820575bd30a6a35b252c7R160