apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0

Push consumer has a logic error in func resetRetryAndNamespace #970

Closed humkum closed 1 year ago

humkum commented 1 year ago

BUG REPORT

  1. Please describe the issue you observed:
    • What did you do (The steps to reproduce)? When consumption fails and the handler returns ConsumeRetryLater, the message is re-consumed only once. While tracing how retry messages are consumed, I found a logic error in the code that resets the message's topic:
func (pc *pushConsumer) resetRetryAndNamespace(subMsgs []*primitive.MessageExt) {
    groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
    beginTime := time.Now()
    for idx := range subMsgs {
        msg := subMsgs[idx]
        retryTopic := msg.GetProperty(primitive.PropertyRetryTopic)
        if retryTopic == "" && groupTopic == msg.Topic {
            msg.Topic = retryTopic
        }
        subMsgs[idx].WithProperty(primitive.PropertyConsumeStartTime, strconv.FormatInt(
            beginTime.UnixNano()/int64(time.Millisecond), 10))
    }
}

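The condition should be retryTopic != "" && groupTopic == msg.Topic. As written, the branch runs only when the retry-topic property is empty, so msg.Topic is overwritten with an empty string instead of being restored to the original topic.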
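To make the proposed condition concrete, here is a minimal standalone sketch of the corrected check. It does not import rocketmq-client-go: messageExt, resetRetryTopic, and the three constants are hypothetical stand-ins for primitive.MessageExt, resetRetryAndNamespace, and the library's constants, and the constant values are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// Hypothetical stand-ins for the library's constants; the values are assumed.
const (
	retryGroupTopicPrefix    = "%RETRY%"
	propertyRetryTopic       = "RETRY_TOPIC"
	propertyConsumeStartTime = "CONSUME_START_TIME"
)

// messageExt is a simplified stand-in for primitive.MessageExt.
type messageExt struct {
	Topic      string
	properties map[string]string
}

// GetProperty returns "" when the key is absent.
func (m *messageExt) GetProperty(key string) string {
	return m.properties[key]
}

// WithProperty sets a property, allocating the map on first use.
func (m *messageExt) WithProperty(key, value string) {
	if m.properties == nil {
		m.properties = make(map[string]string)
	}
	m.properties[key] = value
}

// resetRetryTopic restores the original topic on messages delivered through
// the consumer group's retry topic, using the condition proposed in the issue.
func resetRetryTopic(consumerGroup string, msgs []*messageExt) {
	groupTopic := retryGroupTopicPrefix + consumerGroup
	beginTime := time.Now()
	for _, msg := range msgs {
		retryTopic := msg.GetProperty(propertyRetryTopic)
		// Only rewrite the topic when the original topic is actually recorded.
		if retryTopic != "" && groupTopic == msg.Topic {
			msg.Topic = retryTopic
		}
		msg.WithProperty(propertyConsumeStartTime,
			strconv.FormatInt(beginTime.UnixNano()/int64(time.Millisecond), 10))
	}
}

func main() {
	msgs := []*messageExt{
		// A retried message: delivered on the retry topic, original topic in a property.
		{Topic: "%RETRY%demo-group", properties: map[string]string{propertyRetryTopic: "orders"}},
		// A first-delivery message: no retry property, topic must stay untouched.
		{Topic: "orders"},
	}
	resetRetryTopic("demo-group", msgs)
	fmt.Println(msgs[0].Topic) // orders
	fmt.Println(msgs[1].Topic) // orders
}
```

With the original == "" check, the first message would keep the %RETRY% topic (its property is non-empty), and a message arriving on the retry topic without the property would have its topic replaced by an empty string.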