rocketmq / rocketmq-spring-boot-starter

Help developers quickly integrate RocketMQ in Spring Boot. Support the Spring Message specification to facilitate developers to quickly switch from other MQ to RocketMQ.
https://www.rocketmq.org
Apache License 2.0
77 stars 22 forks source link

tag不能为* #7

Open humingzhang opened 5 years ago

humingzhang commented 5 years ago

消费端和生产端必须tag,否则收不到消息

no-today commented 3 years ago

这个仓库有坑,消费者无法监听 Tag '*'

RocketMQListenerMethodAdapter.java

@Override
public void onMessage(E message, MessageContext context) throws ConsumeException {
    if (logger.isDebugEnabled()) {
        logger.debug("received message:{}", message);
    }
    String tag = context.getMessageExt().getTags();
    // 这边入参 Tag 为 'XXXX', 无法匹配到 '*'
    Method method = this.subscriptionGroup.getMethod(tag);
    Object delegate = this.subscriptionGroup.getTarget();
    if (method != null) {
        try {
            invoker.invoke(delegate, method, message, context);
        } catch (Exception e) {
            throw new ConsumeException(e);
        }
    } else {
        if (("*").equals(tag.trim())) {
            invoker.invoke(delegate, this.subscriptionGroup.getAllMethods(), message, context);
        } else {
            throw new ConsumeException("No way to find the corresponding tag");
        }
    }
}
no-today commented 3 years ago

修改为

if (("*").equals(tag.trim())) {
    invoker.invoke(delegate, this.subscriptionGroup.getAllMethods(), message, context);
} else {
    Method method_ = Optional.ofNullable(this.subscriptionGroup.getMethod("*"))
            .orElseThrow(() -> new ConsumeException("No way to find the corresponding tag: " + tag));
    try {
        invoker.invoke(delegate, method_, message, context);
    } catch (Exception e) {
        throw new ConsumeException(e);
    }
}