Closed abhishekmendhekar closed 5 years ago
After an offline discussion with @radai-rosenblatt, here is the conclusion. cc @smccauliff
API Behavior
- poll(): this interface will only return any record processing exception.
- seek*(): all the cached exceptions will be dropped.
- If partition P is paused (pause()) and P had an exception on the last poll(), then the next poll() will throw an exception only for the unpaused partitions (excludes P). It'll also seek past the offset for all unpaused partitions that hit the exception (excludes P). Also, discard the cached exceptions.

Additional optimizations
- If poll() hit an exception when processing records and no other records have been fetched, then poll() will throw the exception immediately instead of doing it in the following poll() call.
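To make the agreed behavior concrete, here is a minimal sketch in plain Java. It is a toy model, not the client code: `ExceptionCacheSketch`, `RecordProcessingException`, `cacheFailure` and the other names are hypothetical, partitions are plain strings, and it reads "discard the cached exceptions" as clearing the whole cache, including the entry for the paused partition. That last reading is an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the consumer wrapper; none of these names come from the real client.
class ExceptionCacheSketch {
    // Thrown by poll(); carries the unpaused partitions that had a cached failure.
    static class RecordProcessingException extends RuntimeException {
        final Set<String> partitions;

        RecordProcessingException(Set<String> partitions) {
            super("record processing failed for " + partitions);
            this.partitions = partitions;
        }
    }

    private final Map<String, Long> failedOffsets = new HashMap<>(); // partition -> offset that failed
    private final Map<String, Long> positions = new HashMap<>();     // partition -> next offset to read
    private final Set<String> paused = new HashSet<>();

    // Simulates a processing failure that a previous poll() ran into and cached.
    void cacheFailure(String partition, long offset) {
        failedOffsets.put(partition, offset);
        positions.put(partition, offset);
    }

    void pause(String partition) {
        paused.add(partition);
    }

    long position(String partition) {
        return positions.get(partition);
    }

    boolean hasCachedFailures() {
        return !failedOffsets.isEmpty();
    }

    // seek*(): never throws a processing exception, it only drops whatever was cached.
    void seek(String partition, long offset) {
        failedOffsets.clear();
        positions.put(partition, offset);
    }

    // poll(): reports cached failures for unpaused partitions only.
    void poll() {
        Set<String> toReport = new TreeSet<>();
        for (Map.Entry<String, Long> e : failedOffsets.entrySet()) {
            if (!paused.contains(e.getKey())) {
                positions.put(e.getKey(), e.getValue() + 1); // seek past the failed offset
                toReport.add(e.getKey());
            }
        }
        failedOffsets.clear(); // assumption: the whole cache is discarded, paused entries included
        if (!toReport.isEmpty()) {
            throw new RecordProcessingException(toReport);
        }
    }
}
```

With failures cached for an unpaused partition A at offset 5 and a paused partition P at offset 9, poll() in this model throws for A only, moves A to offset 6, and leaves P at offset 9.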
poll() keeps a cache of exceptions per partition and throws the cached exception in a subsequent poll() call. poll() also seeks the offset past the exception, assuming the next poll() will throw it. This can lead to message loss if the user does not call poll() again and commits offsets.
The fix stores the current offset and the resume offset along with the exception. On an exception, it seeks to the current offset (the offset before the exception) and keeps the resume offset (current + 1) alongside it. On the next poll(), poll() seeks to the resume offset and throws the exception; the user can either ignore it, which skips the offset, or exit. On the next seek*(), seek*() throws an IllegalStateException and seeks to the resume offsets, leaving it to the user to continue calling poll() or crash. On exit, the user can safely commit the offsets it consumed (excluding the offset that hit an exception) without losing messages.
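A rough single-partition sketch of the poll() path of this fix follows. Everything in it is illustrative rather than taken from the patch: `ResumeOffsetSketch` and `PendingFailure` are made-up names, the log is an in-memory list whose index is the offset, and a record with the value "bad" stands in for one that fails processing. The seek*() path is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition model; not the actual patch.
class ResumeOffsetSketch {
    // What the fix stores together with the exception (field names are illustrative).
    static class PendingFailure {
        final RuntimeException cause;
        final long currentOffset; // offset of the record that failed
        final long resumeOffset;  // currentOffset + 1

        PendingFailure(RuntimeException cause, long currentOffset) {
            this.cause = cause;
            this.currentOffset = currentOffset;
            this.resumeOffset = currentOffset + 1;
        }
    }

    private final List<String> log; // list index plays the role of the offset
    private long position = 0;      // next offset to read; what a commit would record
    private PendingFailure pending;

    ResumeOffsetSketch(List<String> log) {
        this.log = log;
    }

    long position() {
        return position;
    }

    List<String> poll() {
        if (pending != null) {
            // Next poll() after a failure: move to the resume offset, then surface the exception.
            PendingFailure failure = pending;
            pending = null;
            position = failure.resumeOffset;
            throw failure.cause;
        }
        List<String> processed = new ArrayList<>();
        while (position < log.size()) {
            String value = log.get((int) position);
            if (value.equals("bad")) {
                // Stay at the failed offset so a commit made now does not skip it.
                pending = new PendingFailure(
                        new IllegalArgumentException("cannot process offset " + position), position);
                break;
            }
            processed.add(value);
            position++;
        }
        return processed;
    }
}
```

For a log of "a", "bad", "c", the first poll() returns "a" and leaves the position at 1, so committing and exiting at that point does not skip the failed record. The second poll() throws and moves the position to 2, and a third poll() returns "c" if the caller chooses to continue.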