apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

After setMaxReconsumeTimes is set, the order consumer will retry indefinitely #6163

Closed: originalHeaed closed this issue 1 year ago

originalHeaed commented 1 year ago

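Hello!

Version: RocketMQ 5.0.0

Summary: There is a single consumer with MaxReconsumeTimes set to 1, using the ordered-consumption interface of DefaultMQPushConsumer. Once the retry count reaches 1, the consumer sends the message to the delay queue. The broker then puts the message back into the retry queue instead of the dead-letter queue.

My walkthrough of the source: when the consumer sends the message to the delay queue after the retry limit has been reached, the message's reconsume count == the maximum reconsume count. When the delay expires, redelivery calls SendMessageProcessor.sendMessage, which internally calls SendMessageProcessor.handleRetryAndDLQ to decide whether the message goes to the dead-letter queue (and it does not increment the message's reconsume count). The source of SendMessageProcessor.handleRetryAndDLQ is below: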

private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
    RemotingCommand request,
    MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {

    String newTopic = requestHeader.getTopic();
    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(
                "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return false;
        }
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && requestHeader.getMaxReconsumeTimes() != null) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
        // Using ">" instead of ">=" to compatible with the case that reconsumeTimes here are increased by client.
        if (reconsumeTimes > maxReconsumeTimes) {
            properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
            newTopic = MixAll.getDLQTopic(groupName);
            int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0
            );
            msg.setTopic(newTopic);
            msg.setQueueId(queueIdInt);
            msg.setDelayTimeLevel(0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return false;
            }
        }
    }
    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    msg.setSysFlag(sysFlag);
    return true;
}

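In the method above, the condition for routing to the dead-letter queue is reconsumeTimes > maxReconsumeTimes. An ordered message redelivered from the delay queue does not satisfy it, so the message ends up in the retry queue. The consumer then pulls it again and the whole flow repeats, because the message's reconsumeTimes is never incremented.

My question: is the consumer expected to increment reconsumeTimes itself when consuming, and only for messages it pulls from the retry queue? Or is this a bug?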
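
The loop described above can be reproduced with a small standalone model. This is only a sketch, not broker code: the class `DlqCheckSketch`, the methods `goesToDlq` and `redeliveriesBeforeDlq`, and the `cap` parameter are hypothetical names introduced for illustration. The only part taken from the quoted source is the strict `reconsumeTimes > maxReconsumeTimes` comparison. It assumes, as reported, that the message arrives with reconsumeTimes equal to maxReconsumeTimes and that nothing increments the counter between redeliveries.

```java
// Hypothetical standalone model of the reported loop; not RocketMQ code.
class DlqCheckSketch {

    // Same strict comparison as the DLQ check quoted from handleRetryAndDLQ.
    static boolean goesToDlq(int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes > maxReconsumeTimes;
    }

    // Counts how many times the message is put back on the retry topic
    // before the DLQ check passes. Returns -1 if the check never passes
    // within `cap` redeliveries (a stand-in for "retries indefinitely").
    static int redeliveriesBeforeDlq(int reconsumeTimes, int maxReconsumeTimes,
                                     boolean counterIncrements, int cap) {
        int redeliveries = 0;
        int times = reconsumeTimes;
        while (redeliveries < cap) {
            if (goesToDlq(times, maxReconsumeTimes)) {
                return redeliveries;
            }
            redeliveries++;      // message goes back to the retry topic
            if (counterIncrements) {
                times++;         // assumed behavior: someone bumps the counter
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Reported case: counter stuck at 1 with a limit of 1.
        System.out.println(redeliveriesBeforeDlq(1, 1, false, 1000)); // prints -1
        // Same start, but the counter is incremented on each redelivery.
        System.out.println(redeliveriesBeforeDlq(1, 1, true, 1000));  // prints 1
    }
}
```

With the counter frozen at the limit, the strict comparison is never true and the model never reaches the dead-letter queue. With an increment on each redelivery, it gets there after one more round trip. Which side is supposed to perform that increment is exactly the open question here.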

ShadowySpirits commented 1 year ago

Please ask questions in GitHub Discussions; see the Q&A Guidelines.

RongtongJin commented 1 year ago

Hi @originalHeaed, it is indeed a bug and will be fixed ASAP.