elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash
Other
14.09k stars 3.48k forks source link

PipelineBus.sendEvents locks on the sender PipelineOutput impacting overall throughput #16171

Open jsvd opened 1 month ago

jsvd commented 1 month ago

Currently (up to 8.14.0 at the moment of writing), the PipelineBus class has a lock on the sender output:

    public void sendEvents(final PipelineOutput sender,
                           final Collection<JrubyEventExtLibrary.RubyEvent> events,
                           final boolean ensureDelivery) {
        if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations

        synchronized (sender) {
// .....
            addressesToInputs.forEach((address, addressState) -> {
// .....
                    PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
// .....
                        lastResponse = input.internalReceive(clones);

internalReceive will call Queue.write that mainly does 3 steps:

    public long write(Queueable element) throws IOException {
//...
        byte[] data = element.serialize();
        lock.lock();
        this.headPage.write( .. );

This means that when there are multiple writers to the same pipeline and serialization + pagewrite take a long time, most threads will spend time waiting for 1 thread that is writing an event, which can be seen with the simple pipelines.yml:

- pipeline.id: source
  config.string: "input { java_generator {} } output { pipeline { send_to => [dest1] } }"
- pipeline.id: dest
  config.string: "input { pipeline { address => dest1 } } output { null {} }"

And queue.type: persisted in the logstash.yml. This will cause all but one of the workers of the upstream pipeline to be blocked at any given time:

❯ curl -s -XGET 'localhost:9600/_node/hot_threads?human=true&threads=30&stacktrace_size=10' | grep "thread name.*source.*worker" -A 1 | grep -v "\-\-"
12.17 % of cpu usage, state: blocked, thread name: '[source]>worker2', thread id: 54 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
12.12 % of cpu usage, state: blocked, thread name: '[source]>worker5', thread id: 60 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.96 % of cpu usage, state: blocked, thread name: '[source]>worker0', thread id: 51 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.93 % of cpu usage, state: blocked, thread name: '[source]>worker4', thread id: 58 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.89 % of cpu usage, state: runnable, thread name: '[source]>worker3', thread id: 56 
    java.base@17.0.10/jdk.internal.misc.Unsafe.unpark(Native Method)
11.84 % of cpu usage, state: blocked, thread name: '[source]>worker7', thread id: 63 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.82 % of cpu usage, state: blocked, thread name: '[source]>worker6', thread id: 62 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.66 % of cpu usage, state: blocked, thread name: '[source]>worker9', thread id: 68 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.57 % of cpu usage, state: blocked, thread name: '[source]>worker1', thread id: 52 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.48 % of cpu usage, state: blocked, thread name: '[source]>worker8', thread id: 65 
    app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)

This was introduced by https://github.com/elastic/logstash/pull/10872 to ensure proper order during pipeline shutdown. However it should be possible to improve concurrency by having a readwritelock that allows read access to the sender object during event processing, but uses the write lock for every other operation.

jsvd commented 1 month ago

Impact on 7.2.0 -> 7.2.1 can be easily observed. For the following pipelines.yaml:

- pipeline.id: source
  config.string: "input { generator { count => 20000000 } } output { pipeline { send_to => destination } }"
- pipeline.id: destination
  queue.type: persisted
  config.string: "input { pipeline { address => destination } } output { null {} }"

we can observe the following numbers:

7.2.0 - 1m 46s

❯ curl -s localhost:9600/_node/stats/pipelines/destination | jq .pipelines.destination.events
{
  "duration_in_millis": 771,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000,
  "queue_push_duration_in_millis": 90072
}

/tmp/logstash-7.2.1
❯ curl -s localhost:9600/_node/stats/pipelines/source | jq .pipelines.source.events          
{
  "duration_in_millis": 866418,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000,
  "queue_push_duration_in_millis": 7082
}

7.2.1 - 2m 38s

❯ curl -s localhost:9600/_node/stats/pipelines/destination | jq .pipelines.destination.events
{
  "duration_in_millis": 565,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000
  "queue_push_duration_in_millis": 3720,
}

/tmp/logstash-7.2.0
❯ curl -s localhost:9600/_node/stats/pipelines/source | jq .pipelines.source.events          
{
  "duration_in_millis": 1354787,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000
  "queue_push_duration_in_millis": 12592,
}