Quick and dirty reproducer on trying to close the identity_map_codec (with running cleaner thread):
require 'logstash-core'
require 'logstash/codecs/plain'
require 'logstash/codecs/identity_map_codec'
def start_codec(identity: Object.new)
codec = LogStash::Codecs::Plain.new(Hash.new)
codec = LogStash::Codecs::IdentityMapCodec.new(codec)
codec.close # file input does it this way - closes the codec before use
codec.send :stream_codec, identity # cleaner and auto-flusher threads start again on first use
codec
end
def start_threads!
100.times.map do |i|
Thread.start do
Thread.current.name = "t-#{i}"
begin
codec = start_codec
Thread.pass
codec.cleaner.stop # a more direct codec.close
rescue => e
puts "ERROR: #{e.inspect}\n #{e.backtrace.join("\n ")}"
end
end
end
end
start = Time.now
iteration = 0
loop do
iteration += 1
puts "Elapsed time: #{Time.now - start}, iteration: #{iteration}"
threads = start_threads!
sleep(1.0)
threads.each do |thread|
if thread.alive?
sleep(5)
if thread.alive?
puts "Elapsed time: #{Time.now - start}"
puts "LOCKED THREAD FOUND: #{thread.name} #{thread.inspect}"
sleep(60)
if thread.alive?
puts "BUG: thread (still) alive after 60seconds"
exit(1)
end
end
end
end
end
Quick and dirty reproducer on trying to close the
identity_map_codec
(with running cleaner thread):more real-world details at: https://github.com/logstash-plugins/logstash-input-file/issues/298