logstash-plugins / logstash-input-beats

Apache License 2.0

Graceful shutdown improvements and `undefined method 'accept'` fix. #500

Closed mashhurs closed 3 months ago

mashhurs commented 3 months ago

Description

Improvements:

  1. Fixes the `undefined method 'accept'` error. Because the plugin runs multiple threads, we need to handle this crash regardless of the order in which messages are processed. Full error log:

    {"level":"INFO","loggerName":"org.logstash.beats.BeatsHandler","timeMillis":1721298048946,"thread":"defaultEventExecutorGroup-78-6","logEvent":{"message":"[local: 0.0.0.0:3511, remote: undefined] Handling exception: org.jruby.exceptions.NoMethodError: (NoMethodError) undefined method `accept' for nil:NilClass (caused by: org.jruby.exceptions.NoMethodError: (NoMethodError) undefined method `accept' for nil:NilClass)"}}
    {"level":"WARN","loggerName":"io.netty.channel.AbstractChannelHandlerContext","timeMillis":1721298048946,"thread":"defaultEventExecutorGroup-78-6","logEvent":{"message":"Failed to submit an exceptionCaught() event."}}
    {"level":"WARN","loggerName":"io.netty.channel.AbstractChannelHandlerContext","timeMillis":1721298048946,"thread":"defaultEventExecutorGroup-78-6","logEvent":{"message":"The exceptionCaught() event that was failed to submit was:"}}
  2. Graceful shutdown

    • Separate the boss group and shut it down on its own first. This closes incoming socket communication at the first stage, ensuring we no longer accept incoming connections or initialize new channels.
    • Declare a quiet period with WorkGroup#shutdownGracefully() (without waiting for termination yet) so that it does not pick up any further execution tasks from the queue.
    • Terminate all execution threads that have pending tasks and wait for them. This lets BeatsHandler receive any remaining batches and any exceptions the channels encounter.
    • If the WorkGroup is still active, wait for its termination (we get an internal RejectedExecutionException if it is still active, which is fine and by design).
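
The ordering of the steps above can be sketched with plain JDK executors. This is only an analogue under stated assumptions, not the plugin's Netty code: `boss` and `workers` are hypothetical stand-ins for the boss and worker groups, and `ExecutorService#shutdown()` plus `awaitTermination()` stand in for the graceful shutdown calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only analogue of the staged shutdown; all names here are illustrative.
class StagedShutdownSketch {

    // Returns the order in which the shutdown stages completed.
    static List<String> run() {
        List<String> stages = Collections.synchronizedList(new ArrayList<>());
        ExecutorService boss = Executors.newSingleThreadExecutor();  // stands in for the boss group
        ExecutorService workers = Executors.newFixedThreadPool(2);   // stands in for the worker group
        CountDownLatch bossDown = new CountDownLatch(1);

        // A batch that is still in flight when shutdown is requested.
        workers.execute(() -> {
            try {
                bossDown.await();
                stages.add("pending batch handled");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            // Stage 1: stop accepting new connections before anything else.
            boss.shutdown();
            boolean bossStopped = boss.awaitTermination(1, TimeUnit.SECONDS);
            stages.add(bossStopped ? "boss terminated" : "boss timed out");
            bossDown.countDown();

            // Stage 2: refuse new worker tasks, but keep the ones already submitted.
            workers.shutdown();

            // Stage 3: wait until the in-flight tasks have drained.
            boolean drained = workers.awaitTermination(5, TimeUnit.SECONDS);
            stages.add(drained ? "workers terminated" : "workers timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            stages.add("interrupted");
        }
        return stages;
    }
}
```

The point of the sketch is the order: the acceptor side is fully stopped before the workers are asked to drain, so the in-flight batch is still handled and nothing new arrives behind it.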

How to test?

mashhurs commented 3 months ago

As far as I understand, the idea of this PR is to use a separate bossGroup and workerGroup, so that the boss group can be shut down before the worker one.

The root of the problem is not completely clear to me when we use the same event loop group for I/O on both the acceptor socket and the client sockets.

Although shutdown has been requested, incoming requests keep arriving and Netty places them into its task queue. Netty terminates its worker group and boss group threads, but there is no guarantee that the boss group terminates first unless we intentionally force it to terminate first. So BeatsHandler#exceptionCaught receives a RejectedExecutionException because no executor thread is left to continue the task. BeatsHandler#channelRead0 keeps receiving incoming messages and causes:

// example logs
{"level":"WARN","loggerName":"io.netty.channel.AbstractChannelHandlerContext","timeMillis":1721298048741,"thread":"defaultEventExecutorGroup-78-1","logEvent":{"message":"Failed to submit an exceptionCaught() event."}}
{"level":"WARN","loggerName":"io.netty.channel.AbstractChannelHandlerContext","timeMillis":1721298048741,"thread":"defaultEventExecutorGroup-78-1","logEvent":{"message":"The exceptionCaught() event that was failed to submit was:"}}
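
The rejection behind these warnings can be reproduced in isolation with a plain JDK executor. This is a minimal sketch, assuming that an executor which has already shut down behaves comparably to a terminated Netty event executor. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// JDK-only illustration; not taken from the plugin or from Netty.
class RejectedAfterShutdownSketch {

    // Returns true when a task submitted after shutdown is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // no new tasks are accepted from here on
        try {
            // Stands in for a late event arriving after the executor is gone.
            executor.execute(() -> System.out.println("handling late event"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }
}
```

If events can still be produced after the executor that should run them has stopped, every such event ends in a rejection, which is why the order of shutdown matters.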

The null reference on the codec happens because the connections_list no longer has the ConnectionState for a context (essentially, the Channel). This originates from the call to the message listener's onConnectionClose made by BeatsHandler.channelInactive(). Given that the I/O for a channel is always managed by the same EventLoop thread, the null reference can be imagined in two situations:

  • incoming flow (client to server): the closing of a channel was processed before other data frames from the same channel. Given the constraint above (the same thread always processes data for the same channel, so processing is effectively serialized), this case should not occur.
  • outgoing flow (server to client): the server is writing an ACK to a client that has already disconnected. In this case it would be enough for BeatsHandler to check that the channel is active before writing.
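
The guard suggested for the outgoing flow could look roughly like the sketch below. `SketchChannel`, `AckWriterSketch` and `RecordingChannel` are hypothetical stand-ins so that the example runs on the JDK alone. In the plugin the corresponding check would presumably be Netty's `Channel#isActive()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a connection; not the Netty Channel interface.
interface SketchChannel {
    boolean isActive();
    void write(String message);
}

class AckWriterSketch {
    // Writes the ACK only while the peer is still connected.
    // Returns whether the ACK was actually written.
    static boolean ackIfActive(SketchChannel channel, int sequence) {
        if (channel == null || !channel.isActive()) {
            return false; // client already gone: skip the write
        }
        channel.write("ACK " + sequence);
        return true;
    }
}

// In-memory channel used only to exercise the guard.
class RecordingChannel implements SketchChannel {
    final List<String> written = new ArrayList<>();
    boolean active = true;

    public boolean isActive() { return active; }
    public void write(String message) { written.add(message); }
}
```

A check like this only narrows the window: the client can still disconnect between the check and the write, so write failures need to be handled as well.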

andsel commented 3 months ago

In https://github.com/logstash-plugins/logstash-input-beats/blob/5ff649a7561f6e25a67b2c373ca265cd1a81a05d/src/main/java/org/logstash/beats/Server.java#L61 we use the group method as defined in the Javadoc (https://netty.io/4.1/api/io/netty/bootstrap/ServerBootstrap.html#group-io.netty.channel.EventLoopGroup-). It uses the same event loop group for both acceptor and client sockets, so in this case there is no distinction between boss group and worker group because they are the same.

Netty terminates its worker group and boss group threads, but there is no guarantee that the boss group terminates first unless we intentionally force it to terminate first

How is it possible that BeatsHandler.channelRead0 continues receiving tasks if the only event loop group is shutting down?