elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

New queue type to configurably drop events to avoid upstream blocking from downstream back pressure #11601

Open geekpete opened 4 years ago

geekpete commented 4 years ago

We already have the Persistent Queue (PQ), but that queue type still blocks when it's full.

Another queue type for the purpose of avoiding back pressure in exchange for discarding events would be useful for a number of use cases.

It'd probably be better to implement a new queue type to keep the focus simple, rather than trying to extend the PQ for this purpose, though do see the related PQ issue included at the end of this issue.

Use cases that could benefit from this queue type might include the patterns described in pipeline-to-pipeline docs:

Real world examples of some patterns might be:

Configuration that might be nice to have:

There's potentially a lot of functionality that could be built here that dedicated message brokers already provide and more, but starting simpler, with the three functions for discarding events when the queue hits full, from either:

would cover a lot of cases to begin with, without going over the top on functionality in the first iteration of this feature.
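To make the idea concrete, a drop-on-full queue could look roughly like this minimal Ruby sketch. The class name and the drop_newest/drop_oldest policies are illustrative only, not existing Logstash internals:

```ruby
require 'thread'

# Hypothetical sketch of a bounded queue that sheds load instead of
# blocking the producer (i.e. no back pressure propagates upstream).
class DroppingQueue
  attr_reader :dropped

  def initialize(capacity, policy: :drop_newest)
    @capacity = capacity
    @policy   = policy
    @items    = []
    @mutex    = Mutex.new
    @dropped  = 0
  end

  # Returns false when the incoming event was discarded.
  def push(event)
    @mutex.synchronize do
      if @items.size >= @capacity
        @dropped += 1
        return false if @policy == :drop_newest # discard incoming event
        @items.shift                            # :drop_oldest - evict head
      end
      @items << event
      true
    end
  end

  def pop
    @mutex.synchronize { @items.shift }
  end

  def size
    @mutex.synchronize { @items.size }
  end
end
```

A `dropped` counter like the one above would also give operators a metric to alert on when events start being shed.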

A related issue:

aalleexxx5 commented 3 years ago

I needed to remove backpressure but didn't find an elegant solution, so I wanted to leave my current ~hack~ solution here in case someone else has the same issue. It uses the stats API in a pipeline to check the queue size for each event, and then drops events if the queue is above a threshold.

The Logstash stats API only updates once every 5 seconds, which means the pipeline can completely empty its queue before the reported size updates, so I need a very healthy margin on the queue size threshold.
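The margin that the 5-second refresh forces can be estimated with some rough arithmetic. The ingest rate below is an assumption for illustration, not a measured number:

```ruby
# If the stats API refreshes every 5 s and inputs can push up to
# 10_000 events/s (assumed), the queue can grow by 50_000 events
# between refreshes, so the drop threshold needs at least that much
# headroom below the hard capacity.
refresh_interval_s = 5
peak_ingest_rate   = 10_000   # events/s (assumption)
capacity           = 644_245  # events, matching the config below

margin    = refresh_interval_s * peak_ingest_rate
threshold = capacity - margin
puts "drop above #{threshold} events (headroom: #{margin})"
```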

How to drop events based on the queue's event count:

input {
    pipeline {
        address => "cloudQueue"
    }
}

filter {
    http {
        url => "http://localhost:9600/_node/stats/pipelines/cloud_queue"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][cloud_queue][queue][events]}" }
    }
    mutate {
        convert => {
            "queueSize" => "integer"
        }
    }
}
output {

# Queue size limit
# Assuming 2000 bytes per event, 644245 events take up about 1.2 GiB.
# We need a healthy margin, since 2000 bytes is an approximation and the stats only update every 5 seconds.

    if [queueSize] and [queueSize] < 644245 {
        pipeline {
            ensure_delivery => true
            send_to => ["cloudConnect"]
        }
    } else {
        # Events reaching this branch are dropped.
        # Uncomment to view the dropped events:
        # stdout {
        #     codec => "rubydebug"
        # }
    }
}
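As a sanity check on the threshold in the comments above, the 644245 figure is just 1.2 GiB divided by the assumed 2000 bytes per event:

```ruby
# How many ~2000-byte events fit in 1.2 GiB?
gib = 1024 ** 3
events = (1.2 * gib / 2000).floor
puts events  # 644245
```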
SelAnt commented 2 years ago

Hi, here's a slightly improved version of aalleexxx5's solution: it uses Ruby to query the queue size every 10 seconds (rather than for every event). It will stop shipping events if the 'sandbox' queue has more than 1000 events.
pipelines.yml

filter{
  ruby {
    # Background thread polls the stats API every 10 seconds and caches
    # the sandbox queue size in a global; -1 signals "unknown".
    init => "
      require 'uri'; require 'net/http'; require 'json'
      $sandbox_queue_size = -1
      Thread.new do
        while true do
          uri = URI('http://localhost:9600/_node/stats/pipelines')
          res = Net::HTTP.get_response(uri)
          if res.is_a?(Net::HTTPSuccess) then
            json = JSON.parse(res.body)
            $sandbox_queue_size = json['pipelines']['sandbox'] ? json['pipelines']['sandbox']['queue']['events_count'] : -1
          else
            $sandbox_queue_size = -1
          end
          if $sandbox_queue_size && $sandbox_queue_size < 0 then
            puts 'Cannot get Sandbox queue size'
          end
          sleep 10
        end
      end
    "
    code => 'event.set("sandbox_queue_size", $sandbox_queue_size)'
  }
}
output {
  if [sandbox_queue_size] and [sandbox_queue_size] >= 0 and [sandbox_queue_size] < 1000 {
    pipeline {
      send_to => ["sandbox"]
    }
  }
}
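The lookup buried in the `init` string above can be unpacked into plain Ruby for easier auditing. The function names here are illustrative; the JSON path (`pipelines.<name>.queue.events_count`) and the -1 "unknown" sentinel match the filter above:

```ruby
require 'json'
require 'net/http'
require 'uri'

# Extract a pipeline's queue depth from a stats API response body;
# returns -1 when the pipeline or field is missing or the body is not JSON.
def parse_queue_size(body, pipeline_name)
  stats = JSON.parse(body)
  count = stats.dig('pipelines', pipeline_name, 'queue', 'events_count')
  count.nil? ? -1 : count
rescue JSON::ParserError
  -1
end

# Fetch and parse in one step (performs a network call to the local
# Logstash stats API).
def fetch_queue_size(pipeline_name, host: 'localhost', port: 9600)
  res = Net::HTTP.get_response(URI("http://#{host}:#{port}/_node/stats/pipelines"))
  res.is_a?(Net::HTTPSuccess) ? parse_queue_size(res.body, pipeline_name) : -1
rescue StandardError
  -1
end
```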