When poll returns an empty result, fetch the current partition offset from Kafka instead of falling back to our own internal "latest" offset that we read from the topic at some point in the past.
This works around the case where, if we have a topic that used to have some records, but they were all expired, the consumer would keep looping infinitely.
It happens this way since the start and end offset of our empty-but-once-populated partition will be the same, 10 for example, but our internal latest offset is perhaps 5; and if no new records appear while we poll, we'll keep the '5', comparing it to the end offset of 10, forever assuming we need to read some more records.
When poll returns an empty result, fetch the current partition offset from Kafka instead of falling back to our own internal "latest" offset that we read from the topic at some point in the past.
This works around the case where, if we have a topic that used to have some records, but they were all expired, the consumer would keep looping infinitely.
It happens this way since the start and end offset of our empty-but-once-populated partition will be the same, 10 for example, but our internal latest offset is perhaps 5; and if no new records appear while we poll, we'll keep the '5', comparing it to the end offset of 10, forever assuming we need to read some more records.