logstash-plugins / logstash-codec-multiline

Apache License 2.0

Infinite loop when pipeline stopping #71

Open thales-hawk opened 2 years ago

thales-hawk commented 2 years ago

Logstash information:

  1. Logstash version: Tested in 7.16, 7.17 and 8.1
  2. Logstash installation source: From tar gz
  3. How is Logstash being run: Command line
  4. How was the Logstash Plugin installed: Installed by default in the tar package

JVM (e.g. java -version): Packaged version

OS version (uname -a if on a Unix-like system): 5.13.0-39-generic #44~20.04.1-Ubuntu SMP Thu Mar 24 16:43:35 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

I detected this issue while using the file input, which seems to use the identity_map_codec from the multiline codec. The problem is that a pipeline sometimes never stops. I analysed the issue by adding some logging and noticed that the pipeline is trapped in an infinite loop. The loop is located in the identity_map_codec.rb file, in the stop method of the PeriodicRunner class:

def stop
  return if !running?
  @running.make_false
  while @thread.alive?
    begin
      @thread.wakeup
    rescue ThreadError
      # thread might drop dead since the alive? check
    else
      puts "Before join"
      @thread.join(0.1) # raises $! if there was any
      puts "After join"
    end
  end
  @listener = nil
end

The cause of the loop is the @running variable, which does not stay false even though stop calls make_false, so the thread never exits. Unfortunately, I don't understand why the boolean stays true :-(

Steps to reproduce:

The issue occurs randomly, so it is tricky to reproduce. I just create a pipeline in Logstash with the file input and then remove it from the pipelines.yml config file.

thales-hawk commented 2 years ago

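I found a way to reproduce and fix the issue!

The problem is concurrent access to the @running boolean in the PeriodicRunner. When the PeriodicRunner is stopped, the boolean changes to false, but if a new log event goes through the codec at that moment, the PeriodicRunner is started again. start then sets @running back to true and spawns a thread, so the while loop in stop keeps waiting on a thread that never exits.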

Steps to reproduce:

Change the stop method like this:

def stop
  return if !running?
  @running.make_false
  sleep(1) # We add this sleep
  # During this sleep the start method of the PeriodicRunner is called
  while @thread.alive?
    begin
      @thread.wakeup
    rescue ThreadError
      # thread might drop dead since the alive? check
    else
      puts "Before join"
      @thread.join(0.1) # raises $! if there was any
      puts "After join"
    end
  end
  @listener = nil
end
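
The same interleaving can probably be forced outside Logstash. The following is only a minimal sketch, not the plugin's code: Flag is a hypothetical stdlib-only stand-in for Concurrent::AtomicBoolean, RacyRunner is a stripped-down imitation of PeriodicRunner, and stop is split into two hypothetical halves (begin_stop and finish_stop) so that start can be called in between. The wait loop is bounded so the script terminates instead of hanging.

```ruby
# Hypothetical stand-in for Concurrent::AtomicBoolean, stdlib only.
class Flag
  def initialize(value = false)
    @value = value
    @lock = Mutex.new
  end

  def true?
    @lock.synchronize { @value }
  end

  def make_true
    @lock.synchronize { @value = true }
  end

  def make_false
    @lock.synchronize { @value = false }
  end
end

# Stripped-down imitation of PeriodicRunner (illustration only).
class RacyRunner
  def initialize(interval)
    @interval = interval
    @running = Flag.new
  end

  def running?
    @running.true?
  end

  def start
    return self if running?
    @running.make_true
    @thread = Thread.start do
      sleep @interval while running?
    end
    self
  end

  # First half of stop: clear the flag.
  def begin_stop
    @running.make_false
  end

  # Second half of stop: wait for the thread, but give up after
  # max_attempts so this demo cannot hang. Returns true if the thread died.
  def finish_stop(max_attempts)
    attempts = 0
    while @thread.alive? && attempts < max_attempts
      begin
        @thread.wakeup
      rescue ThreadError
        # thread might drop dead since the alive? check
      else
        @thread.join(0.01)
      end
      attempts += 1
    end
    !@thread.alive?
  end
end

runner = RacyRunner.new(0.01).start
runner.begin_stop                 # stop clears the flag ...
runner.start                      # ... and a new event restarts the runner
stopped = runner.finish_stop(20)  # the wait loop now watches a live thread
puts "thread stopped: #{stopped}"
```

In this sketch the runner thread is still alive after the bounded wait, which matches the behaviour seen in the pipeline: without the bound, the loop in stop would spin forever.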

How to fix this?

Just add a stopped attribute to the PeriodicRunner to record that it has been stopped and must not be restarted.

Here is an example of the modified PeriodicRunner class:

class PeriodicRunner
  def initialize(listener, interval, method_symbol)
    @listener, @interval = listener, interval
    @method_symbol = method_symbol
    @running = Concurrent::AtomicBoolean.new(false)
    @stopped = Concurrent::AtomicBoolean.new(false)
  end

  def start
    return self if running? || stopped?
    @running.make_true
    @thread = Thread.start do
      class_name = @listener.class.name.split('::').last # IdentityMapCodec
      LogStash::Util.set_thread_name("#{class_name}##{@method_symbol}")

      while running? do
        sleep @interval
        break if !running?
        break if (listener = @listener).nil?
        listener.send(@method_symbol)
      end
    end
    self
  end

  def running?
    @running.true?
  end

  def stopped?
    @stopped.true?
  end

  def stop
    return if !running? || stopped?

    @stopped.make_true
    @running.make_false
    while @thread.alive?
      begin
        @thread.wakeup
      rescue ThreadError
        # thread might drop dead since the alive? check
      else
        @thread.join(0.1) # raises $! if there was any
      end
    end
    @listener = nil
  end
end
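
One possible refinement, offered only as a sketch under my own assumptions: in the class above, the check in start (running? || stopped?) and the following make_true are still two separate steps, so a stop that lands between them could in principle slip through. The hypothetical GuardedRunner below uses a plain Mutex instead of Concurrent::AtomicBoolean, so that each check and its state change happen as one step. It is not the plugin's code, it replaces LogStash::Util.set_thread_name with nothing, and it has not been tested inside Logstash.

```ruby
# Hypothetical variant of PeriodicRunner (illustration only, stdlib only).
class GuardedRunner
  def initialize(listener, interval, method_symbol)
    @listener, @interval, @method_symbol = listener, interval, method_symbol
    @lock = Mutex.new
    @running = false
    @stopped = false
    @thread = nil
  end

  def start
    @lock.synchronize do
      # Check and state change happen under the same lock.
      return self if @running || @stopped
      @running = true
      @thread = Thread.start { run }
    end
    self
  end

  def running?
    @lock.synchronize { @running }
  end

  def stopped?
    @lock.synchronize { @stopped }
  end

  def stop
    thread = @lock.synchronize do
      @stopped = true   # from now on, start is a no-op
      @running = false
      @thread
    end
    return if thread.nil? # never started
    while thread.alive?
      begin
        thread.wakeup
      rescue ThreadError
        # thread might drop dead since the alive? check
      else
        thread.join(0.1) # raises if the thread died with an exception
      end
    end
    @listener = nil
  end

  private

  def run
    while running?
      sleep @interval
      break unless running?
      break if (listener = @listener).nil?
      listener.send(@method_symbol)
    end
  end
end

# Hypothetical listener used only for this demo.
class Counter
  attr_reader :ticks

  def initialize
    @ticks = 0
  end

  def tick
    @ticks += 1
  end
end

counter = Counter.new
runner = GuardedRunner.new(counter, 0.01, :tick).start
sleep 0.05
runner.stop
runner.start # ignored, because the runner was already stopped
puts "running after restart attempt: #{runner.running?}"
```

Unlike the class above, this sketch marks the runner as stopped even if stop is called before start. Whether that is the desired behaviour for the codec is an open question.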