logstash-plugins / logstash-input-beats

Apache License 2.0
87 stars 81 forks source link

Andsel/verify oom behavior of parser in isolation #477

Open andsel opened 1 year ago

andsel commented 1 year ago

Use log4j2.properties like

status = error

appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n

logger.netty.name = io.netty
logger.netty.level = debug
logger.beats_parser.name = org.logstash.beats.BeatsParser
logger.beats_parser.level = trace
logger.beats_server.name = org.logstash.beats.Server
logger.beats_server.level = warn

Use a ruby script to load, in plain TCP like:

# encoding: utf-8
# JRUBY_OPTS="-J-Xmx4g -J-Xms4g" ruby beats_writer_ssl.rb
require "socket"
require "thread"
require "zlib"
require "json"
require "openssl"

Thread.abort_on_exception = true
HOST="127.0.0.1"
PORT=3333
CLIENT_CERT="/Users/andrea/workspace/certificates/client_from_root.crt"
CLIENT_KEY="/Users/andrea/workspace/certificates/client_from_root.key.pkcs8"

module Lumberjack
  SEQUENCE_MAX = (2**32-1).freeze

  class Client
    def initialize
      @sequence = 0
      #@socket = connect
      @socket = TCPSocket.new(HOST, 3334)
    end

    private
    def connect
      socket = TCPSocket.new(HOST, PORT)
      ctx = OpenSSL::SSL::SSLContext.new
      ctx.cert = OpenSSL::X509::Certificate.new(File.read(CLIENT_CERT))
      ctx.key = OpenSSL::PKey::RSA.new(File.read(CLIENT_KEY))
      ctx.ssl_version = :TLSv1_2

      # Wrap the socket with SSL/TLS
      ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ctx)
      ssl_socket.sync_close = true
      ssl_socket.connect
      ssl_socket
    end

    public
    def write(elements, chunk_size=5000, sleep_seconds=0.01)
      elements = [elements] if elements.is_a?(Hash)
      send_window_size(elements.size)

      payload = elements.map { |element| JsonEncoder.to_frame(element, inc) }.join
      send_payload(payload, chunk_size, sleep_seconds)
    end

    private
    def inc
      @sequence = 0 if @sequence + 1 > Lumberjack::SEQUENCE_MAX
      @sequence = @sequence + 1
    end

    private
    def send_window_size(size)
      @socket.syswrite(["2", "W", size].pack("AAN"))
    end

    private
    def send_payload(payload, chunk_size, sleep_seconds)
      payload_size = payload.size
      written = 0
      while written < payload_size
        written += @socket.syswrite(payload[written..written+chunk_size])
        puts "written #{written}.."
        sleep sleep_seconds
      end
    end

    public
    def close
      @socket.close
    end
  end

  module JsonEncoder
    def self.to_frame(hash, sequence)
      json = hash.to_json
      json_length = json.bytesize
      pack = "AANNA#{json_length}"
      frame = ["2", "J", sequence, json_length, json]
      frame.pack(pack)
    end
  end

end

client_count = 1500
message = 'a'*8*16*1024
#require 'pry'
#binding.pry

puts "Connecting #{client_count} clients"
clients = client_count.times.map { Lumberjack::Client.new }
puts "Writing approximately #{(client_count*message.size)/1024.0/1024.0}Mib across #{client_count} clients"
threads = client_count.times.map do |i|
  Thread.new(i) do |i|
    client = clients[i]
    # keep message size above 16k, requiring two TLS records
    data = [ { "message" => message } ]
    50.times do
      client.write(data)
      sleep 1*rand
    end
    client.close
  end
end
threads.each(&:join)
puts "Done"
sleep 10

Run with:

# to create a single uber-jar
> ./gradlew jar 

# run the app
> java -Dio.netty.allocator.numHeapArenas=0 -XX:NativeMemoryTracking=summary -Dio.netty.allocator.numDirectArenas=1 -XX:MaxDirectMemorySize=128m -XX:-MaxFDLimit -Dlog4j.configurationFile=/path_to/log4j2.properties -jar build/libs/logstash-input-beats-6.6.0.jar