maihaoche / rocketmq-spring-boot-starter

Spring Boot starter for RocketMQ
Apache License 2.0
287 stars 147 forks source link

关于消息幂等的处理 #2

Closed ijaychang closed 7 years ago

ijaychang commented 7 years ago

由于rocketmq-spring-boot-starter为我们处理了消息Body的解析工作,但是导致messageExt对象对我们而言是不可见的了,之前是设想使用messageExt.getBodyCRC()来处理消息幂等性。那现在我能想到的是用md5hash(JSON.toJSONString(message)),将该哈希值作为key存入redis,并设置有效时间比如10分钟。当第一次消费的时候就将该值存到redis;如果有重复消费的情况时,当发现redis中已经有存在该key则忽略此条消息,来防止重复消费

想问下作者,你们是如何处理消息幂等的?

suclogger commented 7 years ago

嗯,你说的问题也是我一直在考虑的。 我觉得通过messageExt中的msgId来维护幂等性是成本很高的(比如将msgId或者key存入redis),所以我目前倾向的方式是通过消息key结合消息体中的业务属性做消费的幂等,如果有好的方式也欢迎提出来。

另,我也越发觉得messageExt中的很多属性需要暴露给消费者,比如我之前是不暴露key的,加了之后抽象类的api就发生了一次变化,依赖方就很痛苦了,也不愿意升级,所以需要更好的方式,正在考虑放到一个k-v的集合当中,有好的想法也欢迎加入一起改善。

ijaychang commented 7 years ago

消息key结合消息体中的业务属性做消费的幂等我觉得也是可以的。

关于“我也越发觉得messageExt中的很多属性需要暴露给消费者”,深有同感哈,我fork的在我司内部版本,就将messageExt的tag也暴露给消费者了。这个我感觉还是很有必要的,因为当我订阅的Topic有多个tag的时候,按照目前的设计,我就不能根据tag来区分不同的业务场景。我只能再多建几个Topic。

还有一个问题我想说下,按照目前的设计,由于consumerGroup,topic都是写死在@MQConsumer注解的。这样一来就必须在每个环境都各自部署一套rocketmq服务,因为如果还是用同一套rocketmq服务,意味着各个环境都在共同使用topic,会导致环境间消费者消费错乱。所以我fork的版本,稍微改造了下,允许在消费者中定义topic,consumerGroup属性(优先使用@MQConsumer里的topic与consumerGroup)。这样我们只要配置不同的topic名称,不同的consumerGroup名称,就能隔离不同环境的消息错乱问题。哎,公司穷没办法哈。。。

使用方法如下: `@MQConsumer(tag = "A") public class DemoConsumerA extends AbstractMQPushConsumer{

@Value("${rocketmq.demoConsumerA.topic}")
private String topic;

@Value("${rocketmq.demoConsumerA.consumerGroup}")
private String consumerGroup;

@Override
public boolean process(String messageKey, String tag, Demo message) {
    System.out.println("DemoConsumerA.process "+"messageKey=【"+messageKey+"】,message=【"+message+"】");
    try {
        Thread.sleep(50);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return true;
}

}`

application.yml里定义下就好:

rocketmq: ... demoConsumerA: topic: TP_DEMO consumerGroup: CID_DEMO_A

对于生产者还好,因为发送消息的时候,可以指定topic。这是我fork的,经过修改后我们公司使用的版本 https://github.com/jaychang9/qh-rocketmq-spring-boot-starter 本来想发个pull request的,但我觉得可能这个特性并不是大家所需要的。

suclogger commented 7 years ago

@jaychang9 topic和consumerGroup是支持配置项的,具体实现在 代码 MQConsumerAutoConfiguration中:

        String consumerGroup = applicationContext.getEnvironment().getProperty(mqConsumer.consumerGroup());
        if(StringUtils.isEmpty(consumerGroup)) {
            consumerGroup = mqConsumer.consumerGroup();
        }
        String topic = applicationContext.getEnvironment().getProperty(mqConsumer.topic());
        if(StringUtils.isEmpty(topic)) {
            topic = mqConsumer.topic();
        }

会尝试先将注解中的内容解析为一个配置项。

ps.我个人不是很倾向大力推广tag的方式,因为调用方可能无意中使用了同一个consumerGroup来监听同一个topic的多个tag,因为rocketmq的特性就会导致消息丢失或者重复,你在使用的时候也可以注意一下。

还有在我们实际应用场景中,还是做了不同环境的物理隔离,即不同环境部署不同的mq实例,这样可以最大程度避免消息不知道被谁消费掉的问题。

ijaychang commented 7 years ago

非常感谢作者能及时回复,这个可以有,那比如我的Consumer类定义的注解是@MQConsumer(consumerGroup="CID_DEMO",topic="TP_DEMO") 如果我想覆盖注解中的consumerGroup topic配置的话,是不是加下启动参数,或者是配置一个环境变量?

比如 jar -jar xxx.jar -DCID_DEMO=CID_DEMO_DEV -DTP_DEMO=TP_DEMO_DEV

suclogger commented 7 years ago

@jaychang9 master分支的最新代码中,使用了extMap 来存放messageExt中的属性和message.properties中的属性,一起帮忙看看这样实现合适不。

suclogger commented 7 years ago

关于消息幂等的处理还是决定交给业务方通过extMap中的属性来自行处理

ijaychang commented 7 years ago

好的,我去看看,这几天没看github

ijaychang commented 7 years ago

看了下哈,我感觉还不如直接把MessageExt暴露出来给使用者。。。用的时候还得从map里根据key取出来,而且还得强转类型。因为存的时候值是Object类型。