Closed tianliu94321 closed 1 year ago
一般的使用push consumer的时候, rocketmq的消费者会整合到服务中, 服务下线的时候, 消费者会随着服务一起下线。 broker也会自动感知有消费者下线了,重新负载均衡。
一般的使用push consumer的时候, rocketmq的消费者会整合到服务中, 服务下线的时候, 消费者会随着服务一起下线。 broker也会自动感知有消费者下线了,重新负载均衡。
我在(nacos 2.1.2、rocket mq 4.8.0、rocketmq-spring-boot-starter 2.2.0 )测试好像不行,请问是要做什么配置吗,是rocket mq要注册到nacos里面吗?还是说要自己实现。谢谢。
rocketmq-spring-boot-starter是整合到服务中的吧, 服务是通过nacos做服务发现的哇。 基于这个情况, 如果服务在nacos中下线有2种情况:
是整合到服务里的,我说的在nacos下线是指,在nacos的管理后台里面有个按钮可以单独的把一个服务的某个实例下线,在这个时候,走网关进来的请求是调不到这个实例的,但是MQ的消息还是可以接收到的。
我之前的问题是把服务从nacos中下线准备重启的时候消息还是会进来,这个其实是因为我的服务没有开启springboot的安全停机配置。开启安全停机配置之后,又有了一个新问题,关机的时候,消息还在消费中的时候会被直接杀掉,我在网上搜说可以同时实现RocketMQPushConsumerLifecycleListener接口,通过设置停机等待时间来避免。不知道可不可以这么写。
@Override public void prepareStart(DefaultMQPushConsumer consumer) { consumer.setAwaitTerminationMillisWhenShutdown(20000); }
这种方式你可以试试, 但是需要注意如果消息消费时间很长的时候,可能有消息在等待关闭的时间内没有处理完,可能会重复投递,这种需要做消费幂等喔,话说回来了, 幂等都是需要做的。
实现的逻辑看了下代码:
consumer.setAwaitTerminationMillisWhenShutdown(20000);
在shutdown consumer的最终生效代码如下: https://github.com/apache/rocketmq/blob/39bb9386f10d5d8dfe81183c172a3a86f6d313bd/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java#L712-L717
具体实现方式是平稳地关闭消费线程池, 代码逻辑如下: https://github.com/apache/rocketmq/blob/39bb9386f10d5d8dfe81183c172a3a86f6d313bd/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java#L147-L165
消息消费的耗时都不长,应该没问题,主要是没想到他会直接杀掉执行中的消费,只要能够配置不直接杀掉就没问题了。
刚准备说能不能在RocketMQMessageListener上定义一个awaitTerminationMillisWhenShutdown 在DefaultRocketMQListenerContainer初始化时把值赋给consumer,然后发现最新版的源码里已经有了...
使用rocketmq-spring-boot-starter中的@RocketMQMessageListener注册的消费者怎么在服务下线的时侯同步下线