elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Persistent queues corruption with LogStash::Timestamp datatype #13253

Open rocco8620 opened 2 years ago

rocco8620 commented 2 years ago

Logstash information:

  1. Logstash version: 7.15.0
  2. Logstash installation source: DEB package from APT
  3. How is Logstash being run: systemd

Plugins installed: Default ones + logstash-filter-json_encode

JVM:
openjdk version "16.0.1" 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-16.0.1+9 (build 16.0.1+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-16.0.1+9 (build 16.0.1+9, mixed mode, sharing)

OS version: Linux lisp-hz-siem-00 4.19.0-16-amd64 #1 SMP Debian 4.19.181-1 (2021-03-19) x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

I am experiencing a problem with persistent queues, specifically queue corruption. After running Logstash for some time (minutes) and restarting it, one or more pipelines (usually just one) fail to start with the following exception:

[2021-09-23T10:03:27,175][ERROR][logstash.javapipeline    ][output-elasticsearch] Pipeline worker error, the pipeline will be stopped 
{
    :pipeline_id=>"output-elasticsearch",
    :error=>"deserialize invocation error",
    :exception=>Java::OrgLogstashAckedqueue::QueueRuntimeException, 
    :backtrace=>[
        "org.logstash.ackedqueue.Queue.deserialize(Queue.java:701)",
        "org.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)",
        "org.logstash.ackedqueue.Batch.<init>(Batch.java:49)",
        "org.logstash.ackedqueue.Queue.readPageBatch(Queue.java:630)",
        "org.logstash.ackedqueue.Queue.readBatch(Queue.java:563)",
        "org.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:150)",
        "org.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)",
        "org.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)",
        "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)",
        "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
        "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
        "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
        "java.base/java.lang.reflect.Method.invoke(Method.java:566)",
        "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441)",
        "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305)",
        "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)",
        "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)",
        "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)",
        "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)",
        "org.jruby.runtime.Block.call(Block.java:139)",
        "org.jruby.RubyProc.call(RubyProc.java:318)",
        "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)",
        "java.base/java.lang.Thread.run(Thread.java:829)"
    ], 
    :thread=>"#<Thread:0x413e7d42 sleep>"
}

To identify the problem I debugged Logstash and put a breakpoint where the exception was raised, to see the full stack trace:

Exception java.lang.reflect.InvocationTargetException
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at org.logstash.ackedqueue.Queue.deserialize(Queue.java:699)
           at org.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)
           at org.logstash.ackedqueue.Batch.<init>(Batch.java:49)
           at org.logstash.ackedqueue.Queue.readPageBatch(Queue.java:630)
           at org.logstash.ackedqueue.Queue.readBatch(Queue.java:563)
           at org.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:150)
           at org.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)
           at org.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)
           at org.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441)
           at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305)
           at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)
           at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)
           at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)
           at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)
           at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)
           at org.jruby.runtime.Block.call(Block.java:139)
           at org.jruby.RubyProc.call(RubyProc.java:318)
           at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)
           at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["siem_properties"]->org.logstash.ConvertedMap["logstash_received_at"]
           at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
           at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:353)
           at com.fasterxml.jackson.databind.deser.std.ContainerDeserializerBase.wrapAndThrow(ContainerDeserializerBase.java:181)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:539)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)
           at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserializeWithType(UntypedObjectDeserializer.java:712)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:529)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)
           at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserializeWithType(UntypedObjectDeserializer.java:712)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:529)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromObject(AsArrayTypeDeserializer.java:61)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserializeWithType(MapDeserializer.java:400)
           at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:68)
           at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4014)
           at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3122)
           at org.logstash.Event.fromSerializableMap(Event.java:234)
           at org.logstash.Event.deserialize(Event.java:510)
           ... 28 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "null"
           at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
           at org.logstash.Timestamp.<init>(Timestamp.java:64)
           at org.logstash.ObjectMappers$TimestampDeserializer.deserialize(ObjectMappers.java:273)
           at org.logstash.ObjectMappers$TimestampDeserializer.deserialize(ObjectMappers.java:262)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)
           at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserializeWithType(UntypedObjectDeserializer.java:712)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:529)
           ... 50 more

This last stack trace tells me the problem is with the field [siem_properties][logstash_received_at]. This field is of type LogStash::Timestamp and is set in the Logstash pipeline with the ruby filter plugin like this:

ruby {
  code => "event.set('[siem_properties][logstash_received_at]', Time.now.utc())"
}

I noticed that the same problem can also happen with the @timestamp field, which I do not set directly.
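
As a side note, the root cause at the bottom of the trace ("Invalid format: \"null\"" raised by org.joda.time.format.DateTimeFormatter.parseDateTime inside org.logstash.Timestamp) can be reproduced in isolation. This is a minimal sketch only, assuming joda-time is on the classpath; the exact formatter Timestamp uses internally may differ, but the failure mode is the same:

import org.joda.time.format.ISODateTimeFormat;

public class NullTimestampParse {
    public static void main(String[] args) {
        try {
            // The queue page apparently holds the literal string "null" where an
            // ISO8601 timestamp is expected; a Joda ISO parser rejects it.
            ISODateTimeFormat.dateTimeParser().parseDateTime("null");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints something like: Invalid format: "null"
        }
    }
}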


To try to identify where the queue corruption happens, I patched the serialize() method in org/logstash/Event.java to immediately deserialize the just-serialized event and check its validity:

public byte[] serialize() throws JsonProcessingException {
    HashMap<String, ConvertedMap> map = new HashMap<String, ConvertedMap>(2, 1.0f);
    map.put(DATA_MAP_KEY, this.data);
    map.put(META_MAP_KEY, this.metadata);
    byte[] result = ObjectMappers.CBOR_MAPPER.writeValueAsBytes(map);
    // Debug patch: immediately deserialize the bytes we just wrote so that a
    // broken event is detected (and logged) on the write path, rather than only
    // when the PQ page is read back after a restart.
    try {
        Event.deserialize(result);
    }
    catch (Exception e) {
        logger.info("Error serializing event, exception=" + e);
    }
    return result;
}

After the patch, the following lines showed up in the log:

[2021-09-27T14:12:42,984][INFO ][org.logstash.Event       ][win-logins-correlation][5985a8e6151e894bad3977df5c1a852b021ef839cb037366465d8f7b654289ac] Error serializing event, exception=com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["_@timestamp"])
[2021-09-27T14:12:43,027][INFO ][org.logstash.Event       ][win-logins-correlation][5985a8e6151e894bad3977df5c1a852b021ef839cb037366465d8f7b654289ac] Error serializing event, exception=com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["_@timestamp"])
[2021-09-27T14:12:43,071][INFO ][org.logstash.Event       ][win-logins-correlation][5985a8e6151e894bad3977df5c1a852b021ef839cb037366465d8f7b654289ac] Error serializing event, exception=com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["_@timestamp"])

I am unable to pinpoint the exact cause or to create a reproducible example, because I am using multiple pipelines in a fairly complex environment, but I can try to come up with an example if necessary. I can also run some experiments in my environment if needed.

I also have an example of corrupted queue files to analyze, if that is useful.


To give some context, my Logstash pipelines are chained as follows:

winlogbeat-5001 → win-logins-correlation → output-elasticsearch
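
The stages are connected with the built-in pipeline-to-pipeline plugins (visible in the stack traces as plugins/builtin/pipeline). A minimal sketch of that wiring, with illustrative file names and addresses rather than my exact configuration:

# In win-logins-correlation.conf (illustrative):
output {
  pipeline { send_to => ["output-elasticsearch"] }
}

# In output-elasticsearch.conf (illustrative):
input {
  pipeline { address => "output-elasticsearch" }
}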

rocco8620 commented 2 years ago

While debugging the problem, another (probably related) exception occurred:

[2021-09-27T14:42:49,792][ERROR][logstash.javapipeline    ][win-logins-correlation] Pipeline worker error, the pipeline will be stopped {
    :pipeline_id=>"win-logins-correlation", 
    :error=>"", 
    :exception=>Java::JavaLang::NullPointerException, 
    :backtrace=>[
        "org.logstash.ext.JrubyTimestampExtLibrary$RubyTimestamp.to_iso8601(org/logstash/ext/JrubyTimestampExtLibrary.java:131)", 
        "org.logstash.ext.JrubyTimestampExtLibrary$RubyTimestamp.to_s(org/logstash/ext/JrubyTimestampExtLibrary.java:119)",
        "org.logstash.ext.JrubyTimestampExtLibrary$RubyTimestamp$INVOKER$i$0$0$ruby_to_s.call(org/logstash/ext/JrubyTimestampExtLibrary$RubyTimestamp$INVOKER$i$0$0$ruby_to_s.gen)",
        "org.jruby.RubyObject.toRubyString(org/jruby/RubyObject.java:245)",
        "org.jruby.RubyObject.toString(org/jruby/RubyObject.java:254)",
        "java.lang.String.valueOf(java/lang/String.java:2951)",
        "java.lang.StringBuilder.append(java/lang/StringBuilder.java:168)",
        "java.util.AbstractMap.toString(java/util/AbstractMap.java:556)",
        "java.lang.String.valueOf(java/lang/String.java:2951)",
        "java.lang.StringBuilder.append(java/lang/StringBuilder.java:168)",
        "java.util.AbstractMap.toString(java/util/AbstractMap.java:556)",
        "java.lang.String.valueOf(java/lang/String.java:2951)",
        "java.lang.StringBuilder.append(java/lang/StringBuilder.java:168)",
        "org.logstash.Event.serialize(org/logstash/Event.java:413)",
        "org.logstash.ackedqueue.Queue.write(org/logstash/ackedqueue/Queue.java:366)",
        "org.logstash.ackedqueue.ext.JRubyAckedQueueExt.rubyWrite(org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java:128)",
        "org.logstash.ext.JrubyAckedWriteClientExt.doPush(org/logstash/ext/JrubyAckedWriteClientExt.java:78)",
        "org.logstash.ext.JRubyWrappedWriteClientExt.push(org/logstash/ext/JRubyWrappedWriteClientExt.java:102)",
        "org.logstash.ext.JRubyWrappedWriteClientExt$INVOKER$i$1$0$push.call(org/logstash/ext/JRubyWrappedWriteClientExt$INVOKER$i$1$0$push.gen)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.plugins.builtin.pipeline.input.internalReceive(usr/share/logstash/logstash_minus_core/lib/logstash/plugins/builtin/pipeline//usr/share/logstash/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb:64)",
        "org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)",
        "org.jruby.javasupport.Java$ProcToInterface.callProc(org/jruby/javasupport/Java.java:1136)",
        "org.jruby.javasupport.Java$ProcToInterface.access$300(org/jruby/javasupport/Java.java:1113)",
        "org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(org/jruby/javasupport/Java.java:1174)",
        "java.util.stream.ForEachOps$ForEachOp$OfRef.accept(java/util/stream/ForEachOps.java:183)",
        "java.util.stream.ReferencePipeline$3$1.accept(java/util/stream/ReferencePipeline.java:195)",
        "java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(java/util/AbstractList.java:720)",
        "java.util.stream.AbstractPipeline.copyInto(java/util/stream/AbstractPipeline.java:484)",
        "java.util.stream.AbstractPipeline.wrapAndCopyInto(java/util/stream/AbstractPipeline.java:474)",
        "java.util.stream.ForEachOps$ForEachOp.evaluateSequential(java/util/stream/ForEachOps.java:150)",
        "java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(java/util/stream/ForEachOps.java:173)",
        "java.util.stream.AbstractPipeline.evaluate(java/util/stream/AbstractPipeline.java:234)",
        "java.util.stream.ReferencePipeline.forEach(java/util/stream/ReferencePipeline.java:497)",
        "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
        "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)",
        "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)",
        "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)",
        "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)",
        "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.plugins.builtin.pipeline.input.internalReceive(usr/share/logstash/logstash_minus_core/lib/logstash/plugins/builtin/pipeline//usr/share/logstash/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb:62)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.plugins.builtin.pipeline.input.RUBY$method$internalReceive$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash/plugins/builtin/pipeline//usr/share/logstash/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb)",
        "org.logstash.plugins.pipeline.PipelineBus.lambda$sendEvents$1(org/logstash/plugins/pipeline/PipelineBus.java:64)",
        "java.util.concurrent.ConcurrentHashMap.forEach(java/util/concurrent/ConcurrentHashMap.java:1603)",
        "org.logstash.plugins.pipeline.PipelineBus.sendEvents(org/logstash/plugins/pipeline/PipelineBus.java:60)",
        "jdk.internal.reflect.GeneratedMethodAccessor78.invoke(jdk/internal/reflect/GeneratedMethodAccessor78)",
        "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)",
        "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)",
        "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:486)",
        "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:341)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.plugins.builtin.pipeline.output.multi_receive(usr/share/logstash/logstash_minus_core/lib/logstash/plugins/builtin/pipeline//usr/share/logstash/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb:39)",
        "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.invokeOutput(org/logstash/config/ir/compiler/OutputStrategyExt.java:153)",
        "org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.doOutput(org/logstash/config/ir/compiler/OutputStrategyExt.java:279)",
        "org.logstash.config.ir.compiler.OutputStrategyExt$SharedOutputStrategyExt.output(org/logstash/config/ir/compiler/OutputStrategyExt.java:312)",
        "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:143)",
        "org.logstash.config.ir.compiler.OutputDelegatorExt.doOutput(org/logstash/config/ir/compiler/OutputDelegatorExt.java:102)",
        "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)",
        "org.logstash.generated.CompiledDataset2.compute(org/logstash/generated/CompiledDataset2)",
        "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(org/logstash/config/ir/CompiledPipeline.java:333)",
        "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(org/logstash/config/ir/CompiledPipeline.java:323)",
        "org.logstash.execution.WorkerLoop.run(org/logstash/execution/WorkerLoop.java:87)",
        "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
        "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)",
        "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)",
        "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)",
        "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:441)",
        "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:305)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(usr/share/logstash/logstash_minus_core/lib/logstash//usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)",
        "org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)",
        "java.lang.Thread.run(java/lang/Thread.java:829)"
    ],
    :thread=>"#<Thread:0x76136a01 sleep>"
} 
rocco8620 commented 2 years ago

Hi, has anyone had a chance to take a look at this issue? Is there any more information I can provide to help debug the problem? R.

rocco8620 commented 2 years ago

Hello, is there any news?

@kares Do you have any tips on how to debug this kind of problem?

kaisecheng commented 2 years ago

@rocco8620 to help investigate, we need a minimal pipeline that reproduces this issue.

You made a good step by patching serialize(). It seems that the data in the map is the key part of the problem:

map.put(DATA_MAP_KEY, this.data);
map.put(META_MAP_KEY, this.metadata);

A JSON dump of this.data and this.metadata would help a lot.
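
For example, the try/catch in the patch above could be extended to log such a dump whenever the round-trip check fails. A rough sketch only, assuming ObjectMappers.JSON_MAPPER is accessible from Event and reusing the same logger:

try {
    Event.deserialize(result);
} catch (Exception e) {
    logger.info("Error serializing event, exception=" + e);
    try {
        // Dump the two maps that were just serialized so the field carrying the
        // broken timestamp value can be inspected directly.
        logger.info("DATA=" + ObjectMappers.JSON_MAPPER.writeValueAsString(this.data));
        logger.info("META=" + ObjectMappers.JSON_MAPPER.writeValueAsString(this.metadata));
    } catch (Exception dumpError) {
        // The JSON dump may fail for the same reason (a Timestamp with no value),
        // so guard it separately.
        logger.info("Could not dump event maps, exception=" + dumpError);
    }
}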

rocco8620 commented 2 years ago

I now have a somewhat reproducible example. I copied the persistent queue files from a Logstash instance that crashed on reload due to this bug and loaded them into a Logstash instance running in Docker.

The problem happens on both logstash 7.16.3 and 8.1.1.

These are the steps to reproduce:

  1. Create a logstash pipeline (mypipeline.conf) with the code:
input {
  generator {
    lines => [ "line 1" ]
  }
}

filter {

}

output {
   stdout {
       codec => "rubydebug"
   }
}
  2. Create the pipelines.yml file with this content:
- pipeline.id: mypipeline
  path.config: "/usr/share/logstash/pipeline/mypipeline.conf"
  queue.type: persisted
  3. Copy the attached mypipeline_queue.zip (the queue directory) into the Logstash queue data directory.

  4. Start Logstash. It crashes with the exception when trying to load the PQ:

Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
[2022-03-29T10:17:14,992][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
[2022-03-29T10:17:15,002][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.16.3", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.13+8 on 11.0.13+8 +indy +jit [linux-x86_64]"}
[2022-03-29T10:17:16,753][WARN ][logstash.monitoringextension.pipelineregisterhook] xpack.monitoring.enabled has not been defined, but found elasticsearch configuration. Please explicitly set `xpack.monitoring.enabled: true` in logstash.yml
[2022-03-29T10:17:16,761][WARN ][deprecation.logstash.monitoringextension.pipelineregisterhook] Internal collectors option for Logstash monitoring is deprecated and may be removed in a future release.
Please configure Metricbeat to monitor Logstash. Documentation can be found at: 
https://www.elastic.co/guide/en/logstash/current/monitoring-with-metricbeat.html
[2022-03-29T10:17:17,503][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2022-03-29T10:17:17,624][WARN ][deprecation.logstash.outputs.elasticsearch] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2022-03-29T10:17:18,190][INFO ][logstash.licensechecker.licensereader] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}
[2022-03-29T10:17:38,536][WARN ][logstash.licensechecker.licensereader] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"http://elasticsearch:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch: Name or service not known"}
[2022-03-29T10:17:38,607][WARN ][logstash.licensechecker.licensereader] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch {:url=>http://elasticsearch:9200/, :error_message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
[2022-03-29T10:17:38,615][ERROR][logstash.licensechecker.licensereader] Unable to retrieve license information from license server {:message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
[2022-03-29T10:17:38,680][ERROR][logstash.monitoring.internalpipelinesource] Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster.
[2022-03-29T10:17:38,997][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-03-29T10:17:39,999][INFO ][org.reflections.Reflections] Reflections took 140 ms to scan 1 urls, producing 119 keys and 417 values 
[2022-03-29T10:17:40,900][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2022-03-29T10:17:41,689][INFO ][logstash.javapipeline    ][mypipeline] Starting pipeline {:pipeline_id=>"mypipeline", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/usr/share/logstash/pipeline/mypipeline.conf"], :thread=>"#<Thread:0x6a1c706 run>"}
[2022-03-29T10:17:42,496][INFO ][logstash.javapipeline    ][mypipeline] Pipeline Java execution initialization time {"seconds"=>0.8}
[2022-03-29T10:17:42,521][INFO ][logstash.javapipeline    ][mypipeline] Pipeline started {"pipeline.id"=>"mypipeline"}
[2022-03-29T10:17:42,609][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:mypipeline], :non_running_pipelines=>[]}
[2022-03-29T10:17:42,616][ERROR][logstash.javapipeline    ][mypipeline] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"mypipeline", :error=>"deserialize invocation error", :exception=>Java::OrgLogstashAckedqueue::QueueRuntimeException, :backtrace=>["org.logstash.ackedqueue.Queue.deserialize(Queue.java:701)", "org.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)", "org.logstash.ackedqueue.Batch.<init>(Batch.java:49)", "org.logstash.ackedqueue.Queue.readPageBatch(Queue.java:630)", "org.logstash.ackedqueue.Queue.readBatch(Queue.java:563)", "org.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:150)", "org.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)", "org.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)", "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)", "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)", "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "java.base/java.lang.reflect.Method.invoke(Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:426)", "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:293)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:299)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:829)"], :thread=>"#<Thread:0x6a1c706 sleep>"}
[2022-03-29T10:17:42,668][WARN ][logstash.javapipeline    ][mypipeline] Waiting for input plugin to close {:pipeline_id=>"mypipeline", :thread=>"#<Thread:0x6a1c706 run>"}
{
       "message" => "line 1",
          "host" => "fa29655ae517",
      "sequence" => 0,
      "@version" => "1",
    "@timestamp" => 2022-03-29T10:17:42.577Z
}
[2022-03-29T10:17:44,413][INFO ][logstash.javapipeline    ][mypipeline] Pipeline terminated {"pipeline.id"=>"mypipeline"}
[2022-03-29T10:17:44,718][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:mypipeline}
[2022-03-29T10:17:44,748][INFO ][logstash.runner          ] Logstash shut down.

To run Logstash in Docker I use the following helper script:

docker run --rm -it \
-v "/home/rocco/tmp/logstash pq corruption/2/conf.d/:/usr/share/logstash/pipeline/" \
-v "/home/rocco/tmp/logstash pq corruption/2/pipelines.yml:/usr/share/logstash/config/pipelines.yml" \
-v "/home/rocco/tmp/logstash pq corruption/2/data/:/usr/share/logstash/data/" \
-u root:root \
logstash:7.16.3

I have not yet been able to create a pipeline that triggers the bug without manually copying the corrupted PQ files.
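
For completeness, a candidate reproducer would simply combine the generator input above with the ruby filter that sets the timestamp field (with queue.type: persisted as in the pipelines.yml above); a pipeline along these lines has not triggered the corruption on its own so far:

input {
  generator {
    lines => [ "line 1" ]
  }
}

filter {
  ruby {
    code => "event.set('[siem_properties][logstash_received_at]', Time.now.utc())"
  }
}

output {
  stdout {
    codec => "rubydebug"
  }
}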

rocco8620 commented 2 years ago

Related to the same problem, I got another exception, this time not strictly related to the PQ but to serialization:

[2022-03-29T10:24:23,430][ERROR][logstash.filters.ruby    ][win-logins-correlation][d66a85735130ac7ecb5e6acaca69007f6f79d8de14bb22b576602b347f053361] Ruby exception occurred:  {
:class=>"Java::JavaLang::NullPointerException",
:backtrace=>[
  "org.logstash.ext.JrubyTimestampExtLibrary$RubyTimestamp.ruby_time(JrubyTimestampExtLibrary.java:101)",
  "org.logstash.ext.JrubyTimestampExtLibrary$RubyTimestamp$INVOKER$i$0$0$ruby_time.call(JrubyTimestampExtLibrary$RubyTimestamp$INVOKER$i$0$0$ruby_time.gen)",
  "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:355)",
  "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:144)",
  "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:345)",
  "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)",
  "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116)",
  "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:137)",
  "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:60)",
  "org.jruby.runtime.Block.call(Block.java:143)",
  "org.jruby.RubyProc.call(RubyProc.java:339)",
  "org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:64)",
  "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)",
  "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_ruby_minus_3_dot_1_dot_7.lib.logstash.filters.ruby.RUBY$method$inline_script$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb:93)",
  "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_ruby_minus_3_dot_1_dot_7.lib.logstash.filters.ruby.RUBY$method$inline_script$0$__VARARGS__(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb)",
  "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)",
  "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)",
  "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)",
  "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_ruby_minus_3_dot_1_dot_7.lib.logstash.filters.ruby.RUBY$method$filter$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb:86)",
  "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_ruby_minus_3_dot_1_dot_7.lib.logstash.filters.ruby.RUBY$method$filter$0$__VARARGS__(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb)",
  "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)",
  "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)",
  "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)",
  "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)",
  "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0$__VARARGS__(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb)",
  "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)",
  "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)",
  "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:197)",
  "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$block$multi_filter$1(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:178)",
  "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:148)",
  "org.jruby.runtime.BlockBody.yield(BlockBody.java:106)",
  "org.jruby.runtime.Block.yield(Block.java:184)",
  "org.jruby.RubyArray.each(RubyArray.java:1820)",
  "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:175)",
  "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:106)",
  "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:140)",
  "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)",
  "org.logstash.config.ir.compiler.FilterDelegatorExt.doMultiFilter(FilterDelegatorExt.java:127)",
  "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.multiFilter(AbstractFilterDelegatorExt.java:134)",
  "org.logstash.generated.CompiledDataset14.compute(Unknown Source)",
  "org.logstash.generated.CompiledDataset26.compute(Unknown Source)",
  "org.logstash.generated.CompiledDataset6.compute(Unknown Source)",
  "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:329)",
  "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:323)",
  "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:87)",
  "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
  "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
  "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
  "java.base/java.lang.reflect.Method.invoke(Method.java:566)",
  "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441)",
  "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305)",
  "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)",
  "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)",
  "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)",
  "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)",
  "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)",
  "org.jruby.runtime.Block.call(Block.java:139)",
  "org.jruby.RubyProc.call(RubyProc.java:318)",
  "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)",
  "java.base/java.lang.Thread.run(Thread.java:829)"
]}
rocco8620 commented 2 years ago

I tried executing Logstash with the setting log.level: trace and found three more lines which may be related:

[2022-03-29T10:50:43,996][DEBUG][org.logstash.ackedqueue.QueueUpgrade] PQ version file with correct version information (v2) found.
[2022-03-29T10:50:44,003][DEBUG][org.logstash.ackedqueue.Queue] opening head page: 177, in: /usr/share/logstash/data/queue/mypipeline, with checkpoint: pageNum=177, firstUnackedPageNum=177, firstUnackedSeqNum=1672753, minSeqNum=1672753, elementCount=1, isFullyAcked=no
[2022-03-29T10:50:44,241][DEBUG][org.logstash.ackedqueue.io.MmapPageIOV2] PageIO recovery element index:1, readNextElement exception: Element seqNum 0 is expected to be 1672754

The full log can be found here: logstash-logs.log