apache / rocketmq-mqtt

Apache rocketmq
https://rocketmq.apache.org/
Apache License 2.0
181 stars 63 forks source link

topic[%LMQ%topic001%r1%] not exist, apply first please 需要给每个通配符创建topic? #197

Closed wljin321 closed 1 year ago

wljin321 commented 1 year ago

kvConfig {"configTable":{"LMQ":{"LMQ_CONNECT_NODES":"localhost","topic001":"topic001/+,topic001/r,topic001/r1,topic001/r2,topic001/r/+","ALL_FIRST_TOPICS":"topic001,topic002"}}}

运行 example

2023-06-08 22:40:10.282 [NettyClientPublicExecutor_2] ERROR o.apache.rocketmq.mqtt.cs.session.loop.QueueCache - 
java.util.concurrent.CompletionException: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 17  DESC: topic[%LMQ%topic001%r1%] not exist, apply first please! 
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details. BROKER: 192.168.139.1:10911
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
    at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
    at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
    at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1969)
    at org.apache.rocketmq.mqtt.ds.store.LmqQueueStoreManager$3.onException(LmqQueueStoreManager.java:249)
    at org.apache.rocketmq.mqtt.ds.store.LmqQueueStoreManager$4.onException(LmqQueueStoreManager.java:361)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:763)
    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:320)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 17  DESC: topic[%LMQ%topic001%r1%] not exist, apply first please! 
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details. BROKER: 192.168.139.1:10911
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:808)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:171)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:759)
    ... 8 common frames omitted
ChangingFond commented 1 year ago

LMQ requires rocketmq version >= 4.9.3,which version you use?

wljin321 commented 1 year ago

rocketmq-all-5.1.1-bin-release and rocketmq-all-4.9.6-bin-release cause same error with broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable=true
enableLmq = true
enableMultiDispatch = true
public static void main(String[] args) throws Exception {
        String group = "group_001";
        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(group);
        defaultMQPullConsumer.start();
        MQClientInstance mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(group);
        requestHeader.setTopic("%LMQ%topic001%r1%");
        requestHeader.setQueueId(0);
        requestHeader.setQueueOffset(0L);
        requestHeader.setMaxMsgNums(32);
        requestHeader.setSysFlag(4);
        requestHeader.setCommitOffset(0L);
        requestHeader.setSuspendTimeoutMillis(5000L);
        requestHeader.setSubscription("*");
        requestHeader.setSubVersion(0L);
        requestHeader.setExpressionType("TAG");

        String brokerAddr = "172.25.80.1:10911";
        org.apache.rocketmq.client.consumer.PullResult pullResult =
                mQClientFactory.getMQClientAPIImpl().pullMessage(
                        brokerAddr,
                        requestHeader,
                        3000,
                        CommunicationMode.ASYNC,
                        new PullCallback() {
                            @Override
                            public void onSuccess(PullResult pullResult) {
                                System.out.println(pullResult);
                            }

                            @Override
                            public void onException(Throwable e) {
                                e.printStackTrace();
                            }
                        });

        TimeUnit.SECONDS.sleep(2);
        defaultMQPullConsumer.shutdown();
    }

image

wljin321 commented 1 year ago

start rocketmq broker without -c broker.conf
lmq settings not working

HiSuperPotato commented 1 year ago

你好 我想请问一下 kvConfig {"configTable":{"LMQ":{"LMQ_CONNECT_NODES":"localhost","topic001":"topic001/+,topic001/r,topic001/r1,topic001/r2,topic001/r/+","ALL_FIRST_TOPICS":"topic001,topic002"}}} 这个初始化操作 需要写哪个配置文件上 有指引学习一下么 谢谢

wljin321 commented 1 year ago

不是写文件上的 你启动了rocketmq之后(rocketmq mqtt 依赖rocketmq ,mqtt中会配置rocketmq的nameserv地址)
上面文件的内容 是用
sh mqadmin updateKvConfig -s LMQ -k LMQ_CONNECT_NODES -v {ip1,ip2} -n {namesrv} sh mqadmin updateKvConfig -s LMQ -k {topic} -v {topic/+} -n {namesrv}
sh mqadmin updateKvConfig -s LMQ -k ALL_FIRST_TOPICS -v {topic1,topic2} -n {namesrv} 这些命令去一个一个添加的 {topic} {ip} 写真实的topic 和 ip 支持多个逗号分割的 注意去掉大括号

LMQ LMQ_CONNECT_NODES 等这些变量名是固定的 mqtt会读取这些配置 不能自己乱取名

HiSuperPotato commented 1 year ago

好的 谢谢

imgoddqp commented 1 year ago

请问通配符消息通了吗?我这边测试发现消费者收不到通配符消息,demo 里面的 r1 r2都能正常订阅收到,但是 r/+ 收不到r/wc

wljin321 commented 1 year ago

我测试都正常的 你收不到的话 确认这个配置配对了么? .\mqadmin updateKvConfig -s LMQ -k topic001 -v topic001/+,topic001/r/+ -n localhost:9876 就是 这个 r/+的 映射配置