apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Orderly message consumption in 4.5.2 has a delay of several tens of seconds at startup #7242

Closed mobaijaavaer closed 3 months ago

mobaijaavaer commented 1 year ago

Before Creating the Bug Report

Runtime platform environment

macOS Monterey 12.1 (M1)

RocketMQ version

4.5.2

JDK Version

Oracle JDK 1.8

Describe the Bug

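Hello, while testing orderly message consumption with the RocketMQ 4.5.2 client, I ran into a strange problem: right after an orderly consumer client starts, there is a high probability of a delay of several tens of seconds during which it cannot pull any messages to consume. Here is my test code: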

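import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;

// Hypothetical enclosing class; main() is the test code
public class OrderlyConsumeDelayTest {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String group = "";
        RPCHook aclRPCHook = new AclClientRPCHook(new SessionCredentials("ak", "sk"));
        // Consumer: group, ACL hook, average queue allocation, message trace enabled, default trace topic
        DefaultMQPushConsumer mqClient = new DefaultMQPushConsumer(group, aclRPCHook, new AllocateMessageQueueAveragely(), true, null);
        mqClient.setAccessChannel(AccessChannel.CLOUD);
        mqClient.setInstanceName("instanceId");
        mqClient.setNamesrvAddr("namesrc");
        mqClient.subscribe("topic", "");
        mqClient.setConsumeTimeout(10000);
        mqClient.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);
                // The message body carries the send timestamp in milliseconds
                String time = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                long now = System.currentTimeMillis();
                long duration = now - Long.parseLong(time);
                System.out.println("msgId:" + messageExt.getMsgId()
                    + ", sent at:" + simpleDateFormat.format(new Date(Long.parseLong(time)))
                    + ", stored at:" + simpleDateFormat.format(new Date(messageExt.getStoreTimestamp()))
                    + ", consumed at:" + simpleDateFormat.format(new Date(now))
                    + ", delay(ms):" + duration);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        mqClient.start();

        // Producer
        DefaultMQProducer mqProducer = new DefaultMQProducer("gaoding-message-producer",
            new AclClientRPCHook(new SessionCredentials("ak", "sk")), true, null);
        mqProducer.setAccessChannel(AccessChannel.CLOUD);
        mqProducer.setInstanceName("instanceId");
        mqProducer.setNamesrvAddr("namesrc");
        mqProducer.start();

        // Send 3 orderly messages in a row; the same selector argument routes them to the same queue
        for (int i = 0; i < 3; i++) {
            Message test = new Message("MQ_INST_125036_BXbcy8xH%pt-gaoding-message-test-order-dev", "", "test",
                (System.currentTimeMillis() + "").getBytes(StandardCharsets.UTF_8));
            test.putUserProperty("__SHARDINGKEY", "test");
            mqProducer.send(test, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int select = Math.abs(arg.hashCode());
                    if (select < 0) { // Math.abs(Integer.MIN_VALUE) is still negative
                        select = 0;
                    }
                    return mqs.get(select % mqs.size());
                }
            }, "test");
        }
        Thread.sleep(10000L);
    }
}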

Here is a screenshot of the console output: (image)

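The output shows that the time at which the broker stored each message is almost identical to the send time, while the client's actual consumption time lags far behind, which rules out a delay on the producer side.

Next, stepping through with a debugger showed that orderly consumption tries to lock, on the broker, every queue assigned to the current client, and only queues that are successfully locked are allowed to pull messages from the broker. Currently the locking is done by a scheduled thread, started in ConsumeMessageOrderlyService#start, that calls lockMQPeriodically every 20 s by default. Adding log output to the locking code revealed the cause: on first startup there are no queues locally yet, so the lock request returns immediately, and queues can only be locked after the client's rebalance has succeeded: (images)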
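The timing described above can be reduced to a small arithmetic model. The sketch below is hypothetical and is not RocketMQ source code: it assumes, as the report observes, that a newly assigned queue becomes pullable only at the next run of the periodic lock task, and it assumes a 1 s initial delay for that task alongside the 20 s period quoted in the report. Under those assumptions, a queue assigned just after the first run waits almost a full period before it can be pulled.

```java
/**
 * Hypothetical timing model (not RocketMQ source) of the startup delay described above.
 * Assumption: an assigned queue becomes pullable only when the periodic lock task next runs.
 */
public class OrderlyLockDelayModel {

    /**
     * Returns the time (ms after start) of the first lock-task run at or after assignedAtMs,
     * for a task that runs at initialDelayMs, initialDelayMs + periodMs, initialDelayMs + 2 * periodMs, ...
     */
    static long firstLockRound(long assignedAtMs, long initialDelayMs, long periodMs) {
        if (assignedAtMs <= initialDelayMs) {
            return initialDelayMs;
        }
        // Ceiling division: number of periods needed to reach or pass assignedAtMs
        long rounds = (assignedAtMs - initialDelayMs + periodMs - 1) / periodMs;
        return initialDelayMs + rounds * periodMs;
    }

    public static void main(String[] args) {
        long initialDelayMs = 1_000; // assumed first run of the lock task
        long periodMs = 20_000;      // 20 s default interval quoted in the report
        long assignedAtMs = 1_500;   // assumed: rebalance assigns the queue just after the first run
        long lockedAtMs = firstLockRound(assignedAtMs, initialDelayMs, periodMs);
        System.out.println("queue assigned at " + assignedAtMs + " ms, first locked at "
            + lockedAtMs + " ms, waited " + (lockedAtMs - assignedAtMs) + " ms");
        // prints: queue assigned at 1500 ms, first locked at 21000 ms, waited 19500 ms
    }
}
```

The model only captures the period arithmetic; real startup also involves rebalance timing and broker-side lock state, which it deliberately ignores.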

Steps to Reproduce

As described above.

What Did You Expect to See?

Messages should be consumed as quickly as possible, with low latency.

What Did You See Instead?

The client only started pulling messages after a delay of several tens of seconds.

Additional Context

No response

drpmma commented 1 year ago

This has been fixed in newer client versions; see https://github.com/apache/rocketmq/issues/5465

rocketmq-client >= 5.0.0 or >= 4.9.5
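To check whether a given client version falls in the range quoted above, a plain dotted-version comparison is enough. The helper below is a hypothetical sketch: the class and method names are illustrative, and it assumes purely numeric versions (a suffix such as `-SNAPSHOT` would have to be stripped first). Because every 5.x release also compares as >= 4.9.5, a single threshold covers both cases.

```java
/** Hypothetical helper: checks a client version string against the fixed-version threshold. */
public class ClientVersionCheck {

    /** Compares numeric dotted versions such as "4.5.2" and "4.9.5"; returns <0, 0 or >0. */
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            // Missing components count as 0, so "5.0" equals "5.0.0"
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    /** True for versions >= 4.9.5, which also includes every 5.x release. */
    static boolean meetsFixedVersion(String clientVersion) {
        return compareVersions(clientVersion, "4.9.5") >= 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"4.5.2", "4.9.4", "4.9.5", "5.0.0"}) {
            System.out.println(v + " -> " + meetsFixedVersion(v));
        }
        // prints: 4.5.2 -> false, 4.9.4 -> false, 4.9.5 -> true, 5.0.0 -> true (one per line)
    }
}
```

Comparing components numerically rather than as strings matters here: a string comparison would wrongly rank "4.10.0" below "4.9.5".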

github-actions[bot] commented 3 months ago

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

github-actions[bot] commented 3 months ago

This issue was closed because it has been inactive for 3 days since being marked as stale.