dehora / nakadi-java

🌀 Client library for the Nakadi Event Broker (examples: http://bit.ly/njc-examples, site: https://dehora.github.io/nakadi-java/)
MIT License

Lowers default batch buffer and allows setting by configuration. #287

Closed dehora closed 7 years ago

dehora commented 7 years ago

The stream processor absorbs event batches sent by Nakadi into a buffer managed by the processor's I/O layer. An observer that can't keep up with the stream causes a large number of batches to accumulate in that buffer, which can create memory pressure because the buffer isn't drained quickly enough. The byte size of a batch can also vary greatly with the underlying size of each event and the number of events requested per batch (a multi-megabyte batch is not unheard of).
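For illustration only (this is not the library's actual internals), the failure mode resembles a bounded RxJava backpressure buffer overflowing when a slow consumer sits behind a fast producer; the class name, rates, and delays below are made up for the sketch:

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class BufferOverflowSketch {
  public static void main(String[] args) throws InterruptedException {
    Flowable.interval(1, TimeUnit.MILLISECONDS)   // fast producer, standing in for batches read off the network
        .onBackpressureBuffer(128)                // bounded buffer; overflow signals MissingBackpressureException
        .observeOn(Schedulers.computation())
        .subscribe(
            batch -> Thread.sleep(100),           // slow consumer, standing in for a slow observer
            err -> System.err.println("buffer overflow: " + err));
    Thread.sleep(3000);                           // the overflow error arrives well within this window
  }
}
```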

This lowers the default batch buffer count to 128 and allows the count to be set via stream configuration. It's expected this will help consumers whose observers are slow relative to the rate at which events are sent and read from the network. An observer that exceeds the buffer size triggers an exception, which disconnects the client from the server and sends the stream processor into a retry cycle. The downside is that slow consumers will tend to see more repeated events: each disconnect causes Nakadi to reset the session, which means in-flight batches/events cannot be committed via the checkpointer and will have to be reprocessed, since Nakadi resends them after the client reconnects. The upside is that the consumer's local memory pressure can be managed better.
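As a sketch of how a consumer might opt into a smaller buffer once this lands: the batchBufferCount setter name is hypothetical (the patch exposes the count via stream configuration, but the actual method name may differ), and the subscription id is a placeholder; the client and stream builder calls follow the project's README examples:

```java
import nakadi.NakadiClient;
import nakadi.StreamConfiguration;
import nakadi.StreamProcessor;

public class BatchBufferConfigSketch {
  public static void main(String[] args) {
    NakadiClient client = NakadiClient.newBuilder()
        .baseURI("http://localhost:9080")
        .build();

    StreamConfiguration sc = new StreamConfiguration()
        .subscriptionId("a2ab0b7c-ee58-48e5-b2e5-a2a9e4fb5b37") // placeholder id
        .batchBufferCount(64); // hypothetical setter; lower than the 128 default for a very slow observer

    StreamProcessor processor = client.resources().streamBuilder()
        .streamConfiguration(sc)
        // a streamObserverFactory would also be configured here; elided for brevity
        .build();

    processor.start(); // consumes until stop() is called
  }
}
```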

For #256.

dehora commented 7 years ago

This shows a run where the batch buffer is high (as it was before this patch) with a consumer that has an 8000ms delay per batch, simulating a slow observer:

[screenshot: consumer run with the high batch buffer]

The end result is a JVM that is struggling to keep up after a few minutes, where the underlying strings representing event batches are not released even after multiple GC generations:

[two screenshots: heap profile, 2017-10-05]

This shows a run with the same slow observer processing the same stream, but with the default batch buffer of 128 introduced by this patch:

[screenshot: consumer run with the default batch buffer of 128]

The end result is a JVM that can keep up, settling after 30 minutes into the classic sawtooth pattern, where the underlying strings representing event batches are released after just a few generations:

[two screenshots: heap profile showing the GC sawtooth, 2017-10-05]
satybald commented 7 years ago

@dehora thanks for the patch that reduces JVM heap pressure. I have a question: how does this backpressure implementation differ from the existing one in StreamObserver.requestBackPressure and StreamObserver.requestBuffer?
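For context, the observer-level hooks the question refers to can be sketched as below. The method names come from the question itself, but the signatures, return types, and semantics shown are assumptions and may not match the actual StreamObserver interface:

```java
import java.util.Optional;
import nakadi.StreamBatchRecord;
import nakadi.StreamObserver;

// Sketch only: the signatures and semantics of the two request* hooks
// are assumed and should be checked against the actual interface.
class ThrottlingObserver implements StreamObserver<String> {
  @Override public void onStart() {}
  @Override public void onStop() {}
  @Override public void onCompleted() {}
  @Override public void onError(Throwable t) {}

  @Override public void onNext(StreamBatchRecord<String> record) {
    // process the batch, then commit its cursor
  }

  // (assumed semantics) ask the processor to emit at most this many batches per request cycle
  @Override public Optional<Long> requestBackPressure() {
    return Optional.of(8L);
  }

  // (assumed semantics) ask the processor to buffer this many items before invoking onNext
  @Override public Optional<Integer> requestBuffer() {
    return Optional.of(16);
  }
}
```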