Open andsel opened 1 year ago
something I'm observing is that maybe linger is not being set in the client connections, as I continue to see the same connection ids reporting oom errors well after the script that does load generation is gone:
https://gist.github.com/jsvd/a047320ff68bbe064e93dec0d6a251f7
@jsvd I used the PR #477 to test in isolation the BeatsParser
, what I've observed (with netstat -anv -p tcp | grep <pid> | grep CLOSE_WAIT
) is tha:
CLOSE_WAIT
stateI think that when we reach an OOM error and we have the client that terminates immediately, we expect that all the 1500 channels also terminates immediately on Netty side, but in reality it takes minutes and this gives the idea of a looping error on the connections. Why it takes 5 minutes to stop logging channel's exceptions is not clear to me, maybe it's due to memory shortage.
Do you think that on server side, on first OOM error notification, should we close immediately all the other connections? In such case I think that it's an asynch request to the event loops responsible for the other connections, and it could also take such time, because at the end is what Netty is already doing.
@jsvd disabling of autoread is present in PR #485. When we will merge that PR, I'll remove the corresponding code from this, and refocus this to be second PR, to tackle the OOM problem.
I tested PR475, manually merged it to 6.8.0 and started with logstash-7.17.17, then when calling V2Batch.realease I got a reference counting issue.
[2024-02-06T17:23:15,862][INFO ][org.logstash.beats.OOMConnectionCloser][main][c84edf99f631ad281446a904b8d1587b2f1505e2a620655c3115851f704f29e5] Direct
memory status, used: 27829207040, pinned: 23017497600, ratio: 83
[2024-02-06T17:23:15,862][WARN ][io.netty.channel.DefaultChannelPipeline][main][c84edf99f631ad281446a904b8d1587b2f1505e2a620655c3115851f704f29e5] An exc
eptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the excepti
on.
io.netty.handler.codec.DecoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:499) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.100.Final.jar:
4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) ~[netty-transport-4.1.100.Final.jar:4.1.100.
Final]
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:425) ~[netty-transport-4.1.100.Final.jar:4.1.100.Fina
l]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
at org.logstash.beats.V2Batch.release(V2Batch.java:105) ~[logstash-input-beats-6.8.0.jar:?]
at org.logstash.beats.BeatsParser.decode(BeatsParser.java:211) ~[logstash-input-beats-6.8.0.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
... 10 more
Release notes
Direct memory used by plugin weren't tracked and this could generate out-of-memory crashes when unattended spikes or traffic loads reached the plugin. The plugin is changed to monitor how much direct memory it's using and in case it's close to the limit dropping connection to free some space. To control spikes of memory usage, the incoming message reads switched from push mode to pull, so that the plugin has control on rate of ingestion and it's not the determined by the clients.
What does this PR do?
Changes the way the plugin pulls data and handle incoming connections to respect the max direct memory size and avoid out of memory errors.
This work is based on the existing #410.
Why is it important/What is the impact to the user?
Provides a set of countermeasures to limit the probability of OutOfMemory errors when creating new buffers in direct memory. In addition to this, it introduces a minimal amount fo direct memory (256MB) that's required to start processing data, if not respected, the pipeline used fails to start.
Checklist
Author's Checklist
How to test this PR locally
The test plan has some steps:
create TLS certificates
make
build the plugin and configure Logstash
./gradlew clean vendor
Gemfile
adding:bin/logstash-plugin install --no-verify
set up memory size limits, configure pipeline
config/jvm.options
add:output{ sink {} }
ulimit -S -n 1048576
Expected outcome
The expectation is that direct memory consumption never goes up to the limit and if the client doesn't consume the ACKs messages (
-a no
) is the client that goes in error and not the Logstash side.Test with real Filebeat
To start a bunch of Filebeat clients sending data to Beats input, just use the script
ingest_all_hands/filebeat/multiple_filebeats.rb
present in PRhttps://github.com/elastic/logstash/pull/15151It download a filebeat distribution, unpacks, generate some input file, prepare the
data
andlogs
folder for each Filebeat instance and run the processes.Related issues
Use cases
Screenshots
Logs