Open HZL3151904214 opened 1 year ago
consumer.go文件 lockAll函数更新UpdateLastLockTime存在bug?
问题:Clustering消费模式下顺序消费mq消息,概率性报错the message queue lock expired, so consume later
根因分析:lockAll函数没有正常续约LastLockTime,导致consumeMessageOrderly函数执行到isLockExpired超时报错。
解决方法:参考defaultConsumer.lock函数实现,将UpdateLastConsumeTime和UpdateLastLockTime一起进行更新,
for idx := range lockedMQ { _mq := lockedMQ[idx] v, exist := dc.processQueueTable.Load(_mq) if exist { pq := v.(*processQueue) pq.WithLock(true) pq.UpdateLastConsumeTime() pq.UpdateLastLockTime() } set[_mq] = true }
the message queue lock expired, so consume later 错误可能跟rebalance.lockMaxLiveTime默认值30s被我改成3s有关,但是理论上lockAll函数的实现也有问题吧,为啥UpdateLastConsumeTime和UpdateLastLockTime不一起进行更新?
consumer.go文件 lockAll函数更新UpdateLastLockTime存在bug?
问题:Clustering消费模式下顺序消费mq消息,概率性报错the message queue lock expired, so consume later
根因分析:lockAll函数没有正常续约LastLockTime,导致consumeMessageOrderly函数执行到isLockExpired超时报错。
解决方法:参考defaultConsumer.lock函数实现,将UpdateLastConsumeTime和UpdateLastLockTime一起进行更新,
for idx := range lockedMQ { _mq := lockedMQ[idx] v, exist := dc.processQueueTable.Load(_mq) if exist { pq := v.(*processQueue) pq.WithLock(true) pq.UpdateLastConsumeTime() pq.UpdateLastLockTime() } set[_mq] = true }