If a runtime exception is thrown in our emitter code (so buffer.clear() is never reached), it bubbles out and is swallowed in the KCL's ProcessTask.java. The KCL keeps feeding new records to the buffer, so the buffer grows without bound.
If the application then recovers (before hitting an OutOfMemoryError), the whole oversized buffer is flushed at once.
It would be great to have the ability to:
- Pause processing (respond to backpressure) when we detect an issue, and resume once it goes away.
- Have transformToOutput(buffer) in KinesisConnectorRecordProcessor.java use the buffer configuration to cap how much data is flushed, i.e. only output n items, or n bytes of items, at a time.
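To make the second request concrete, here is a rough sketch of what capped flushing could look like. It is plain Java with no KCL or connector-library types; `CappedFlush`, `batchSize`, `flushInBatches`, and the `maxRecords`/`maxBytes` parameters are hypothetical names I made up for illustration, not existing APIs. The boolean result is one possible way to signal that the caller should pause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the KCL or the connector library.
class CappedFlush {

    /** Number of records at the head of the buffer that fit within both caps. */
    public static int batchSize(List<byte[]> buffer, int maxRecords, long maxBytes) {
        int count = 0;
        long bytes = 0;
        for (byte[] record : buffer) {
            // Always take at least one record so a single oversized record cannot stall the flush.
            if (count > 0 && (count >= maxRecords || bytes + record.length > maxBytes)) {
                break;
            }
            bytes += record.length;
            count++;
        }
        return count;
    }

    /**
     * Emits the buffer in capped batches. A batch is removed from the buffer only
     * after the emitter returns normally. Returns false on the first emitter failure,
     * leaving the unsent records in place so the caller can pause and retry later.
     */
    public static boolean flushInBatches(List<byte[]> buffer, int maxRecords, long maxBytes,
                                         Consumer<List<byte[]>> emitter) {
        while (!buffer.isEmpty()) {
            int n = batchSize(buffer, maxRecords, maxBytes);
            List<byte[]> batch = new ArrayList<>(buffer.subList(0, n));
            try {
                emitter.accept(batch);
            } catch (RuntimeException e) {
                return false; // assumption: the caller treats this as a backpressure signal
            }
            buffer.subList(0, n).clear();
        }
        return true;
    }
}
```

With something like this, a recovered application would drain a large backlog in bounded chunks instead of one huge flush, and a `false` return would tell the processor to stop accepting records until the emitter is healthy again.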