elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Graph Pipeline Design #4765

Open andrewvc opened 8 years ago

andrewvc commented 8 years ago

I found some time while in Lake Tahoe to experiment (https://github.com/elastic/logstash/pull/4727) with what I'm terming a 'graph' pipeline execution model. The critical ideas behind this design are as follows:

  1. Improve performance: I've seen gains of ~27% in the Apache example.
  2. Reduce complexity: Cross-compiling to Ruby is a source of complexity, as anyone who has read config_ast can attest.
  3. Move us closer to a pure Java core/Java plugins: Implementing this in pure Java is easy; we don't rely on the JRuby compiler at all.
  4. Enable more filter operations by allowing filters to operate on batches instead of single events: The wins here are significant for filters that perform IO. Even for ones that don't, batch functionality can be exploited. The GeoIP and UserAgent filters would be able to group their cache lookups, for instance. For critical sections of code, the granularity of mutexes is easier to manage as well and should require fewer context switches.
  5. Have an easy to work with IR for Logstash configs: The benefits for testing and debugging are huge.
  6. Pave the way for generated configs via UI: By using a graph format it's much easier to visually format and export Logstash configs. Initially this would be an internal IR generated by @colinsurprenant 's ANTLR grammar (https://github.com/colinsurprenant/logstash-antlr-config).
  7. Ease debugging and tracing: Visualizing a graph is simple in a UI, and it's easy to show hotspots in one. It's also easy to enable tracing plugins that modify the graph such that extra stats are recorded.
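
To illustrate point 4, here is a minimal Ruby sketch of a filter that works on a whole batch. It is not Logstash code: `BatchLookupFilter`, `filter_batch` and `lookup_count` are hypothetical names, events are assumed to be plain Hashes, and the block passed to the constructor stands in for an expensive operation such as a GeoIP database query.

```ruby
# Hypothetical sketch; none of these names come from Logstash itself.
class BatchLookupFilter
  attr_reader :lookup_count

  def initialize(source_field, target_field, &lookup)
    @source_field = source_field
    @target_field = target_field
    @lookup = lookup      # stand-in for an expensive lookup
    @cache = {}
    @mutex = Mutex.new
    @lookup_count = 0
  end

  # Process a whole batch: the mutex is taken once per batch and the
  # lookup runs once per distinct uncached key, not once per event.
  def filter_batch(events)
    by_key = events.group_by { |event| event[@source_field] }
    @mutex.synchronize do
      by_key.each do |key, group|
        next if key.nil? # events without the source field pass through
        unless @cache.key?(key)
          @cache[key] = @lookup.call(key)
          @lookup_count += 1
        end
        group.each { |event| event[@target_field] = @cache[key] }
      end
    end
    events
  end
end
```

With a per-event interface the same filter would enter the critical section once for every event, which is the mutex granularity concern raised in point 4.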

Implementation

The current model compiles the Logstash config to Ruby code, then repeatedly executes it. An abbreviated version of the execution is shown below:

queue = LogStash::Queue.new
start_input_threads(queue)
worker_threads.times do
  Thread.new do
    loop do
      batch = take_batch(queue)
      # Events are filtered one at a time
      filtered = batch.map { |e| filter_func(e) }
      outputs_to_events = map_outputs_to_events(filtered)
      outputs_to_events.each { |output, events| output.multi_receive(events) }
    end
  end
end

Note that the entire filter chain for a single event is compiled into a single function, filter_func. This is also true of output_func, though in that case it returns which outputs the event should be sent to rather than directly executing it.
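
As a rough illustration of the grouping step, the sketch below shows one way `map_outputs_to_events` could work. It is an assumption, not the actual Logstash implementation: here the function takes `output_func` as an explicit argument (a callable returning the list of outputs for one event), whereas the abbreviated loop above passes only the events.

```ruby
# Hypothetical sketch: group events by the outputs that output_func
# selects for them, so each output receives its events as one batch.
def map_outputs_to_events(events, output_func)
  grouped = Hash.new { |hash, output| hash[output] = [] }
  events.each do |event|
    output_func.call(event).each { |output| grouped[output] << event }
  end
  grouped
end
```

Each key of the returned Hash is an output and each value is the list of events to hand to that output's `multi_receive`.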

The graph model is simpler. We model the entire pipeline as a graph (from inputs, to queues, to filters, to outputs). The IR serialized to YAML might look like the following:


---
graph:
  log-reader:
    component: input-stdin
    to: [main-queue]
  main-queue:
    component: queue-synchronous
    to: [apache-grok]
  apache-grok:
    component: filter-grok
    options:
      match:
        message: '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    to: [code-splitter]
  code-splitter:
    component: predicate-ifelse-ruby
    to:
      - ["value = event['[response]'] && value ? value > 399 : false", [error-tagger]]
      - ["__ELSE__", [apache-geoip]]
  error-tagger:
    component: filter-mutate
    options:
      add_tag:
        fb: weird_error
    to: [apache-geoip]
  apache-geoip:
    component: filter-geoip
    options:
      source: geoip
      target: geoip
    to: [apache-ua]
  apache-ua:
    component: filter-useragent
    options:
      source: agent
      target: useragent
    to: [apache-date]
  apache-date:
    component: filter-date
    options:
      match: [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
      locale: en
    to: [main-out]
  main-out:
    component: output-file
    options:
      path: /tmp/graph-out.json
      codec: json_lines
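
To make the IR more concrete, here is a minimal Ruby sketch that loads a document in this layout and orders its vertices topologically using Kahn's algorithm. The helper names `downstream_ids` and `topological_order` and the shortened `SAMPLE_IR` document are hypothetical; the only assumption taken from the example is that `to:` is either a list of vertex ids or, for predicate vertices, a list of `[condition, [ids]]` pairs.

```ruby
require 'yaml'

# Hypothetical helpers; the IR layout follows the YAML example above.

# Ids a vertex points at. Plain vertices list ids directly; predicate
# vertices list [condition, [ids]] pairs.
def downstream_ids(vertex)
  Array(vertex['to']).flat_map do |entry|
    entry.is_a?(Array) ? entry.last : [entry]
  end.uniq
end

# Kahn's algorithm: returns vertex ids so that every vertex appears
# after all of its upstream vertices. Raises if the graph has a cycle.
def topological_order(graph)
  indegree = {}
  graph.each_key { |id| indegree[id] = 0 }
  graph.each do |id, vertex|
    downstream_ids(vertex).each do |target|
      raise ArgumentError, "#{id} points at unknown vertex #{target}" unless indegree.key?(target)
      indegree[target] += 1
    end
  end

  ready = indegree.select { |_id, count| count.zero? }.keys
  order = []
  until ready.empty?
    id = ready.shift
    order << id
    downstream_ids(graph[id]).each do |target|
      indegree[target] -= 1
      ready << target if indegree[target].zero?
    end
  end
  raise ArgumentError, 'graph contains a cycle' if order.size < graph.size
  order
end

# A shortened, made-up document in the same layout.
SAMPLE_IR = YAML.safe_load(<<~DOC)
  graph:
    main-queue:
      component: queue-synchronous
      to: [splitter]
    splitter:
      component: predicate-ifelse-ruby
      to:
        - ["some condition", [tagger]]
        - ["__ELSE__", [out]]
    tagger:
      component: filter-mutate
      to: [out]
    out:
      component: output-file
DOC

puts topological_order(SAMPLE_IR['graph']).join(' -> ')
```

Because the order is computed from the data alone, the same check that rejects cycles can run when a config is loaded, before any plugin is instantiated.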

With the graph expressed as such we can execute according to any strategy of our choice.

// Pseudo-Java code
Graph graph = GraphConfig.loadFile("myconfig");

// For now we just support one queue, but that is easy to change!
Vertex queue = graph.getById("main-queue");

// Parse out inputs for execution in a separate thread
List<Vertex> inputs = graph.inputs();
startInputs(inputs, queue);

// Get the subgraph of nodes underneath the main queue
Graph pipelineWorkerGraph = graph.getById("main-queue").subGraph();
// Sort the vertices topologically to optimize execution order (and check for cycles in the DAG)
// This will mean we deliver the maximum batch size to each Vertex
List<Vertex> executionOrderedVertices = pipelineWorkerGraph.topologicalSort();

for (int i = 0; i < workerCount; i++) {
  new Thread(new Runnable() {
    @Override
    public void run() {
      while (true) {
        // Results for the current batch, keyed by the edge they travel along
        Map<Edge, List<Event>> edgeEvents = new HashMap<>();
        // There are some interesting opportunities beyond the toposort to optimize
        // traversal here for empty boolean paths
        for (Vertex v : executionOrderedVertices) {
          // Process the vertex, returning a list of results mapped to edges.
          // Pass in the current batch's edgeEvents map so it can pull from
          // previously calculated edge values.
          Map<Edge, List<Event>> vEdgeEvents = v.execute(edgeEvents);
          // Merge the result map into the map of all event results
          edgeEvents.putAll(vEdgeEvents);
        }
      }
    }
  }).start();
}
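
The inner loop can also be sketched in runnable Ruby. This is only an illustration under stated assumptions: `run_batch` is a hypothetical name, an edge is a `[from, to]` pair of vertex ids, and each vertex is a lambda that takes its incoming events and returns a Hash of downstream id to events. Unlike the pseudo-code, vertices here receive their incoming events directly rather than the whole edge map.

```ruby
# Hypothetical sketch of one worker iteration over toposorted vertices.
def run_batch(ordered_ids, vertices, source_id, batch)
  edge_events = Hash.new { |hash, edge| hash[edge] = [] }
  ordered_ids.each do |id|
    incoming =
      if id == source_id
        batch
      else
        # Everything already emitted on edges that end at this vertex
        edge_events.select { |(_from, to), _events| to == id }.values.flatten(1)
      end
    vertices[id].call(incoming).each do |target, events|
      edge_events[[id, target]].concat(events)
    end
  end
  edge_events
end

written = []
vertices = {
  'queue' => ->(events) { { 'splitter' => events } },
  # Predicate vertex: route error responses through the tagger
  'splitter' => lambda { |events|
    errors, rest = events.partition { |e| e['response'].to_i > 399 }
    { 'tagger' => errors, 'out' => rest }
  },
  'tagger' => lambda { |events|
    events.each { |e| (e['tags'] ||= []) << 'weird_error' }
    { 'out' => events }
  },
  # Sink: records what it receives and emits nothing
  'out' => lambda { |events|
    written.concat(events)
    {}
  }
}

batch = [{ 'response' => 200 }, { 'response' => 500 }]
run_batch(%w[queue splitter tagger out], vertices, 'queue', batch)
```

Because the vertices are visited in topological order, every vertex sees the full set of events destined for it in this batch before it runs, which is what allows each one to receive the largest possible batch.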

Why is it Faster?

I'm not sure. I know that implementing a similar graph pattern in Ruby yielded similar results. My money is on some inefficiency in the generated Ruby OR the greater CPU cache locality afforded by filtering batches. Either way, I believe the other benefits stack up regardless of performance.

andrewvc commented 8 years ago

Updated benchmarks for a simple (generator -> kv -> mutate -> stdout(dots)) pipeline: 2.13x faster. This shows that the gains for simpler pipelines are larger (as expected).

~/p/logstash (native-graph) $ time bin/logstash -w 2 -f simple-kv.yml | pv -b | wc -c
      137.29 real       300.15 user        47.45 sys
9.54MiB
 10000107

~/p/logstash-alt (fix_oops_backtrace_logging) $ time bin/logstash -w 2 -f simplekv.conf | pv -b | wc -c
      292.70 real       647.86 user        53.70 sys
9.54MiB
 10000107
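
The 2.13x figure is the ratio of the two wall-clock ("real") times. A small Ruby sketch of the arithmetic, using only numbers copied from the two runs above (the names `RUNS`, `speedup` and `bytes_per_second` are made up for this example):

```ruby
# Wall-clock seconds and byte counts copied from the two runs above.
RUNS = {
  'native-graph' => { real: 137.29, bytes: 10_000_107 },
  'logstash-alt' => { real: 292.70, bytes: 10_000_107 }
}.freeze

# How many times faster the candidate finished than the baseline.
def speedup(baseline, candidate)
  baseline[:real] / candidate[:real]
end

# Output bytes per wall-clock second for one run.
def bytes_per_second(run)
  run[:bytes] / run[:real]
end

ratio = speedup(RUNS['logstash-alt'], RUNS['native-graph'])
puts format('speedup: %.2fx', ratio)
RUNS.each do |name, run|
  puts format('%s: %.0f bytes/s', name, bytes_per_second(run))
end
```

Since both runs produced the same number of output bytes, the throughput ratio equals the ratio of the wall-clock times.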

Config files available here

purbon commented 8 years ago

After giving it some thought, I think I like the idea behind this change; it also complements the component idea stated in #4432 perfectly. But I really do think we should not hurry too much into adding this change. I will try to explain my thoughts later on.

Benefits / Pros

Open questions / Cons

For now, I think we should focus on building the most flexible IR DAG model possible; performance will then come out of the box for sure. What do you think?

guyboertje commented 8 years ago

@andrewvc and I have discussed the connectedness of nodes (components) with the DAG.

Background: As I have a strong electronics background, I have a mental model of components "wired up" to each other so that the pipeline does not orchestrate the collection of data from an upstream component and feed it to a downstream component. Rather, the upstream component knows its downstream connections and feeds data downstream autonomously. However, the pipeline does have to build the connected DAG. Here the DAG is more than a data structure; it's an execution structure.

Andrew, on the other hand, is more comfortable with the idea of a supervisory pipeline that does orchestrate as above. In this design, components are still specific wrappers that present a common communication interface and hide the API of the underlying "unit of work" that they wrap.

Conclusion: I have conceded that while the "hardware" notion of a connected DAG has some appeal - it is hard to construct, inspect and reason about at runtime. Therefore I am fully throwing my weight behind the idea of a DAG as a data structure with the pipeline orchestrating the flow of data through the graph.

We did agree that the nodes of the DAG would contain filter and output components but could also contain other components, for example metrics and trace logging components. We also agreed that the stitching in of components on a running instance of LS could be done dynamically.
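
With the DAG held as plain data, stitching in a component amounts to rewriting one edge. The Ruby sketch below shows the idea on a Hash in the layout of the YAML IR from the first comment. `splice_vertex` and the `trace-logger` component name are hypothetical, and the helper only handles plain `to:` id lists, not predicate vertices.

```ruby
# Hypothetical helper: insert a new vertex on the edge from_id -> to_id.
# Returns an updated copy of the graph and leaves the original untouched,
# so a running pipeline could swap graphs between batches.
def splice_vertex(graph, from_id, to_id, new_id, component)
  targets = graph.fetch(from_id).fetch('to')
  index = targets.index(to_id)
  raise ArgumentError, "no edge #{from_id} -> #{to_id}" if index.nil?
  raise ArgumentError, "#{new_id} already exists" if graph.key?(new_id)

  rewired = targets.dup.tap { |list| list[index] = new_id }
  graph.merge(
    from_id => graph[from_id].merge('to' => rewired),
    new_id => { 'component' => component, 'to' => [to_id] }
  )
end

graph = {
  'apache-grok' => { 'component' => 'filter-grok', 'to' => ['main-out'] },
  'main-out' => { 'component' => 'output-file' }
}
traced = splice_vertex(graph, 'apache-grok', 'main-out', 'grok-trace', 'trace-logger')
```

How a live worker would pick up the new graph (for example, re-running the topological sort between batches) is left open here, as it is in the discussion.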

andrewvc commented 8 years ago

Thanks for the comments @purbon and @guyboertje! I agree completely with all your concerns, @purbon. All the code/config language I've put in so far is experimental and meant to be changed and discussed!

I should have mentioned, however, that this graph language might also be a preferable config language for administrators in textual form!