logstash-plugins / logstash-filter-grok

Grok plugin to parse unstructured (log) data into something structured.
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
Apache License 2.0

Fix a possible race where Thread.interrupted was not properly cleared #131

Closed (andrewvc closed 6 years ago)

andrewvc commented 6 years ago

This should keep perf close enough. It's a little slower, but worth it for the consistency.

Old perf as measured with time: 54.57 real 216.58 user 3.51 sys
New perf as measured with time: 59.58 real 238.70 user 4.21 sys
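
To put a number on "a little slower", the relative change between the two measurements can be computed directly. This is a small Java sketch; the class and method names are illustrative and not part of the plugin, and the inputs are just the figures quoted above.

```java
// Relative slowdown between the old and new `time` measurements.
final class PerfDelta {
    /** Percentage increase going from oldValue to newValue. */
    static double percentSlower(double oldValue, double newValue) {
        return (newValue - oldValue) / oldValue * 100.0;
    }

    public static void main(String[] args) {
        // Seconds as reported by `time`, copied from the comment.
        System.out.println("real: " + percentSlower(54.57, 59.58));
        System.out.println("user: " + percentSlower(216.58, 238.70));
        System.out.println("sys:  " + percentSlower(3.51, 4.21));
    }
}
```

With these figures the change comes to roughly 9% more wall-clock time and roughly 10% more user CPU time for the 1,000,000-event run.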

Test config:

# MULTIPLE grok filters
input {
    generator {
        type => foo
        message => "random message, la la la"
        count => 1000000
    }
}

filter {
    grok {
        match => {
            "message" => "^1 foo 1 bar$"
        }
    }
    grok {
        match => {
            "message" => "^2 foo 2 bar$"
        }
    }
    grok {
        match => {
            "message" => "^3 foo 3 bar$"
        }
    }
    grok {
        match => {
            "message" => "^4 foo 4 bar$"
        }
    }
    grok {
        match => {
            "message" => "^5 foo 5 bar$"
        }
    }
    grok {
        match => {
            "message" => "^6 foo 6 bar$"
        }
    }
    grok {
        match => {
            "message" => "^7 foo 7 bar$"
        }
    }
    grok {
        match => {
            "message" => "^8 foo 8 bar$"
        }
    }
    grok {
        match => {
            "message" => "^9 foo 9 bar$"
        }
    }
    grok {
        match => {
            "message" => "^10 foo 10 bar$"
        }
    }
    grok {
        match => {
            "message" => "^11 foo 11 bar$"
        }
    }
    grok {
        match => {
            "message" => "^12 foo 12 bar$"
        }
    }
    grok {
        match => {
            "message" => "^13 foo 13 bar$"
        }
    }
    grok {
        match => {
            "message" => "^14 foo 14 bar$"
        }
    }
    grok {
        match => {
            "message" => "^15 foo 15 bar$"
        }
    }
    grok {
        match => {
            "message" => "^16 foo 16 bar$"
        }
    }
    grok {
        match => {
            "message" => "^17 foo 17 bar$"
        }
    }
    grok {
        match => {
            "message" => "^18 foo 18 bar$"
        }
    }
    grok {
        match => {
            "message" => "^19 foo 19 bar$"
        }
    }
    grok {
        match => {
            "message" => "^20 foo 20 bar$"
        }
    }

    metrics {
        meter => "events"
        add_tag => "metric"
    }
}

output {
    if "metric" in [tags] {
        stdout {
            codec => line {
                format => "rate_1m: %{[events][rate_1m]}, rate_5m: %{[events][rate_5m]}"
            }
        }
    }
}
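
The twenty grok blocks in the config above differ only in the number embedded in the pattern, so they could be generated rather than typed. A minimal Java sketch, assuming the same indentation and pattern shape as the config; `GrokConfigGenerator` and `filters` are hypothetical names, not something from the PR:

```java
// Generates the repeated grok blocks of the benchmark config.
final class GrokConfigGenerator {
    /** Builds `count` grok blocks matching "^N foo N bar$" for N = 1..count. */
    static String filters(int count) {
        StringBuilder sb = new StringBuilder();
        for (int n = 1; n <= count; n++) {
            sb.append("    grok {\n")
              .append("        match => {\n")
              .append("            \"message\" => \"^")
              .append(n).append(" foo ").append(n).append(" bar$\"\n")
              .append("        }\n")
              .append("    }\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Paste the output inside the filter { ... } section.
        System.out.print(filters(20));
    }
}
```

None of the patterns match the generated message, so every event walks through all twenty filters, which is what makes the config useful as a worst-case throughput test.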

colinsurprenant commented 6 years ago

Left some minor notes; the thread interruption handling logic seems good.

colinsurprenant commented 6 years ago

nit: I noticed that @cancel_mutex is unused now; we could remove it.

colinsurprenant commented 6 years ago

@andrewvc @jordansissel following up on our conversation about ConcurrentHashMap forEach behaviour in this situation:

    @threads_to_start_time.forEach do |thread, start_time|
      @threads_to_start_time.compute(thread) do |thread, start_time|
      ...
      end
    end

From what I can read in https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly:

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:

  - they may proceed concurrently with other operations
  - they will never throw ConcurrentModificationException
  - they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.

So I am not sure about the behaviour of forEach, but I suspect it is also weakly consistent. Instead of @threads_to_start_time.compute we could simply use @threads_to_start_time.computeIfPresent, which would account for the possibility of the thread being removed from the collection while we are in the forEach loop.
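
The difference between the two calls can be seen in isolation. The sketch below is plain Java rather than the plugin's JRuby, and the map contents, class name and method name are made up for illustration. It removes a key in the middle of a forEach traversal, then counts how often the remapping function runs for that key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ComputeIfPresentDemo {
    /** Counts remapping-function calls for a key that was removed mid-traversal. */
    static int callsAfterRemoval(boolean useComputeIfPresent) {
        ConcurrentHashMap<String, Long> startTimes = new ConcurrentHashMap<>();
        startTimes.put("worker-1", 100L);
        AtomicInteger calls = new AtomicInteger();

        startTimes.forEach((thread, startTime) -> {
            // Simulate the worker deregistering itself after the traversal
            // has already handed us its entry.
            startTimes.remove(thread);
            if (useComputeIfPresent) {
                // Skipped entirely when the key is no longer mapped.
                startTimes.computeIfPresent(thread, (t, s) -> { calls.incrementAndGet(); return s; });
            } else {
                // Runs anyway, with s == null for the missing key.
                startTimes.compute(thread, (t, s) -> { calls.incrementAndGet(); return s; });
            }
        });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("compute calls:          " + callsAfterRemoval(false));
        System.out.println("computeIfPresent calls: " + callsAfterRemoval(true));
    }
}
```

With compute, the function still runs once and receives a null value, so the block would have to guard against a missing start time itself. With computeIfPresent the stale entry is simply skipped.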

andrewvc commented 6 years ago

@colinsurprenant makes sense to move to computeIfPresent, I'll make that improvement.

andrewvc commented 6 years ago

I changed this in a few ways:

  1. Inlined the start_thread_grokking method for simplicity (it was called in only one spot).
  2. Removed the warning for non-grok interrupted errors. We'll just report those as regexp interrupted errors; this is a boundary condition that occurs only when regexps take too long.
  3. Switched from compute to computeIfPresent.
  4. Removed an overly conservative call to thread.interrupted before we start grokking; it is unnecessary.
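
For readers less familiar with the Java interrupt flag this PR is about: Thread.interrupted() both reads and clears the current thread's flag, while isInterrupted() only reads it. The Java sketch below shows just that behaviour; it is not the plugin's code, and the class and method names are illustrative.

```java
final class InterruptFlagDemo {
    /** Returns {flag before, value returned by Thread.interrupted(), flag after}. */
    static boolean[] observeAndClear() {
        Thread.currentThread().interrupt();                       // set our own interrupt flag
        boolean before = Thread.currentThread().isInterrupted();  // reads without clearing
        boolean cleared = Thread.interrupted();                   // reads AND clears
        boolean after = Thread.currentThread().isInterrupted();   // flag is now reset
        return new boolean[] { before, cleared, after };
    }

    public static void main(String[] args) {
        boolean[] r = observeAndClear();
        System.out.println("before=" + r[0] + " interrupted()=" + r[1] + " after=" + r[2]);
    }
}
```

A flag left set matters because the next blocking call on that thread, such as Thread.sleep or a queue take, throws InterruptedException immediately. That is presumably the kind of inconsistency the title refers to when an interrupt arrives just as a match completes.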

colinsurprenant commented 6 years ago

@andrewvc nit: just noticed that @cancel_mutex is still defined but unused https://github.com/logstash-plugins/logstash-filter-grok/blob/eb88860b88beb999e4c2ab329fbe89bb8026e020/lib/logstash/filters/grok/timeout_enforcer.rb#L12

andrewvc commented 6 years ago

@colinsurprenant just removed that extra line, good catch!

colinsurprenant commented 6 years ago

Another observation, not part of the change set but possibly worth modifying: the @running bool is used across threads to control termination. I'd suggest we make it an AtomicBoolean so that it is explicitly thread-safe. https://github.com/logstash-plugins/logstash-filter-grok/blob/eb88860b88beb999e4c2ab329fbe89bb8026e020/lib/logstash/filters/grok/timeout_enforcer.rb#L31

colinsurprenant commented 6 years ago

@andrewvc ^^ something like https://github.com/elastic/logstash/blob/a0aa92980e74ec9ea00a058fccf648d60c9482fa/logstash-core/lib/logstash/inputs/base.rb#L61

colinsurprenant commented 6 years ago

@andrewvc I'll leave the decision on @running to you, since I believe it will not have any practical impact. LGTM!
Really good job on this!!

andrewvc commented 6 years ago

@colinsurprenant moved @running to an atomic boolean, good catch. Apparently it didn't cause a problem before, but it definitely wasn't right.
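
A termination flag shared between threads might look like the following in plain Java. This is a sketch under assumptions: it uses java.util.concurrent.atomic.AtomicBoolean, whereas the thread only says "an atomic boolean", and `StoppableLoop` and its methods are hypothetical names rather than the enforcer's actual structure.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class StoppableLoop {
    // Written by the stopping thread, read by the worker thread.
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread worker;

    StoppableLoop() {
        worker = new Thread(() -> {
            while (running.get()) {
                Thread.yield(); // stand-in for the periodic timeout check
            }
        });
        worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    /** Flips the flag and waits up to timeoutMillis; true if the worker exited. */
    boolean stop(long timeoutMillis) {
        running.set(false);
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        return !worker.isAlive();
    }
}
```

The atomic type guarantees that the write in stop is visible to the worker's next read. A plain field gives no such guarantee under the Java memory model, even if it happens to work in practice.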

elasticsearch-bot commented 6 years ago

Andrew Cholakian merged this into the following branches!

Branch   Commits
master   82ec77961a497b0e1066af08fadbe6b6f84146c7

PhaedrusTheGreek commented 6 years ago

@andrewvc, is this fix available in LS 5.5.2?

It seems I only have 3.4.4 and, if I understand correctly, this fix is in 4.0.1?

$ bin/logstash-plugin update logstash-filter-grok
Updating logstash-filter-grok
Updated logstash-filter-grok 3.4.2 to 3.4.4

PhaedrusTheGreek commented 6 years ago

Just confirmed that I was able to upgrade in LS 5.6.3:

$ bin/logstash-plugin update logstash-filter-grok
Updating logstash-filter-grok
Updated logstash-filter-grok 4.0.0 to 4.0.1