NewLifeX / NewLife.RocketMQ

纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能!
MIT License
212 stars 79 forks source link

Producer和consumer如何使用阿里云rocketMQ #35

Closed liangxiao1714 closed 2 years ago

liangxiao1714 commented 3 years ago

刚接触rocketMQ不太懂这个配置,看到这个类库,但是没有使用方面的文档说明

            NewLife.RocketMQ.Producer producer = new NewLife.RocketMQ.Producer
            {
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret,
                Server = EndPoint,
                //此处为阿里云控制台rocketMQ实例下的接入点,分为TCP(只有内网Endpoint),HTTP(公网和内网为不同Endpoint)
                //是否需要NameServerAddress配置
                Topic = Topic
            };
            producer.Start();
            producer.Publish(new Message
            {
                Keys = "TheKey",
                BodyString = "TheMessage." + DateTime.Now,
                Tags = "TestTag"
            });
            producer.Stop();
NewLife.RocketMQ.Consumer consumer = new NewLife.RocketMQ.Consumer();
            consumer.Configure(new NewLife.RocketMQ.MqSetting()
            {
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret,
                Group = Group,
                Server = EndPoint,
                Topic = Topic
            });
            consumer.OnConsume = (mq, ext) =>
            {
                Console.WriteLine(mq.BrokerName);
                return true;
            };
            consumer.Start();
            Task.Delay(TimeSpan.FromMinutes(10));
            consumer.Stop();
liangxiao1714 commented 3 years ago

看到https://github.com/NewLifeX/NewLife.RocketMQ/issues/29 但是好像没有明确的解决了该问题,使用阿里云控制台rocketMQ实例下的接入点和http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet都无法正常运行

nnhy commented 3 years ago

RocketMQ本身只需要NameServer即可,但是阿里云多了个Server,是一个http地址,通过这个地址结合appid,可以获得nameserver

nnhy commented 3 years ago

阿里云Server是http地址,通过http获得NameServer后,走RocketMQ的tcp链路,做topic查找等操作

消息队列NewLife.RocketMQ · 语雀 https://www.yuque.com/smartstone/nx/rocketmq

liangxiao1714 commented 3 years ago

RocketMQ本身只需要NameServer即可,但是阿里云多了个Server,是一个http地址,通过这个地址结合appid,可以获得nameserver

另外当Server换成官方控制台http接入点时,Exception: One or more errors occurred. (400 Bad Request),暂时看起来官方这个endpoint应该只是用来调用官方SDK时使用

var producer = new Producer
            {
                Topic = Topic,
                Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret
            };
            producer.Log = XTrace.Log;
            try
            {
                producer.Start();

                producer.Publish("message", "tag", "key");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                producer.Stop();
            }
nnhy commented 2 years ago

http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet

这个地址返回的就是NameServer地址

Guma-Ark commented 2 years ago

调用阿里云RocketMQ时NewLife的设置如下 var mq = new Producer { //这里不能只设置阿里云控制台看到的Topic,需要加上实例ID,百分号隔开 Topic = $"{实例ID}%{Topic}", NameServerAddress ="Tcp接入点(公网)" };

nnhy commented 2 years ago

阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]。 fix #24 fix#35 … · NewLifeX/NewLife.RocketMQ@33e58a6 https://github.com/NewLifeX/NewLife.RocketMQ/commit/33e58a6b727f88482b159f5e532864a6591b2a5e