If my emitter is down for some reason, I see my buffer growing larger and larger until my VM runs out of memory. The reason is that the record processor keeps reading records from Kinesis regardless of the size of my buffer: a separate thread continuously reads from Kinesis and drains into my buffer, whether or not I can emit the records at that moment.
Is there some way to provide back-pressure information to the thread that is polling Kinesis for additional records in these cases? Wouldn't any process that emits records much more slowly than they are read from Kinesis end up in an out-of-memory situation?
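To make the question concrete, what I have in mind is a bounded buffer whose writes block when the emitter can't keep up, so the pressure propagates back to whatever thread is pulling from Kinesis. This is only a rough sketch of the idea; the class and method names are mine, not anything from the KCL:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a bounded buffer between the record-processing thread and the
// emitter thread. put() blocks when the buffer is full, so the thread that
// reads from Kinesis stalls instead of growing the heap without bound.
public class BoundedRecordBuffer<T> {
    private final BlockingQueue<T> queue;

    public BoundedRecordBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called from the record-processing thread; blocks while the emitter lags.
    public void add(T record) throws InterruptedException {
        queue.put(record);
    }

    // Called from the emitter thread; drains up to maxRecords into one batch.
    public void drainTo(List<T> batch, int maxRecords) {
        queue.drainTo(batch, maxRecords);
    }

    public int size() {
        return queue.size();
    }
}
```

The point of the blocking `put()` (rather than dropping or growing) is that it turns a slow emitter into slower reading instead of unbounded memory use.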
Our emitter writes to Elasticsearch in batches, and Elasticsearch can handle a payload of about 10 MB, so we restrict our in-memory buffer to 10 MB. However, if Elasticsearch is down for even a minute or so, our buffer grows to 100 MB. Doesn't anyone who uses the KCL have to segment the records passed to their emitter, since they can be handed many more records than their buffer configuration parameters would suggest?
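For now we work around this by re-chunking whatever list the buffer hands our emitter before sending it to Elasticsearch, roughly like the sketch below (the 10 MB constant matches our Elasticsearch payload limit, and the names are ours, not the library's):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split whatever the buffer hands the emitter into sub-batches that
// each stay under the payload limit, rather than trusting that the buffer
// honored its configured byte limit.
public class BatchSplitter {
    private static final long MAX_PAYLOAD_BYTES = 10L * 1024 * 1024; // ~10 MB

    public static List<List<byte[]>> split(List<byte[]> records) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] record : records) {
            // Start a new batch when adding this record would exceed the limit.
            if (currentBytes + record.length > MAX_PAYLOAD_BYTES && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += record.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

It works, but it feels like every KCL user would end up writing something like this, which is why I'm asking whether back-pressure is supposed to be handled differently.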