apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0
21.26k stars 11.7k forks source link

[Bug] Bug title 当消费者获取消息不存在的属性,服务端处理过程中会有异常,并把消息的消费状态设置为RECONSUME_LATER,无法进入消费逻辑,客户端也感知不到异常 #8702

Open 13735485496 opened 2 months ago

13735485496 commented 2 months ago

Before Creating the Bug Report

Runtime platform environment

CentOS Linux release 7.9.2009 (Core)

RocketMQ version

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

JDK Version

java version "1.8.0_381" Java(TM) SE Runtime Environment (build 1.8.0_381-b09) Java HotSpot(TM) Client VM (build 25.381-b09, mixed mode, sharing)

Describe the Bug

第一个问题:当消费者获取消息的ORIGIN_MESSAGE_ID属性,这个属性在第一次就消费成功的时候,是不存在的,服务端处理的时候,捕获到了异常,就暴力的把消费的status设置为null,进而转入RECONSUME_LATER,这个会导致首次失败

第二个问题:当消费者获取消息不存在的属性,会有空指针异常,导致status设置为null,进而转入RECONSUME_LATER,这个会一直失败,直到进入死信队列

Steps to Reproduce

consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs){ try {

                    //格式化输出时间
                    LocalDateTime currentTime = LocalDateTime.now();
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    String currentTimeString = currentTime.format(formatter);

                    //接收到消息就打印
                    System.out.println("消费者接收到消息了!!!"
                            +", 当前时间:"
                                    + currentTimeString
                            +"消费者线程:"
                                    + Thread.currentThread().getName()
                            +", 消息的topic:"
                                    + new String(msg.getTopic())
                            +", 队列id:"
                                    + msg.getQueueId()
                            +", body:"
                                    + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)
                            +", 消息id:"
                                   + new String(msg.getMsgId())
                            +", 当前延迟级别:"
                                    +new String(String.valueOf(msg.getDelayTimeLevel()))
                            +", 消息的重试次数:"
                                    +new String(String.valueOf(msg.getReconsumeTimes()))
                            //第一个问题:当消费者获取消息的ORIGIN_MESSAGE_ID属性,这个属性在第一次就消费成功的时候,是不存在的,服务端处理的时候,捕获到了异常,就暴力的把消费的status设置为null,进而转入RECONSUME_LATER,这个会导致首次失败,但客户端感知不到异常
                            //+", 消息的原始messageid:"
                            //       +new String(msg.getProperty("ORIGIN_MESSAGE_ID"))
                            //第二个问题:当消费者获取消息不存在的属性,会有空指针异常,导致status设置为null,进而转入RECONSUME_LATER,这个会一直失败,直到进入死信队列,但是客户端无法感知异常
                            //+", 尝试查询某个不存在的属性:"
                            //        +new String(msg.getProperty("111"))
                            +", 消息的各项属性:"
                                    +new String(String.valueOf(msg.getProperties()))
                            +", 消息的CommitLogOffset:"
                                    +new String(String.valueOf(msg.getCommitLogOffset()))
                            +", 消息的QueueOffset:"
                                    +new String(String.valueOf(msg.getQueueOffset()))
                            );

                    //如果是第1条消息,才消费成功,第0条消息消费失败,第2条消息被sql过滤掉
                    if(new String(msg.getBody(), "UTF-8").equals("这是第1条消息")){
                        System.out.println("消息id:" +new String(msg.getMsgId())+"消费成功!");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    else{
                        if(new String(String.valueOf(msg.getReconsumeTimes())).equals("2")){
                            System.out.println("消息id:" +new String(msg.getMsgId())+"消费失败次数达到2次了");
                        }

                        System.out.println("消息id:" +new String(msg.getMsgId())+"消费失败!");
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }

            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

    });

What Did You Expect to See?

将异常返还给客户端

What Did You See Instead?

客户端没有收到任何异常结果

Additional Context

debug过程

1 2 3 4 5 6