apache / rocketmq-streams

Apache rocketmq
https://rocketmq.apache.org/
Apache License 2.0
171 stars 82 forks source link

[ISSUE #281] Do not process the data from the removed MessageQueues by removing the queues from the originListener first #295

Closed Shuozeli closed 1 year ago

Shuozeli commented 1 year ago

Closes #281

Shuozeli commented 1 year ago

因为 removeState 中的所有操作都支持删除一个不存在的 key,所以当 loadState 时不去加载应该被删除的 queue,不会影响 removeState。如果不加载应该被删除的 queue,也不会存在介于 loadStateremoveState 之间的 inflight data

如果我的理解有错误,请指正,谢谢

ni-ze commented 1 year ago

@Shuozeli我又重新看了下,产生这个问题的根源貌似是这样: image

this.recoverHandler.apply(addQueue, removeQueue) 将状态从本地移除,但是这时候还没有触发rebalance,会继续从将要被移除的queue里面拉数据。这时候就有问题。

看看是不是可以通过调整removeTask位置解决问题。

Shuozeli commented 1 year ago

@ni-ze 抱歉,我之前的理解有些错误。我改了一下code:

第一步:将需要被删除的 removeQueueoriginalListener 删掉。做法是调用 originListener.messageQueueChanged,但传入的是 unchangedQueue 的set,这样 originalListener 会停止接收 removedQueue 的消息。 第二步:对状态进行操作。 第三步:把 addQueue 加入到 originalListener 里边。

ni-ze commented 1 year ago

LGTM~

Shuozeli commented 1 year ago

能请你review下代码吗,看看这个代码还有没有别的问题,谢谢

ni-ze commented 1 year ago

没啥问题。