Closed sungam3r closed 2 months ago
Hi Ivan, thanks for the note!
If your app's logging 1 MB messages then you might be denial-of-servicing yourself π (....it's a fair point of course, but any system that accepts unbounded amounts of untrusted input will be vulnerable in this way).
Using PeriodicBatchingSink
, a batch containing a poisoned message will be dropped after 8 retries, and if it's followed by two more immediate failures, the whole queue will be dropped:
This mitigation kicks in at around 10 minutes with the default 2-second flush interval. Tightening this up to say 100 ms interval will bring the buffering time down proportionally (to 30 seconds in this case).
Alternatively, you can set QueueSizeLimit
on PeriodicBatchingSinkOptions
down from the 100,000 default to e.g. 1000, and the total memory consumption for the buffer will be reduced.
For Kafka, as you concluded, batching isn't necessarily the best strategy, so it sounds like you found a better solution anyway π
Cheers, Nick
I understand all this. Of course I can manipulate the settings, but this does not change the essence of what is happening. Poisoned message (whatever the reason, size or something else) in one batch leads to dropping all queued messages. It is possible that it is not even worth changing anything in the current algorithm. But I highly recommend that you reflect this behavior prominently in the documentation/readme.
We faced this problem with Kafka sink. If the message is too long (1MB by default), then the kafka returns an error. This is a fatal client error but Kafka server itself is fully functional. As we dig deeper into the code, we found this to be a general problem with
PeriodicBatchingSink
- the problem of so called poisoned message. The consequence is a non-deterministic increase in memory consumption right up to OOM due to message buffering for a long time due to exponential growth of timeouts inBatchedConnectionStatus.NextInterval
. It also obviously results in the loss of all buffered messages. I understand thatPeriodicBatchingSink
is written with an arbitrary message destination in mind. This is a general algorithm, OK. I just want to point out its vulnerability to denial of service. We've had a lot of experience getting OOM in production because of this π. These problems are certainly solvable. Now we just gave up usingPeriodicBatchingSink
and implemented the code for writing messages to Kafka ourselves, processing poisoned messages in a special way (trimming them and just resending).