logstash-plugins / logstash-filter-geoip

Apache License 2.0
64 stars 82 forks source link

Fix crash of pipeline due to NPE when lookup in DB with custom fields #225

Closed andsel closed 1 month ago

andsel commented 2 months ago

Release notes

Avoid to crash pipelines when lookup a database with customised fields.

What does this PR do?

Protect the handleEvent method from NPE generated in the MAxMind GeoIP reader library. That NPE happens when fetching data that contains customised attributes, this PR protects from NPE and return a lookup error so that the Event can be tagged with failure.

Why is it important/What is the impact to the user?

Permit the user to handle an unexpected error (through event tag) instead of crashing the pipeline, when the data lookup encounter a customised field in the DB.

Checklist

Author's Checklist

How to test this PR locally

Follow the steps listed in #226

Related issues

Use cases

Screenshots

Logs

Before the fix:

[2024-09-17T12:49:45,725][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-09-17T12:49:45,738][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2024-09-17T12:49:45,861][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"Cannot invoke \"Object.getClass()\" because \"parameters[index]\" is null", :exception=>Java::JavaLang::NullPointerException, :backtrace=>["com.maxmind.db.Decoder.decodeMapIntoObject(com/maxmind/db/Decoder.java:446)", "com.maxmind.db.Decoder.decodeMap(com/maxmind/db/Decoder.java:342)", "com.maxmind.db.Decoder.decodeByType(com/maxmind/db/Decoder.java:162)", "com.maxmind.db.Decoder.decode(com/maxmind/db/Decoder.java:151)", "com.maxmind.db.Decoder.decodeMapIntoObject(com/maxmind/db/Decoder.java:429)", "com.maxmind.db.Decoder.decodeMap(com/maxmind/db/Decoder.java:342)", "com.maxmind.db.Decoder.decodeByType(com/maxmind/db/Decoder.java:162)", "com.maxmind.db.Decoder.decode(com/maxmind/db/Decoder.java:151)", "com.maxmind.db.Decoder.decode(com/maxmind/db/Decoder.java:76)", "com.maxmind.db.Reader.resolveDataPointer(com/maxmind/db/Reader.java:275)", "com.maxmind.db.Reader.getRecord(com/maxmind/db/Reader.java:185)", "com.maxmind.geoip2.DatabaseReader.get(com/maxmind/geoip2/DatabaseReader.java:263)", "com.maxmind.geoip2.DatabaseReader.getCountry(com/maxmind/geoip2/DatabaseReader.java:309)", "com.maxmind.geoip2.DatabaseReader.country(com/maxmind/geoip2/DatabaseReader.java:292)", "org.logstash.filters.geoip.GeoIPFilter.retrieveCountryGeoData(org/logstash/filters/geoip/GeoIPFilter.java:340)", "org.logstash.filters.geoip.GeoIPFilter.handleEvent(org/logstash/filters/geoip/GeoIPFilter.java:168)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:77)", "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:569)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:315)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:176)", "RUBY.filter(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/3.1.0/gems/logstash-filter-geoip-7.3.0-java/lib/logstash/filters/geoip.rb:117)", "RUBY.do_filter(/Users/andrea/workspace/logstash_andsel/logstash-core/lib/logstash/filters/base.rb:158)", "RUBY.multi_filter(/Users/andrea/workspace/logstash_andsel/logstash-core/lib/logstash/filters/base.rb:176)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(org/jruby/RubyArray$INVOKER$i$0$0$each.gen)", "RUBY.multi_filter(/Users/andrea/workspace/logstash_andsel/logstash-core/lib/logstash/filters/base.rb:173)", "org.logstash.config.ir.compiler.FilterDelegatorExt.doMultiFilter(org/logstash/config/ir/compiler/FilterDelegatorExt.java:128)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.lambda$multiFilter$0(org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:133)", "org.logstash.instrument.metrics.timer.ConcurrentLiveTimerMetric.time(org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java:47)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.multi_filter(org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:133)", "org.logstash.generated.CompiledDataset1.compute(org/logstash/generated/CompiledDataset1)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(org/logstash/config/ir/CompiledPipeline.java:364)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(org/logstash/config/ir/CompiledPipeline.java:358)", "org.logstash.execution.ObservedExecution.lambda$compute$0(org/logstash/execution/ObservedExecution.java:17)", "org.logstash.execution.WorkerObserver.lambda$observeExecutionComputation$0(org/logstash/execution/WorkerObserver.java:39)", "org.logstash.instrument.metrics.timer.ConcurrentLiveTimerMetric.time(org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java:47)", "org.logstash.execution.WorkerObserver.lambda$executeWithTimers$1(org/logstash/execution/WorkerObserver.java:50)", "org.logstash.instrument.metrics.timer.ConcurrentLiveTimerMetric.time(org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java:47)", "org.logstash.execution.WorkerObserver.executeWithTimers(org/logstash/execution/WorkerObserver.java:50)", "org.logstash.execution.WorkerObserver.observeExecutionComputation(org/logstash/execution/WorkerObserver.java:38)", "org.logstash.execution.ObservedExecution.compute(org/logstash/execution/ObservedExecution.java:17)", "org.logstash.execution.WorkerLoop.abortableCompute(org/logstash/execution/WorkerLoop.java:113)", "org.logstash.execution.WorkerLoop.run(org/logstash/execution/WorkerLoop.java:86)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:77)", "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:569)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:300)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:164)", "RUBY.start_workers(/Users/andrea/workspace/logstash_andsel/logstash-core/lib/logstash/java_pipeline.rb:304)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:354)", "java.lang.Thread.run(java/lang/Thread.java:840)"], :thread=>"#<Thread:0x42d2396b /Users/andrea/workspace/logstash_andsel/logstash-core/lib/logstash/java_pipeline.rb:134 sleep>"}
[2024-09-17T12:49:45,863][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2024-09-17T12:49:46,246][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}

After the fix:

[2024-09-17T12:06:30,284][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["/Users/andrea/workspace/maxmind_geoip/test_pipeline.conf"], :thread=>"#<Thread:0x16e89fb7 /Users/andrea/workspace/logstash_andsel/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
[2024-09-17T12:06:30,533][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.25}
[2024-09-17T12:06:30,535][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-09-17T12:06:30,539][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2024-09-17T12:06:30,670][WARN ][org.logstash.filters.geoip.GeoIPFilter][main][5e61d53b23fa160dcab040d363c74538967094556673c63a405737c32e381c73] GeoIP2 Exception in accessing custom field. exception=java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "parameters[index]" is null, field=ip, event=2024-09-17T10:06:30.554184Z {name=andreas-MBP-2.station} %{message}
{
            "ip" => "216.160.83.58",
      "@version" => "1",
    "@timestamp" => 2024-09-17T10:06:30.554184Z,
          "tags" => [
        [0] "_geoip_lookup_failure"
    ],
         "event" => {
        "original" => "{\"ip\": \"216.160.83.58\"}",
        "sequence" => 0
    },
          "host" => {
        "name" => "andreas-MBP-2.station"
    }
}
hackery commented 6 days ago

There's a related bug in this area - if the source field is an empty list, the get() throws an exception and also causes the pipeline to terminate.

[2024-11-28T13:31:26,832][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {
    :pipeline_id=>"main",
    :error=>"Index 0 out of bounds for length 0",
    :exception=>Java::JavaLang::IndexOutOfBoundsException,
    :backtrace=>[
        "java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)",
        "java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)",
        "java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)",
        "java.base/java.util.Objects.checkIndex(Objects.java:361)",
        "java.base/java.util.ArrayList.get(ArrayList.java:427)",
        "org.logstash.filters.geoip.GeoIPFilter.handleEvent(GeoIPFilter.java:153)",
        ....