Closed Adam-Rollo closed 4 years ago
这里原先的代码逻辑: 调用kafka的list offset方法,并且选择第一个节点有的topic,并且把后续节点的topic的其它分区merge进来。 这就有一个问题,第一个节点可能并不包含所有的topic,那么就会有一些topic的分区位移没有获取到。
当获取不到消费者位移的时候(比如刚刚新建topic,或者broker丢失了消费者位移),kafka的fetch offset接口会默认返回-1。这时拿list offset的结果中取出初始位移,当作默认的位移。可是原先的代码返回的初始位移中有些topic是没有的,所以消费者位移最终变成了-1。当用-1去fetch msg的时候,就会报高水位-1的错误,引起消费者组无限rebalance。
这里原先的代码逻辑: 调用kafka的list offset方法,并且选择第一个节点有的topic,并且把后续节点的topic的其它分区merge进来。 这就有一个问题,第一个节点可能并不包含所有的topic,那么就会有一些topic的分区位移没有获取到。
当获取不到消费者位移的时候(比如刚刚新建topic,或者broker丢失了消费者位移),kafka的fetch offset接口会默认返回-1。这时拿list offset的结果中取出初始位移,当作默认的位移。可是原先的代码返回的初始位移中有些topic是没有的,所以消费者位移最终变成了-1。当用-1去fetch msg的时候,就会报高水位-1的错误,引起消费者组无限rebalance。