elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

PQ: Event containing 20+MB String cannot be deserialized for processing #16683

Open yaauie opened 1 week ago

yaauie commented 1 week ago

Logstash information:

Please include the following information:

  1. Logstash version (e.g. bin/logstash --version) 8.x, 8.15, main
  2. Logstash installation source (e.g. built from source, with a package manager: DEB/RPM, expanded from tar or zip archive, docker) N/A
  3. How is Logstash being run (e.g. as a service/service manager: systemd, upstart, etc. Via command line, docker/kubernetes) Any

Plugins installed: (bin/logstash-plugin list --verbose): N/A

JVM (e.g. java -version): Any

OS version (uname -a if on a Unix-like system): Any

Description of the problem including expected versus actual behavior:

Steps to reproduce:

  1. enable the PQ
    echo 'queue.type: persisted' > config/logstash.yml
  2. generate a 20MB+ newline-terminated string
    (dd if=<(base64 < /dev/urandom) bs=1K count="$(expr 20 '*' 1024)"; echo) > big.txt
  3. run logstash in a way where the input plugin will generate an event containing the aforementioned 20MB+ string:
    bin/logstash -e 'input { stdin { codec => plain } } output { stdout { codec => dots } }' < big.txt 

Provide logs (if relevant):

(using improved pipeline cause-chain logging from elastic/logstash#16677):

Using system java: /Users/rye/.jenv/shims/java
Sending Logstash logs to /Users/rye/src/elastic/logstash@main/logs which is now configured via log4j2.properties
[2024-11-18T19:13:30,831][INFO ][logstash.runner          ] Log4j configuration path used is: /Users/rye/src/elastic/logstash@main/config/log4j2.properties
[2024-11-18T19:13:30,834][WARN ][logstash.runner          ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2024-11-18T19:13:30,834][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"9.0.0", "jruby.version"=>"jruby 9.4.9.0 (3.1.4) 2024-11-04 547c6b150e OpenJDK 64-Bit Server VM 17.0.12+0 on 17.0.12+0 +indy +jit [arm64-darwin]"}
[2024-11-18T19:13:30,835][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Dlogstash.jackson.stream-read-constraints.max-string-length=400000000, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11]
[2024-11-18T19:13:30,835][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `400000000`
[2024-11-18T19:13:30,835][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
[2024-11-18T19:13:30,849][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-11-18T19:13:31,031][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2024-11-18T19:13:31,138][INFO ][org.reflections.Reflections] Reflections took 35 ms to scan 1 urls, producing 149 keys and 522 values
[2024-11-18T19:13:31,217][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2024-11-18T19:13:31,227][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x10df8cf8 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:138 run>"}
[2024-11-18T19:13:31,429][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.2}
[2024-11-18T19:13:31,444][INFO ][logstash.inputs.stdin    ][main] Automatically switching from plain to line codec {:plugin=>"stdin"}
[2024-11-18T19:13:31,449][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-11-18T19:13:31,458][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2024-11-18T19:13:31,516][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :exception=>"Java::OrgLogstashAckedqueue::QueueRuntimeException", :error=>"deserialize invocation error", :stacktrace=>"org.logstash.ackedqueue.Queue.deserialize(Queue.java:752)\norg.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)\norg.logstash.ackedqueue.Batch.<init>(Batch.java:49)\norg.logstash.ackedqueue.Queue.readPageBatch(Queue.java:681)\norg.logstash.ackedqueue.Queue.readBatch(Queue.java:614)\norg.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:158)\norg.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)\norg.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)\norg.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:300)\norg.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:164)\norg.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)\norg.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456)\norg.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195)\norg.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346)\norg.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)\norg.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)\norg.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)\n
org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)\norg.jruby.runtime.Block.call(Block.java:144)\norg.jruby.RubyProc.call(RubyProc.java:354)\norg.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)\njava.base/java.lang.Thread.run(Thread.java:840)", :cause=>{:exception=>"Java::JavaLangReflect::InvocationTargetException", :error=>"", :stacktrace=>"java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.logstash.ackedqueue.Queue.deserialize(Queue.java:750)\norg.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)\norg.logstash.ackedqueue.Batch.<init>(Batch.java:49)\norg.logstash.ackedqueue.Queue.readPageBatch(Queue.java:681)\norg.logstash.ackedqueue.Queue.readBatch(Queue.java:614)\norg.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:158)\norg.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)\norg.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)\norg.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:300)\norg.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:164)\norg.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)\norg.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456)\norg.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195)\norg.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346)\norg.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)\norg.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)\norg.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)\norg.jruby.runtime.Block.call(Block.java:144)\norg.jruby.RubyProc.call(RubyProc.java:354)\norg.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)\njava.base/java.lang.Thread.run(Thread.java:840)", :cause=>{:exception=>"Java::ComFasterxmlJacksonCoreExc::StreamConstraintsException", :error=>"String value length (20051112) exceeds the maximum allowed (20000000, from `StreamReadConstraints.getMaxStringLength()`)", 
:stacktrace=>"com.fasterxml.jackson.core.StreamReadConstraints._constructException(StreamReadConstraints.java:549)\ncom.fasterxml.jackson.core.StreamReadConstraints.validateStringLength(StreamReadConstraints.java:484)\ncom.fasterxml.jackson.core.util.ReadConstrainedTextBuffer.validateStringLength(ReadConstrainedTextBuffer.java:27)\ncom.fasterxml.jackson.core.util.TextBuffer.finishCurrentSegment(TextBuffer.java:939)\ncom.fasterxml.jackson.dataformat.cbor.CBORParser._finishChunkedText(CBORParser.java:2556)\ncom.fasterxml.jackson.dataformat.cbor.CBORParser._finishTextToken(CBORParser.java:2292)\ncom.fasterxml.jackson.dataformat.cbor.CBORParser.getValueAsString(CBORParser.java:1686)\norg.logstash.ObjectMappers$RubyStringDeserializer.deserialize(ObjectMappers.java:183)\norg.logstash.ObjectMappers$RubyStringDeserializer.deserialize(ObjectMappers.java:172)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:120)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)\ncom.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializerNR.deserializeWithType(UntypedObjectDeserializerNR.java:115)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:625)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:449)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:120)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)\ncom.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializerNR.deserializeWithType(UntypedObjectDeserializerNR.java:115)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserial
izer.java:625)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:449)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:120)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromObject(AsArrayTypeDeserializer.java:61)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserializeWithType(MapDeserializer.java:492)\ncom.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:74)\ncom.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)\ncom.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4899)\ncom.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3940)\norg.logstash.Event.fromSerializableMap(Event.java:269)\norg.logstash.Event.deserialize(Event.java:551)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.logstash.ackedqueue.Queue.deserialize(Queue.java:750)\norg.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)\norg.logstash.ackedqueue.Batch.<init>(Batch.java:49)\norg.logstash.ackedqueue.Queue.readPageBatch(Queue.java:681)\norg.logstash.ackedqueue.Queue.readBatch(Queue.java:614)\norg.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:158)\norg.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)\norg.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)\norg.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:300)\norg.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:164)\norg.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)\norg.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456)\norg.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195)\norg.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346)\norg.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)\norg.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)\norg.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)\norg.jruby.runtime.IRBlockBody.call(IRBlockBod
y.java:66)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)\norg.jruby.runtime.Block.call(Block.java:144)\norg.jruby.RubyProc.call(RubyProc.java:354)\norg.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)\njava.base/java.lang.Thread.run(Thread.java:840)"}}, :thread=>"#<Thread:0x10df8cf8 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:138 sleep>"}
[2024-11-18T19:13:32,715][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2024-11-18T19:13:32,981][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2024-11-18T19:13:32,986][INFO ][logstash.runner          ] Logstash shut down.

Notably:

exception: "Java::OrgLogstashAckedqueue::QueueRuntimeException"
error:     "deserialize invocation error"
cause:
  exception: "Java::JavaLangReflect::InvocationTargetException"
  error: ""
  cause:
    exception: "Java::ComFasterxmlJacksonCoreExc::StreamConstraintsException"
    error: "String value length (20051112) exceeds the maximum allowed (20000000, from `StreamReadConstraints.getMaxStringLength()`)"

Notably, setting -Dlogstash.jackson.stream-read-constraints.max-string-length=400000000 does NOT change the error message: the reported maximum stays at 20000000. This indicates that the stream constraint is not applied to the specific deserializer that works with the PQ, possibly because the CBOR_MAPPER is configured before the overrides are applied.
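
The suspected ordering problem can be illustrated with a minimal, dependency-free sketch. It is not Logstash or Jackson code: `Defaults`, `Mappers`, and every other name below are hypothetical stand-ins, and the only assumption is that the mapper snapshots the process-wide default when its holder class is initialized.

```java
// Hypothetical stand-ins only; none of these names come from Logstash or Jackson.
final class StaticInitOrderDemo {

    /** Stand-in for a process-wide default, such as a library's read constraints. */
    static final class Defaults {
        static volatile int maxStringLength = 20_000_000;
    }

    /** Stand-in for a holder class whose static field snapshots the default at class-init time. */
    static final class Mappers {
        static final int CAPTURED_MAX_STRING_LENGTH = Defaults.maxStringLength;
    }

    /** Touches Mappers first, then applies the override, mirroring the suspected ordering. */
    static int capturedWhenOverrideArrivesLate(int override) {
        int captured = Mappers.CAPTURED_MAX_STRING_LENGTH; // first access forces class initialization
        Defaults.maxStringLength = override;               // too late: the snapshot already exists
        return captured;
    }

    public static void main(String[] args) {
        int captured = capturedWhenOverrideArrivesLate(400_000_000);
        System.out.println("default now:  " + Defaults.maxStringLength); // 400000000
        System.out.println("mapper holds: " + captured);                 // 20000000
    }
}
```

In this sketch the override is visible to anything created afterwards, but the already-initialized holder keeps the old limit, which would match an error message that still reports 20000000.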

yaauie commented 5 days ago

I can confirm that the static initialization of the CBOR_MAPPER occurs before the Jackson overrides have been applied. I have a local patch that moves the code applying the Jackson overrides to pure Java so that they can be deterministically applied first. This patch works, and when it is applied the above issue can no longer be reproduced.
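
The general shape of such a fix can be sketched as follows. This is not the patch itself: the classes are hypothetical stand-ins, and only the property name is taken from the JVM flags in the log above. The sketch relies on the Java guarantee that static initializers and static field initializers of a class run in textual order. In real Jackson code the override step would presumably go through `StreamReadConstraints.overrideDefaultStreamReadConstraints(...)`, which is an assumption here rather than something the issue states.

```java
// Hypothetical stand-ins; only the system property name comes from the log output above.
final class EagerOverridesDemo {

    /** Stand-in for a library-wide default. */
    static final class Defaults {
        static volatile int maxStringLength = 20_000_000;
    }

    /** Pure-Java override step: reads the system property, validates it, applies it. */
    static final class Overrides {
        static final String PROPERTY =
            "logstash.jackson.stream-read-constraints.max-string-length";

        static void apply() {
            String raw = System.getProperty(PROPERTY);
            if (raw == null) {
                return; // no override requested, keep the default
            }
            int value;
            try {
                value = Integer.parseInt(raw.trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Not an integer: " + PROPERTY + "=" + raw, e);
            }
            if (value <= 0) {
                throw new IllegalArgumentException("Must be positive: " + PROPERTY + "=" + raw);
            }
            Defaults.maxStringLength = value;
        }
    }

    /** Holder that applies the overrides before anything snapshots the default. */
    static final class Mappers {
        static {
            Overrides.apply(); // runs first: initializers execute in textual order
        }
        static final int CAPTURED_MAX_STRING_LENGTH = Defaults.maxStringLength;
    }

    public static void main(String[] args) {
        // Same effect as passing -Dlogstash.jackson.stream-read-constraints.max-string-length=400000000
        System.setProperty(Overrides.PROPERTY, "400000000");
        System.out.println("mapper holds: " + Mappers.CAPTURED_MAX_STRING_LENGTH); // 400000000
    }
}
```

Because the override lives in the same class-initialization path as the mapper, the result no longer depends on which part of the application happens to load the holder class first.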

The existing Ruby implementation helpfully logs as it goes (and these logs are covered in specs), but since ObjectMappers is first loaded (and therefore its components' static initializers fired) before the loggers have been configured, getting the helpful logging and validating that our overrides have taken effect will be tricky.
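
One possible way around the logging problem, offered only as a sketch and not as what the patch does, is to buffer messages produced during static initialization and replay them once a logger exists. `EarlyLog` and its methods are hypothetical names; the sink could be the real logger at runtime or a plain list in a spec.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; not part of Logstash.
final class DeferredLogDemo {

    /** Buffers messages that are produced before logging has been configured. */
    static final class EarlyLog {
        private static final List<String> PENDING = new ArrayList<>();

        /** Called from early code paths, e.g. while overrides are being applied. */
        static synchronized void record(String message) {
            PENDING.add(message);
        }

        /** Hands every buffered message to the sink in order, then clears the buffer. */
        static synchronized int replay(Consumer<String> sink) {
            int count = PENDING.size();
            PENDING.forEach(sink);
            PENDING.clear();
            return count;
        }
    }

    public static void main(String[] args) {
        // Early: no logger yet, so the message is only recorded.
        EarlyLog.record("max-string-length configured to 400000000");

        // Later: the logger exists (System.out stands in for it here).
        int replayed = EarlyLog.replay(System.out::println);
        System.out.println("replayed " + replayed + " message(s)");
    }
}
```

A spec could pass a collecting sink to `replay` and assert on the captured messages, which would keep the existing log-based validation without requiring the logger to be configured first.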