apache / rocketmq-spring

Apache RocketMQ Spring Integration
https://rocketmq.apache.org/
Apache License 2.0
2.09k stars 894 forks source link

使用starter的时候需要自定义全局拦截器在 listener onMessage 之前处理 MessageExt #583

Open texousliu opened 1 year ago

texousliu commented 1 year ago

FEATURE REQUEST

  1. 使用starter的时候需要自定义全局拦截器在 listener onMessage 之前处理 MessageExt
  2. Provide any additional detail on your proposed use case for this feature.
  3. 添加message处理器
    
    package org.apache.rocketmq.spring.support;

import org.apache.rocketmq.common.message.MessageExt;

/**

}

4. ListenerContainerConfiguration 注入自定义处理器
```java
    private List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers;

    public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
                                          ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties,
                                          @Autowired(required = false) List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers) {
        this.rocketMQMessageConverter = rocketMQMessageConverter;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
        this.rocketMQListenerMessageCustomizers = rocketMQListenerMessageCustomizers;
    }
  1. DefaultRocketMQListenerContainer 添加

    private List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers;
    public List<RocketMQListenerMessageCustomizer> getRocketMQListenerMessageCustomizers() {
        return rocketMQListenerMessageCustomizers;
    }
    
    public void setRocketMQListenerMessageCustomizers(List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers) {
        this.rocketMQListenerMessageCustomizers = rocketMQListenerMessageCustomizers;
    }
  2. ListenerContainerConfiguration 配置 DefaultRocketMQListenerContainer
    container.setRocketMQListenerMessageCustomizers(rocketMQListenerMessageCustomizers);
  3. 处理消息的地方添加调用
    private void handleMessage(
        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        if (rocketMQListenerMessageCustomizers != null) {
            for (RocketMQListenerMessageCustomizer customizer : rocketMQListenerMessageCustomizers) {
                customizer.customize(messageExt);
            }
        }
       // ...... other code
    }
  4. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
texousliu commented 1 year ago

我觉得这是一个比较重要的功能,在获取消息之后能够全局拦截并预先处理。比如说:需要全链路追踪的时候,可以通过这个扩展拦截器处理额外的信息到 context 中

texousliu commented 1 year ago

因为我在 starter 的源代码中没有看到相关的扩展接口,如果需要强制实现的话需要重写 ListenerContainerConfiguration,我认为这是不太友好的方式

mufengCc commented 1 year ago

写的不错,很有想法。

个人认为,作者没有实现扩展接口,可能另有考虑。

另外,如果要针对rocketMQ实现链路追踪,需要生产者在发送消息时,通过properties增加对应值。在消费者端,使用aop进行处理,也很方便

下面是我的代码示例

`@Slf4j @Aspect @Component public class TraceAspect {

private static final String TRACE_ID = "traceId";

@SneakyThrows
@Around("@annotation(mqTraceID)")
public Object before2(ProceedingJoinPoint joinPoint, MqTraceID mqTraceID) {

    Object[] args = joinPoint.getArgs();

    for (Object arg : args) {
        if (arg instanceof MessageExt messageExt) {
            Map<String, String> properties = messageExt.getProperties();
            String traceId = properties.get(TRACE_ID);
            if (StringUtils.isBlank(traceId)) {
                traceId = generateTraceId();
            }
            MDC.put(TRACE_ID, traceId);
            break;
        }
    }

    try {
        return joinPoint.proceed();
    } finally {
        MDC.clear();
    }
}

public static String generateTraceId() {
    return UUID.randomUUID().toString().replace("-", "").toLowerCase();
}

}`

消费者

Component @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group") public class BuyRenewCloudDiskMq implements RocketMQListener{ @MqTraceID @SneakyThrows @Override public void onMessage(MessageExt messageExt) {

}

}

hutaogit commented 1 year ago

因为我在 starter 的源代码中没有看到相关的扩展接口,如果需要强制实现的话需要重写 ListenerContainerConfiguration,我认为这是不太友好的方式

太需要这个了,不然就只能自己定制处理

987472953 commented 12 months ago

可以写个父类,实现RocketMQListene\<MessageExt>,可以自定义消费逻辑

texousliu commented 11 months ago

ocketMQ实现链路追踪,需要生产者在发送消息时,通过properties增加对应值。在消费者端,使

这样就需要修改旧代码了,如果从源头可扩展支持,那么我可以不用动旧代码

texousliu commented 11 months ago

可以写个父类,实现RocketMQListene,可以自定义消费逻辑

旧代码需要改造,新代码需要约束开发者行为,如果不知情的同事还是回使用原生注解

leizhiyuan commented 8 months ago

ref https://github.com/apache/rocketmq-spring/issues/616

只要这个方法改成public,就可以通过切面DefaultRocketMQListenerContainer 来做统一的处理了。