elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash
Other
69 stars 3.5k forks source link

Routing events to specific workers #6572

Open consulthys opened 7 years ago

consulthys commented 7 years ago

In case Logstash runs with multiple pipeline workers, events are handed out to each worker in turn as they get available. Certain plugins (e.g. such as aggregate but also a few others doing some in-memory caching) require the user to set a single worker (-w 1) in order for the events not to be processed out of sequence. It's sad to have 2+ CPUs and only be able to use a single one just because of this limitation.

I'm wondering if we could potentially avoid this by introducing a way to route events to a specific worker, very much like how the routing parameter works in ES in regard to routing documents to specific shards or the partitioning key in Kafka. The idea would be to introduce a base-level routing parameter that we could add to any inputs and whose (hashed) value would be taken into account in order for Logstash to know to which worker the event should be routed to (worker_id = hash(routing) % pipeline.workers). This value could be static or could come from some field value of the event. In this latter case, this would also imply that the field needs to be available before the filter phase, which might not be the case for inputs that produce plain events by default, but that could be circumvented by using the json codec instead.

Let's consider a file containing JSON logs such as

{"timestamp": "2017-01-19T10:32:14.123Z", "deviceId": 123, "message": "Device has been powered on"}
{"timestamp": "2017-01-19T10:32:17.234Z", "deviceId": 123, "message": "Button 1 pressed"}
{"timestamp": "2017-01-19T10:34:19.432Z", "deviceId": 123, "message": "Button 2 pressed"}
{"timestamp": "2017-01-19T10:36:02.583Z", "deviceId": 456, "message": "Device has been powered on"}
{"timestamp": "2017-01-19T10:37:12.927Z", "deviceId": 456, "message": "Button 1 pressed"}
{"timestamp": "2017-01-19T10:38:33.583Z", "deviceId": 456, "message": "Button 2 pressed"}

A file input consuming those logs could be made "worker-aware" by routing the events based on the value of the deviceId field like this

input {
  file {
    path => "/path/to/file.json"
    codec => "json"
    routing => "%{deviceId}"
  }
}

From this point on, we can ensure that all events of device 123 will always be handled by the same worker thread and the same goes for device 456.

This change could allow users

  1. to not have to worry about whether they have to use a single worker thread or not
  2. to leverage the full power of their server (i.e. all CPUs) if they wish to
  3. to use worker-local in-memory caches (e.g. logstash-filter-cache)

I saw that it will soon be possible to define several pipelines inside a single Logstash instance, but that won't solve the issue I'm pointing at here since worker threads are internal to each defined pipeline.

Does anyone have any thoughts on this? Is there a similar effort going on internally?

Note: This was originally posted in the discuss forum

consulthys commented 7 years ago

@colinsurprenant do you have any thoughts on this?

imweijh commented 7 years ago

If define multiple pipeline, but each pipeline with only one workers, does this model solve the issue ? For example: 2 CPUs, so define 2 pipelines, each with one worker on aggregate . @consulthys

Moldisocks commented 1 year ago

Any progress on this enhancement here or tracked elsewhere?

I too would be interested in optimising log aggregation performance for my pipeline.

This seems like a good approach; routing events to a particular worker for a single pipeline by some key term. For me I'd split it by submitting host.