IBMStreams / streamsx.kafka

Repository for integration with Apache Kafka
https://ibmstreams.github.io/streamsx.kafka/
Apache License 2.0

KafkaConsumer can create invalid checkpoint when group management is active #148

Closed: ghost closed this issue 5 years ago

ghost commented 5 years ago

When the PE of one Kafka Consumer in a group is killed, the PE is relaunched, its operators are reset, and the restarted Kafka Consumer re-joins the group when it starts fetching messages.

When a partition rebalance among the other operators happens while the killed PE is restarting, one operator initiates the consistent region (CR) reset, provided the region is not already RESETTING.

When the killed operator is back, it is reset (this may be the same reset as the one triggered by a different operator) and starts fetching messages. At this point it re-joins the group, which triggers another partition rebalance. The restarted operator sees this rebalance as a partition assignment after subscription. The other operators see the onPartitionsRevoked/onPartitionsAssigned cycle as a spontaneous rebalance and should trigger a region reset. Unfortunately, none of these consumers triggers the CR reset because the region is still RESETTING, although all of them expect a reset according to their state handling.
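The race can be illustrated with a small Java model. This is a sketch under stated assumptions, not the toolkit's code: `GuardedResetModel`, `Region`, `Consumer`, `onSpontaneousRebalance`, and `onReset` are hypothetical names, not streamsx.kafka or IBM Streams APIs. The model assumes two surviving consumers, a reset request that is guarded by the region state (the v1.8.0 behavior described above), and a region that stays RESETTING until the restarted PE has been reset as well.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the race in issue #148; not the streamsx.kafka or IBM Streams API.
class GuardedResetModel {
    enum State { RUNNING, RESETTING }

    // Simplified consistent region: counts reset requests and tracks its state.
    static final class Region {
        State state = State.RUNNING;
        int resetRequests = 0;

        void requestReset() {
            resetRequests++;
            state = State.RESETTING;
        }
    }

    // Simplified consumer operator that requests a reset only if the region is not RESETTING.
    static final class Consumer {
        final Region region;
        boolean expectsReset = false;

        Consumer(Region region) { this.region = region; }

        // Stands for an onPartitionsRevoked/onPartitionsAssigned cycle outside of a reset.
        void onSpontaneousRebalance() {
            expectsReset = true;                  // state handling now waits for a reset
            if (region.state != State.RESETTING) {
                region.requestReset();            // guarded: skipped while RESETTING
            }
        }

        void onReset() { expectsReset = false; }
    }

    // Returns {number of reset requests, consumers still waiting for a reset}.
    static int[] simulate() {
        Region region = new Region();
        List<Consumer> survivors = Arrays.asList(new Consumer(region), new Consumer(region));

        // Rebalance 1: the killed PE leaves the group; only the first survivor requests the reset.
        for (Consumer c : survivors) c.onSpontaneousRebalance();
        // The survivors are reset, but the region stays RESETTING until the restarted PE is reset too.
        for (Consumer c : survivors) c.onReset();

        // Rebalance 2: the restarted consumer re-joins the group while the region is still RESETTING.
        for (Consumer c : survivors) c.onSpontaneousRebalance();

        // The region finishes resetting; nobody requested the reset the survivors are waiting for.
        region.state = State.RUNNING;
        int waiting = 0;
        for (Consumer c : survivors) {
            if (c.expectsReset) waiting++;
        }
        return new int[] { region.resetRequests, waiting };
    }
}
```

In this model `GuardedResetModel.simulate()` returns `{1, 2}`: a single reset request for the first rebalance, and both survivors left expecting a reset that never comes.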

As a consequence, the consumer is not seeked, and the fetch offsets are not written into the offset manager in onPartitionsAssigned. A DRAIN-CHECKPOINT that follows immediately therefore does not write the fetch offsets of the assigned partitions into the checkpoint.

When the operator is reset to such a checkpoint, the offsets of the assigned partitions are missing, so those partitions are seeked according to the startPosition parameter. With Default, the consumer may be seeked to the right offset; with Beginning, too many messages are replayed; with End, messages are skipped.
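The effect of the missing offsets on the restored fetch positions can be sketched as follows. This is a hypothetical model, not the toolkit's offset manager: the checkpoint is assumed to be a map from partition name to fetch offset, `CheckpointRestoreModel`, `PartitionInfo`, and `restore` are illustrative names, and the assumption that Default resumes from the consumer group's committed offset is labeled as such in the code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of restoring fetch offsets after a reset; not the streamsx.kafka API.
class CheckpointRestoreModel {
    enum StartPosition { Default, Beginning, End }

    // Hypothetical per-partition facts: first offset, end offset, and the group's committed offset.
    static final class PartitionInfo {
        final long beginning;
        final long end;
        final long committed;

        PartitionInfo(long beginning, long end, long committed) {
            this.beginning = beginning;
            this.end = end;
            this.committed = committed;
        }
    }

    // Returns the fetch offset each assigned partition resumes from after a reset.
    static Map<String, Long> restore(Map<String, Long> checkpoint,
                                     Map<String, PartitionInfo> assigned,
                                     StartPosition startPosition) {
        Map<String, Long> fetchOffsets = new HashMap<>();
        for (Map.Entry<String, PartitionInfo> e : assigned.entrySet()) {
            Long saved = checkpoint.get(e.getKey());
            if (saved != null) {
                fetchOffsets.put(e.getKey(), saved);             // offset found in the checkpoint
                continue;
            }
            PartitionInfo p = e.getValue();                     // offset missing: fall back to startPosition
            switch (startPosition) {
                case Beginning:
                    fetchOffsets.put(e.getKey(), p.beginning);  // may replay already processed messages
                    break;
                case End:
                    fetchOffsets.put(e.getKey(), p.end);        // may skip messages not yet processed
                    break;
                default:
                    fetchOffsets.put(e.getKey(), p.committed);  // assumption: Default keeps the committed offset
                    break;
            }
        }
        return fetchOffsets;
    }
}
```

With a checkpoint that contains only `t-0`, a partition `t-1` assigned during the unhandled rebalance resumes at its first offset with Beginning, at its end offset with End, and at its committed offset with Default, while `t-0` always resumes at its checkpointed offset.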

Observed in toolkit v1.8.0

ghost commented 5 years ago

The solution is to reset the consistent region unconditionally, without checking conditions such as the state of the region. Also, the previous solution, implemented in versions 1.5.0 ... 1.7.x, would not have avoided this error either, had it still been in place in v1.8.0.

The side effect of this solution is that multiple reset attempts can occur when several operators in the group trigger a consistent region reset independently.
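The trade-off of the unconditional reset can be shown with the same kind of toy model. Again this is only a sketch: `UnconditionalResetModel`, `Region`, `Consumer`, `onSpontaneousRebalance`, and `onReset` are hypothetical names, and the scenario assumes two surviving consumers that each request a reset on every spontaneous rebalance, regardless of the region state.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the unconditional reset; not the streamsx.kafka or IBM Streams API.
class UnconditionalResetModel {
    enum State { RUNNING, RESETTING }

    // Simplified consistent region: counts reset requests and tracks its state.
    static final class Region {
        State state = State.RUNNING;
        int resetRequests = 0;

        void requestReset() {
            resetRequests++;
            state = State.RESETTING;
        }
    }

    static final class Consumer {
        final Region region;
        boolean expectsReset = false;

        Consumer(Region region) { this.region = region; }

        // Every spontaneous rebalance requests a reset, whatever the region state is.
        void onSpontaneousRebalance() {
            expectsReset = true;
            region.requestReset();
        }

        void onReset() { expectsReset = false; }
    }

    // Returns {number of reset requests, consumers still waiting for a reset}.
    static int[] simulate() {
        Region region = new Region();
        List<Consumer> survivors = Arrays.asList(new Consumer(region), new Consumer(region));

        // Rebalance 1: both survivors request a reset independently.
        for (Consumer c : survivors) c.onSpontaneousRebalance();
        for (Consumer c : survivors) c.onReset();

        // Rebalance 2: requested again although the region is still RESETTING.
        for (Consumer c : survivors) c.onSpontaneousRebalance();

        // A further reset is performed, so no consumer is left waiting.
        for (Consumer c : survivors) c.onReset();
        region.state = State.RUNNING;

        int waiting = 0;
        for (Consumer c : survivors) {
            if (c.expectsReset) waiting++;
        }
        return new int[] { region.resetRequests, waiting };
    }
}
```

Here `UnconditionalResetModel.simulate()` returns `{4, 0}`: no consumer is left waiting for a reset, at the cost of four reset requests instead of one.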

ghost commented 5 years ago

Resolved in v1.9.0.