elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Backpressure not propagated when output pipelines used #15917

Open · garethhumphriesgkc opened this issue 9 months ago

garethhumphriesgkc commented 9 months ago

Logstash information: docker: docker.elastic.co/logstash/logstash:8.12.1

JVM (e.g. java -version): bundled

OS version (uname -a if on a Unix-like system): Ubuntu 24.04

Description of the problem including expected versus actual behaviour:

I have found what seems to be an issue with back-pressure propagation in the lumberjack/beats plugins.

I have written and attached a simple PoC, built with Docker Compose, consisting of a data generator and two data receivers. The generator creates an event every 5 seconds and sends it to one of the receivers via the lumberjack output (with both receivers configured in hosts => []). Each receiver writes any event it receives to disk and prints it on stdout.
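
The generator side is essentially just the lumberjack output with both receivers listed. A rough sketch (not the exact PoC config - the input plugin, hostnames, port, and certificate path here are illustrative):

input {
  # any slow event source will do; the PoC generates one event every 5 seconds
  heartbeat { interval => 5 }
}

output {
  lumberjack {
    # both receivers listed, so the client can pick a new destination when disconnected
    hosts           => ["recv1", "recv2"]
    port            => 5044
    ssl_certificate => "/usr/share/logstash/config/lumberjack.crt"
  }
}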

When the receivers have a single output stanza with multiple outputs configured, as soon as one output is unable to process a message successfully, the entire pipeline blocks, which applies back-pressure to the input. This back-pressure propagates across the network to the generator (via a simple disconnect, I expect), which chooses a new destination from hosts: []. No data is lost and fail over is instantaneous. This is a GoodThing™.

However, when the receivers are configured with the same outputs as pipelines rather than inline in the output stanza, this doesn't happen. The pipeline stalls but doesn't disconnect upstream, so failover never happens. This is a BadThing™.

The relevant difference can be boiled down to this:

.
.
.
output {
  stdout { }
  file {
    id             => "raw"
    path           => "/tmp/raw/%{+YYYY.MM.dd-HH}.json.gz"
    flush_interval => 10
    codec          => json_lines
    gzip           => true
  }
}

vs this:

.
.
.
output {
  pipeline { id => p_stdout send_to => "p_stdout" }
  pipeline { id => p_raw    send_to => "p_raw"    }
}
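
For completeness, the downstream side of that wiring is just the same outputs moved into their own pipelines, each fed by a pipeline input listening on the corresponding address. A sketch (file names and the stdout pipeline id are illustrative; the PoC may differ slightly):

# pipelines.yml (sketch)
- pipeline.id: recv
  path.config: "/usr/share/logstash/pipeline/recv.conf"
- pipeline.id: stdout
  path.config: "/usr/share/logstash/pipeline/stdout.conf"
- pipeline.id: raw
  path.config: "/usr/share/logstash/pipeline/raw.conf"

# raw.conf (sketch); the stdout pipeline is analogous
input {
  pipeline { address => "p_raw" }
}
output {
  file {
    id             => "raw"
    path           => "/tmp/raw/%{+YYYY.MM.dd-HH}.json.gz"
    flush_interval => 10
    codec          => json_lines
    gzip           => true
  }
}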

Steps to reproduce:

I have attached a docker-compose stack that demonstrates the issue:

$ tar -zxf output-pipelines-poc.tar.gz
$ cd output-pipelines-poc
$ docker compose up

Edit docker-compose.yml to switch between the two scenarios.

When it starts, you will see the generator start sending data to one receiver. You can then break the filesystem output on that receiver by running docker exec output-pipelines-poc-recv1-1 chmod 444 /tmp/raw/. What happens next depends on how the output is configured:

Both outputs directly in the output stanza:

output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T04:10:07.007946636Z,
output-pipelines-poc-recv1-1      |       "sequence" => 3
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T04:10:12.012313741Z,
output-pipelines-poc-recv1-1      |       "sequence" => 4
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T04:10:17.013979664Z,
output-pipelines-poc-recv1-1      |       "sequence" => 5
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T04:10:22.014107894Z,
output-pipelines-poc-recv1-1      |       "sequence" => 6
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {"level":"INFO","loggerName":"logstash.outputs.file","timeMillis":1707365422036,"thread":"[main]>worker0","logEvent":{"message":"Opening file","path":"/tmp/raw/2024.02/2024.02.08-04.json.gz"}}
output-pipelines-poc-recv1-1      | {"level":"ERROR","loggerName":"logstash.javapipeline","timeMillis":1707365422041,"thread":"[main]>worker0","logEvent":{"message":"Pipeline worker error, the pipeline will be stopped","pipeline_id":"main","error":"(EACCES) Permission denied - /tmp/raw/2024.02/2024.02.08-04.json.gz","exception":"Java::OrgJrubyExceptions::SystemCallError","backtrace":["org.jruby.RubyIO.sysopen(org/jruby/RubyIO.java:1260)","org.jruby.RubyFile.initialize(org/jruby/RubyFile.java:364)","org.jruby.RubyClass.new(org/jruby/RubyClass.java:931)","org.jruby.RubyIO.new(org/jruby/RubyIO.java:869)","RUBY.open(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:276)","RUBY.multi_receive_encoded(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:119)","org.jruby.RubyHash.each(org/jruby/RubyHash.java:1601)","RUBY.multi_receive_encoded(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:118)","org.jruby.ext.thread.Mutex.synchronize(org/jruby/ext/thread/Mutex.java:171)","RUBY.multi_receive_encoded(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:117)","RUBY.multi_receive(/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:102)","org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)","RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:304)"],"thread":"#<Thread:0x324685a /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 sleep>"}}
output-pipelines-poc-generator-1  | {"level":"ERROR","loggerName":"logstash.outputs.lumberjack","timeMillis":1707365427023,"thread":"[main]>worker0","logEvent":{"message":"Client write error, trying connect","e":{"stackTrace":[{"class":"org.logstash.log.LoggerExt","method":"error","file":"org/logstash/log/LoggerExt.java","line":127},{"class":"RUBY","method":"flush","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-lumberjack-3.1.9/lib/logstash/outputs/lumberjack.rb","line":66},{"class":"RUBY","method":"buffer_flush","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/buffer.rb","line":219},{"class":"org.jruby.RubyHash","method":"each","file":"org/jruby/RubyHash.java","line":1601},{"class":"RUBY","method":"buffer_flush","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/buffer.rb","line":216},{"class":"RUBY","method":"buffer_receive","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/buffer.rb","line":159},{"class":"RUBY","method":"register","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-lumberjack-3.1.9/lib/logstash/outputs/lumberjack.rb","line":52},{"class":"RUBY","method":"encode","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-codec-json-3.1.1/lib/logstash/codecs/json.rb","line":69},{"class":"RUBY","method":"encode","file":"/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb","line":48},{"class":"org.logstash.instrument.metrics.AbstractSimpleMetricExt","method":"time","file":"org/logstash/instrument/metrics/AbstractSimpleMetricExt.java","line":74},{"class":"org.logstash.instrument.metrics.AbstractNamespacedMetricExt","method":"time","file":"org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java","line":68},{"class":"RUBY","method":"encode","file":"/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb","line":47},{"class":"RUBY","method":"receive","file":"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-lumberjack-3.1.9/lib/logstash/outputs/lumberjack.rb","line":58},{"class":"RUBY","method":"multi_receive","file":"/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb","line":104},{"class":"org.jruby.RubyArray","method":"each","file":"org/jruby/RubyArray.java","line":1989},{"class":"RUBY","method":"multi_receive","file":"/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb","line":104},{"class":"org.logstash.config.ir.compiler.AbstractOutputDelegatorExt","method":"multi_receive","file":"org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java","line":121},{"class":"RUBY","method":"start_workers","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":304}],"exception":"Connection reset by peer","message":"(IOError) Connection reset by peer","localizedMessage":"(IOError) Connection reset by peer"},"backtrace":["org/jruby/ext/openssl/SSLSocket.java:965:in `syswrite'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/jls-lumberjack-0.0.26/lib/lumberjack/client.rb:107:in `send_window_size'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/jls-lumberjack-0.0.26/lib/lumberjack/client.rb:127:in `write_sync'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/jls-lumberjack-0.0.26/lib/lumberjack/client.rb:42:in `write'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-lumberjack-3.1.9/lib/logstash/outputs/lumberjack.rb:64:in 
`flush'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/buffer.rb:219:in `block in buffer_flush'","org/jruby/RubyHash.java:1601:in `each'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/buffer.rb:216:in `buffer_flush'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/buffer.rb:159:in `buffer_receive'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-lumberjack-3.1.9/lib/logstash/outputs/lumberjack.rb:52:in `block in register'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-codec-json-3.1.1/lib/logstash/codecs/json.rb:69:in `encode'","/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:48:in `block in encode'","org/logstash/instrument/metrics/AbstractSimpleMetricExt.java:74:in `time'","org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java:68:in `time'","/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:47:in `encode'","/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-lumberjack-3.1.9/lib/logstash/outputs/lumberjack.rb:58:in `receive'","/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:104:in `block in multi_receive'","org/jruby/RubyArray.java:1989:in `each'","/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:104:in `multi_receive'","org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121:in `multi_receive'","/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"]}}
output-pipelines-poc-recv2-1      | {"level":"INFO","loggerName":"logstash.codecs.json","timeMillis":1707365427590,"thread":"defaultEventExecutorGroup-4-1","logEvent":{"message":"ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)"}}
output-pipelines-poc-recv2-1      | {
output-pipelines-poc-recv2-1      |     "@timestamp" => 2024-02-08T04:10:27.017584396Z,
output-pipelines-poc-recv2-1      |       "sequence" => 7
output-pipelines-poc-recv2-1      | }
output-pipelines-poc-recv2-1      | {"level":"INFO","loggerName":"logstash.outputs.file","timeMillis":1707365427797,"thread":"[main]>worker0","logEvent":{"message":"Opening file","path":"/tmp/raw/2024.02/2024.02.08-04.json.gz"}}
output-pipelines-poc-recv2-1      | {"level":"INFO","loggerName":"logstash.outputs.file","timeMillis":1707365427800,"thread":"[main]>worker0","logEvent":{"message":"Creating directory","directory":"/tmp/raw/2024.02"}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"logstash.javapipeline","timeMillis":1707365428517,"thread":"[main]-pipeline-manager","logEvent":{"message":"Waiting for input plugin to close","pipeline_id":"main","thread":"#<Thread:0x324685a /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}}
output-pipelines-poc-recv2-1      | {
output-pipelines-poc-recv2-1      |     "@timestamp" => 2024-02-08T04:10:32.018297001Z,
output-pipelines-poc-recv2-1      |       "sequence" => 8
output-pipelines-poc-recv2-1      | }
output-pipelines-poc-recv1-1      | {"level":"INFO","loggerName":"logstash.javapipeline","timeMillis":1707365436857,"thread":"[main]-pipeline-manager","logEvent":{"message":"Pipeline terminated","pipeline.id":"main"}}
output-pipelines-poc-recv1-1      | {"level":"INFO","loggerName":"logstash.pipelinesregistry","timeMillis":1707365436978,"thread":"Converge PipelineAction::Delete<main>","logEvent":{"message":"Removed pipeline from registry successfully","pipeline_id":"main"}}
output-pipelines-poc-recv1-1      | {"level":"INFO","loggerName":"logstash.runner","timeMillis":1707365436989,"thread":"LogStash::Runner","logEvent":{"message":"Logstash shut down."}}
output-pipelines-poc-recv2-1      | {
output-pipelines-poc-recv2-1      |     "@timestamp" => 2024-02-08T04:10:37.018142972Z,
output-pipelines-poc-recv2-1      |       "sequence" => 9
output-pipelines-poc-recv2-1      | }
output-pipelines-poc-recv1-1 exited with code 0
output-pipelines-poc-recv2-1      | {
output-pipelines-poc-recv2-1      |     "@timestamp" => 2024-02-08T04:10:42.020047169Z,
output-pipelines-poc-recv2-1      |       "sequence" => 10
output-pipelines-poc-recv2-1      | }
output-pipelines-poc-recv2-1      | {
output-pipelines-poc-recv2-1      |     "@timestamp" => 2024-02-08T04:10:47.020893223Z,
output-pipelines-poc-recv2-1      |       "sequence" => 11
output-pipelines-poc-recv2-1      | }

Both outputs via output pipelines:

output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |       "sequence" => 6,
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T03:53:08.093500216Z
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |       "sequence" => 7,
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T03:53:13.094506620Z
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |       "sequence" => 8,
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T03:53:18.094557528Z
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |       "sequence" => 9,
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T03:53:23.094548073Z
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {"level":"INFO","loggerName":"logstash.outputs.file","timeMillis":1707364408150,"thread":"[raw]>worker0","logEvent":{"message":"Opening file","path":"/tmp/raw/2024.02/2024.02.08-03.json.gz"}}
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |       "sequence" => 10,
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T03:53:28.094767419Z
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {"level":"ERROR","loggerName":"logstash.javapipeline","timeMillis":1707364408153,"thread":"[raw]>worker0","logEvent":{"message":"Pipeline worker error, the pipeline will be stopped","pipeline_id":"raw","error":"(EACCES) Permission denied - /tmp/raw/2024.02/2024.02.08-03.json.gz","exception":"Java::OrgJrubyExceptions::SystemCallError","backtrace":["org.jruby.RubyIO.sysopen(org/jruby/RubyIO.java:1260)","org.jruby.RubyFile.initialize(org/jruby/RubyFile.java:364)","org.jruby.RubyClass.new(org/jruby/RubyClass.java:931)","org.jruby.RubyIO.new(org/jruby/RubyIO.java:869)","RUBY.open(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:276)","RUBY.multi_receive_encoded(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:119)","org.jruby.RubyHash.each(org/jruby/RubyHash.java:1601)","RUBY.multi_receive_encoded(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:118)","org.jruby.ext.thread.Mutex.synchronize(org/jruby/ext/thread/Mutex.java:171)","RUBY.multi_receive_encoded(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-file-4.3.0/lib/logstash/outputs/file.rb:117)","RUBY.multi_receive(/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:102)","org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)","RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:304)"],"thread":"#<Thread:0x51625f43 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 sleep>"}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"logstash.javapipeline","timeMillis":1707364408157,"thread":"[raw]-pipeline-manager","logEvent":{"message":"Waiting for input plugin to close","pipeline_id":"raw","thread":"#<Thread:0x51625f43 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}}
output-pipelines-poc-recv1-1      | {"level":"INFO","loggerName":"logstash.javapipeline","timeMillis":1707364410440,"thread":"[raw]-pipeline-manager","logEvent":{"message":"Pipeline terminated","pipeline.id":"raw"}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364413153,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {
output-pipelines-poc-recv1-1      |       "sequence" => 11,
output-pipelines-poc-recv1-1      |     "@timestamp" => 2024-02-08T03:53:33.101254311Z
output-pipelines-poc-recv1-1      | }
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364414155,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364415156,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364416157,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364417159,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364418160,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364419161,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364420167,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
output-pipelines-poc-recv1-1      | {"level":"WARN","loggerName":"org.logstash.plugins.pipeline.PipelineBus","timeMillis":1707364421169,"thread":"[recv]>worker0","logEvent":{"message":"Attempted to send event to 'p_raw' but that address was unavailable. Maybe the destination pipeline is down or stopping? Will Retry."}}
garethhumphriesgkc commented 9 months ago

Note also that recovery of the failed node is never automatic: once the write permissions have been removed, Logstash shuts down either itself or the affected pipeline, and manual intervention is needed to bring it back up.

yaauie commented 9 months ago

Thank you for the thoughtful and well-reduced report. I agree that the behavior is surprising, and that we should work to find a way to eliminate this failure-case.

While I don't have an immediate solution, I can add some commentary that may help us narrow down on what the behavior should be.

There are a couple of things at play here:

The pipeline stopping isn't back-pressure per se (such as TCP receive-window reduction); rather, as the pipeline shuts down its inputs are shut down too, and most inputs that accept inbound connections simply hang up, leaving the connecting client to handle being hung up on by retrying or rerouting.

This is interesting in the pipeline-to-pipeline case because a downstream pipeline shutting down as a result of a plugin crash does not propagate to the upstream pipeline, and therefore does not shut down the input plugin in that upstream pipeline. The upstream pipeline's inputs continue to run and receive events without hanging up, but its outputs are now blocked waiting for the crashed pipeline to come back (it won't). Without a PQ the inputs become blocked relatively quickly, which translates into the TCP receive window filling and TCP back-pressure, which may or may not be handled by the connected client.
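
(For context: per-pipeline queue settings live in pipelines.yml, so a persisted queue on the upstream pipeline would only delay the point at which its inputs block - the inputs keep accepting until the queue fills. A sketch, with an illustrative size:)

- pipeline.id: recv
  path.config: "/usr/share/logstash/pipeline/recv.conf"
  queue.type: persisted
  queue.max_bytes: 64mb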

In a way, not propagating the crash is by design: it is part of what makes several of the pipeline-to-pipeline design patterns work, allowing things like reloading transformation or output pipeline definitions without restarting the inputs.


Currently, your GoodThing behavior relies on (a) the file output hard-crashing when it encounters a permissions issue, and (b) that crash cascading into the shutdown of the input plugin that is receiving data. If the file output doesn't crash, or if the crashing pipeline isn't the same pipeline the input is running in, you get your BadThing behavior (a cascade of back-pressure that fills the input's TCP receive window, with the connected client handling that propagated back-pressure by blocking).

For historical reasons, pipelines that crash stay stopped until a human intervenes. The logic (at least at the time) was that a crash is by definition an extraordinary circumstance that was not planned for, so human intervention is needed anyway. Additionally, automatically restarting a pipeline that is repeatedly crashing can cause data loss with some inputs, and is a VeryDifficult problem to solve in a generalized form, since it can cause events to be routed to a non-working pipeline that cannot process them; with the default memory queue, or with inputs that have no application-level acknowledgement scheme, this can be a bad problem.

This is certainly made more complex with the introduction and increased adoption of pipeline-to-pipeline. Again, I don't have answers, but I hope this commentary helps make sense of what is going on so that we can scheme toward a solution.

garethhumphriesgkc commented 9 months ago

Hi,

Thanks for your thoughts; I think we're on the same wavelength. It's certainly not a one-line fix, and arguably not even a bug per se, but I felt it was worth raising.

Note that the chmod is just a way to simulate any kind of backend failure - running out of disk space is what prompted the investigation and this repro - and it seems to be the pipeline stopping, rather than the specific error, that is the problem. I don't know of a way to gracefully stop a single pipeline, but I suspect doing so would have the same effect.
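
(Possibly removing the pipeline's entry from pipelines.yml while config reloading is enabled would do it - the converge logic logs a PipelineAction::Delete when a pipeline is removed - but I haven't verified that it behaves the same way as a crash. Something like:)

# logstash.yml (sketch)
config.reload.automatic: true
config.reload.interval: 3s
# then delete or comment out the pipeline's entry in pipelines.yml and wait for the next reload check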

In the BadThing case I've not yet seen failover happen at all as a result of the TCP window filling. I get a warning in the log for every event, but nothing on stdout (or in the file, obviously), ad infinitum. I have the pipeline batch size and worker count set tiny, so there shouldn't be any queueing of note within Logstash:

pipeline.batch.size: 1
pipeline.workers: 1

I re-ran the repro with an event being generated every second, ran the chmod after 10 events, and after about 6 hours I got sick of waiting and manually docker killed the receiver. As soon as it was gone, the other receiver picked up at event 21370 - so every intermediate event was lost, and there was no sign of the first receiver ever refusing to accept new events. My crude calculation was that the TCP window should fill after around 15 minutes at most.
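
(Back-of-envelope, assuming a receive window on the order of 64 KB and very roughly 70-100 bytes per event on the wire: 64 KB divided by ~70 B is a bit under 1000 events, i.e. roughly 15 minutes at one event per second.)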

I also found during that test that two seconds' worth of events (2 events at 1 per second) made it to stdout but not to disk before failover happened - so it seems that any failing sub-pipeline will lose data. Granted, it's hard (impossible?) to avoid this cleanly.

There's a lot to think about here, and the actual solution may be quite involved. For my use case, I think a band-aid fix would suffice: