elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Support multiple separate pipelines in a single logstash instance #6521

Closed jsvd closed 7 years ago

jsvd commented 7 years ago

Currently logstash supports running only a single instance of a pipeline, but it would be useful to run multiple pipelines.

Supporting multiple pipelines has several benefits:

The simplest way to implement and use this feature is to add a toggle that, when enabled, reads a list of pipelines from config/logstash.yml. This feature will have logstash read a pipelines.yml file where pipelines can be described individually. Example:

# pipelines.yml
- pipeline.id: line
  config.string: "input { generator {} } filter { sleep { time => 0.01 } } output { stdout { codec => line { format => '%{sequence}' } } }"
  pipeline.workers: 1
- pipeline.id: line2
  path.config: "/etc/conf.d/logstash/line2.*.cfg"
  pipeline.workers: 5
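
As a rough illustration of how a file in this format could be consumed, here is a minimal Ruby sketch. It is not Logstash's actual loader: the helper name load_pipeline_definitions and its error messages are hypothetical, and the only assumption taken from the example above is that each entry is a flat map containing a pipeline.id key.

```ruby
require "yaml"

# Hypothetical helper, NOT Logstash's real loader: parse a pipelines.yml
# document and return its entries keyed by pipeline.id.
def load_pipeline_definitions(yaml_text)
  entries = YAML.safe_load(yaml_text) || []
  raise ArgumentError, "expected a list of pipelines" unless entries.is_a?(Array)

  ids = entries.map { |entry| entry["pipeline.id"] }
  raise ArgumentError, "every pipeline needs a pipeline.id" if ids.any?(&:nil?)

  # With a list format, nothing stops two entries from sharing an id,
  # so the check has to be done explicitly.
  duplicates = ids.select { |id| ids.count(id) > 1 }.uniq
  unless duplicates.empty?
    raise ArgumentError, "duplicate pipeline.id: #{duplicates.join(', ')}"
  end

  entries.to_h { |entry| [entry["pipeline.id"], entry] }
end

sample = <<~'YAML'
  - pipeline.id: line
    config.string: "input { generator {} } output { stdout {} }"
    pipeline.workers: 1
  - pipeline.id: line2
    path.config: "/etc/conf.d/logstash/line2.*.cfg"
    pipeline.workers: 5
YAML

load_pipeline_definitions(sample).each do |id, settings|
  puts "#{id}: workers=#{settings['pipeline.workers']}"
end
```

Keying the result by id makes it cheap to look up one pipeline's settings later, at the cost of rejecting the whole file when two entries collide.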

TODO:

Check the feature branch feature/multi_pipeline or pr #6525 for progress

ph commented 7 years ago

I've been working on #6514 and I would like to suggest a few things for the multiple pipelines. These come from observing the current code base and from the need to deal with future features.

I've been working on two features that revolve around how we load our configuration and how we parse it.

These changes revolve around a new class called SourceLoaderFactory. This class uses the current logstash settings to create a wrapper around the possible logstash config. Afterwards, we can get the available pipeline configurations by calling #pipeline_configs on the instance. This method will return an array of PipelineConfig; each instance will have the following:

In my branch I only take the first config, but for multiple pipeline/reload context, I think we would need the following changes.

[
    ShutdownPipelineCommand.new(:pipeline_id_XX),
    ReloadPipelineCommand.new(:main, a_reload_pipeline_config),
    StartPipelineCommand.new(new_pipeline, new_pipeline_config)
] # maybe sorted on priority of operations
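
One way such a command list could be derived is by diffing the running pipelines against the freshly loaded configs. The sketch below is only an assumption about how that might look: the three class names come from the list above, but their fields, the plan_commands helper, and the shutdown/reload/start ordering are hypothetical.

```ruby
# Hypothetical command types; only the class names appear in the proposal.
ShutdownPipelineCommand = Struct.new(:pipeline_id)
ReloadPipelineCommand   = Struct.new(:pipeline_id, :config)
StartPipelineCommand    = Struct.new(:pipeline_id, :config)

# Assumed priority: stop pipelines first, then reload, then start new ones.
COMMAND_PRIORITY = {
  ShutdownPipelineCommand => 0,
  ReloadPipelineCommand   => 1,
  StartPipelineCommand    => 2
}.freeze

# running and desired are hashes of pipeline id => config.
def plan_commands(running, desired)
  commands = []

  # Pipelines that are running but no longer configured get shut down.
  (running.keys - desired.keys).each do |id|
    commands << ShutdownPipelineCommand.new(id)
  end

  desired.each do |id, config|
    if !running.key?(id)
      commands << StartPipelineCommand.new(id, config)
    elsif running[id] != config
      commands << ReloadPipelineCommand.new(id, config)
    end
  end

  # sort_by is not stable in Ruby, so the original index breaks ties.
  commands.each_with_index
          .sort_by { |command, index| [COMMAND_PRIORITY.fetch(command.class), index] }
          .map(&:first)
end

running = { main: "config v1", pipeline_id_XX: "old config" }
desired = { main: "config v2", new_pipeline: "new config" }
plan_commands(running, desired).each { |command| puts command.inspect }
```

Returning plain command objects keeps the "decide what to do" step separate from the "do it" step, which is what makes sorting by priority possible.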

hzruandd commented 7 years ago

I look at this feature with expectant eyes.

oridag commented 7 years ago

Correct me if I'm wrong but this implementation will allow for creating several completely segregated pipelines. If so, it will not address a very popular use case of allowing some outputs to fail/slowdown while maintaining regular flow for other outputs. What would be helpful would be to have the ability to send events from one input to multiple pipelines, so back pressure from one pipeline will not affect the other.

jsvd commented 7 years ago

@oridag correct, the multi pipeline just provides "complete" separation between different pipelines. Implementation-wise, it is low-hanging fruit that just allows the user to have more than one pipeline. Having separate flows within the same pipeline requires a lot more groundwork, which is being tracked in https://github.com/elastic/logstash/issues/6018. This new pipeline runtime framework will allow the pipeline to essentially become a DAG, and once that's in place, we'll be able to gradually provide ways for the user to assemble a more complex pipeline event flow. Another consequence of this design is that it also requires the in-memory or persisted queue to provide different event subscription models depending on the desired guarantees. If a user doesn't want back pressure to be global, there will be fewer delivery guarantees for those suffering from back pressure.

I should note that it will be possible to have pipeline to pipeline data transfer using existing plugins (tcp output -> tcp input or lumberjack output -> beats input) once multi pipeline comes.

oridag commented 7 years ago

@jsvd thanks for confirming that. I'm not familiar with the code, but I'm wondering how difficult it would be to broaden the scope of this issue a bit and allow for configuring a list of pipeline ids for each input. If I understand correctly, events are queued after the input stage and before the filter stage, so perhaps that's not such high-hanging fruit either. I may be completely off as I really know next to nothing about LS architecture.

Regarding your last paragraph: yes, I understand that that would be possible, but it will not help with the use case of a dead output, since the filling up of the second pipeline will also block the first pipeline.

FlorinAndrei commented 7 years ago

I should note that it will be possible to have pipeline to pipeline data transfer using existing plugins (tcp output -> tcp input or lumberjack output -> beats input) once multi pipeline comes

Wouldn't that still block?

Let's say you have 2 big pipelines, with separate outputs, and you want to keep them separate so the blocking of one does not block the other.

Then you have a very small 3rd pipeline, basically just copying the same events to the inputs of the 2 big pipelines. But if one of the 2 big pipelines is blocked, its input would block, and that would propagate upstream to the 3rd pipeline.

Or am I missing something?

jsvd commented 7 years ago

@FlorinAndrei totally correct, and @oridag also pointed that out: this wouldn't fix a 1 pipeline -> 2 pipeline scenario. Enabling PQs here would allow one pipeline to tolerate some hiccups by buffering events, not impacting the other pipeline until the PQ capacity thresholds are met.

oridag commented 7 years ago

@jsvd @FlorinAndrei I'm planning on solving this (when multiple pipelines is released, 5.3?) by using a custom plugin that ships events from one pipeline to the next, having a small buffer, and throwing away events in case it is full. It would be nice to have that as an option in current plugins (beats, lumberjack). I tried to implement it right now with 2 Logstash instances but it was too much of a resource overhead.

jsvd commented 7 years ago

@oridag interesting, have you tried using lumberjack output -> beats input and setting a persistent queue with a small capacity (something like 1000 max events, or 1mb of max size)?

oridag commented 7 years ago

@jsvd I didn't. What would happen when the queue fills up? It would still halt the source pipeline, wouldn't it? I'm trying to avoid that.

jsvd commented 7 years ago

@oridag sure, sure, I was just curious. The PQ would only mitigate blocking for a short time. Using UDP input/output would cause events to drop, but maybe more than you want :D

oridag commented 7 years ago

@jsvd Actually, UDP might be good enough for my use case. But it still requires multiple pipelines, as running two LS instances takes too many resources. What's the ETA on multiple pipelines?

FlorinAndrei commented 7 years ago

Eventually some kind of "universal lossy buffer" should be included in Logstash as an optional filter plugin (buffer events to some extent when downstream is clogged, then start dropping events until it gets unclogged). It would be very useful with multiple pipeline scenarios when not all outputs have critical data integrity requirements. Then you won't have to reinvent this concept for every input or output plugin out there.
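
To make the idea concrete, here is a minimal Ruby sketch of such a lossy buffer. It is purely illustrative: LossyBuffer is a hypothetical class, not an existing Logstash plugin or API. It relies only on Ruby's standard SizedQueue, whose non-blocking push raises ThreadError when the queue is full.

```ruby
# Hypothetical LossyBuffer, NOT an existing Logstash plugin: it buffers up to
# `capacity` events and drops new ones while the downstream side is clogged.
class LossyBuffer
  attr_reader :dropped

  def initialize(capacity)
    @queue = SizedQueue.new(capacity)
    @dropped = 0
    @lock = Mutex.new
  end

  # Never blocks the caller: returns true if buffered, false if dropped.
  def offer(event)
    @queue.push(event, true) # non_block = true raises ThreadError when full
    true
  rescue ThreadError
    @lock.synchronize { @dropped += 1 }
    false
  end

  # Called by the downstream side; blocks until an event is available.
  def take
    @queue.pop
  end

  def size
    @queue.size
  end
end

buffer = LossyBuffer.new(2)
results = 3.times.map { |i| buffer.offer("event-#{i}") }
puts results.inspect # the third offer is rejected because the buffer is full
puts "dropped: #{buffer.dropped}"
```

Because offer never blocks, back pressure from a slow consumer stops at the buffer instead of propagating upstream; the price is that events offered while the buffer is full are lost.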

ph commented 7 years ago

@jsvd

Concerning the format of the YAML entries, should we use the pipeline.id as the key for each pipeline's configuration? This would solve the duplicate id problem at the format level. If not, we can always provide the feedback at the source.


pipelines:
- line:
    config:
      string: "input { generator {} } filter { sleep { time => 0.01 } } output { stdout { codec => line { format => '%{sequence}' } } }"
    pipeline:
      workers: 1
- line2:
    path:
      config: "/etc/conf.d/logstash/line2.*.cfg"
    pipeline:
      workers: 5

jsvd commented 7 years ago

Implemented by #6525; this will very likely appear in 6.0.0-alpha2.