apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0
1.29k stars 415 forks source link

Close the channel before sending data if the consumer has been shutdown. #1051

Open cserwen opened 1 year ago

cserwen commented 1 year ago

The Question

For the consumer, the closing and writing of msgCh are in two coroutines. Even though the chan of closeChan is added, since case clauses will be randomly executed for the select statement, it is still possible Write data to a closed channel

if !pq.order {
    select {
    case <-pq.closeChan:
        return
    case pq.msgCh <- messages:
    }
}

https://github.com/apache/rocketmq-client-go/blob/master/consumer/process_queue.go#LL124C1-L130C3

How fo fix?

We'd better close the channel in the goroutine that pullMessage to avoid closing and writing at the same time.

0daypwn commented 11 months ago

https://github.com/apache/rocketmq-client-go/blob/da20ee7b0743a08ecf1fc53403f530b74eea2257/consumer/consumer.go#L296-L306 谁写入channel,谁负责关闭 是个好习惯

在已经有 close(pq.closeChan) 之后,再 close(pq.msgCh) 这个操作完全是多余的,关闭还会导致上述写入的风险。 在 close(pq.closeChan) 之后,只要不 close(pq.msgCh), pq.putMessage 和 pq.getMessage 即使select到 msgCh 也不会有影响。 pq.putMessage 和 pq.getMessage 是msgCh唯二的使用者,之后是一定可以return的,也不存在会阻塞/泄露之类的问题。 因此我认为只去掉 close(pq.msgCh) 这部分即可 @cserwen

另外 consumer/consumer.go 这部分是基类 修改需要考虑 consumer/push_consumer.go consuemr/pull_consumer.go