NewLifeX / NewLife.RocketMQ

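A lightweight, pure managed-code RocketMQ client that supports core features such as publishing messages, consuming messages, and load balancing.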
MIT License
212 stars, 79 forks

Some messages cannot be subscribed to after an Alibaba Cloud RocketMQ upgrade #73

Closed zhangruiwei closed 3 weeks ago

zhangruiwei commented 10 months ago

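Version in use: v2.4.2023.0803. Every time Alibaba Cloud upgrades the service, messages from one particular BornHost (for example, 119.23.158.52:16654) can no longer be consumed. After the service is restarted, those messages are consumed successfully. In addition, the client connection view for the consumer group in the Alibaba Cloud console shows one or more clients connecting alternately (one connection at one moment, two connections the next).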

Part of the log output captured during the upgrade is shown below. The message "集群所有地址[1]连接失败!" means "All [1] addresses of the cluster failed to connect!".

22:51:49.410 2 N - NewLife.XException: [cn-qingdao-public-share-06-3]集群所有地址[1]连接失败!
   at NewLife.RocketMQ.ClusterClient.EnsureCreate()
   at NewLife.RocketMQ.ClusterClient.SendAsync(Command cmd, Boolean waitResult, CancellationToken cancellationToken)
   at NewLife.RocketMQ.ClusterClient.InvokeAsync(RequestCode request, Object body, Object extFields, Boolean ignoreError, CancellationToken cancellationToken)
   at NewLife.RocketMQ.Consumer.UpdateOffset(MessageQueue mq, Int64 commitOffset, CancellationToken cancellationToken)
22:51:49.411 2 N - NewLife.XException: [cn-qingdao-public-share-06-3]集群所有地址[1]连接失败!
   at NewLife.RocketMQ.ClusterClient.EnsureCreate()
   at NewLife.RocketMQ.ClusterClient.SendAsync(Command cmd, Boolean waitResult, CancellationToken cancellationToken)
   at NewLife.RocketMQ.ClusterClient.InvokeAsync(RequestCode request, Object body, Object extFields, Boolean ignoreError, CancellationToken cancellationToken)
   at NewLife.RocketMQ.Consumer.UpdateOffset(MessageQueue mq, Int64 commitOffset, CancellationToken cancellationToken)
22:51:49.411 2 N - NewLife.XException: [cn-qingdao-public-share-06-3]集群所有地址[1]连接失败!
   at NewLife.RocketMQ.ClusterClient.EnsureCreate()
   at NewLife.RocketMQ.ClusterClient.SendAsync(Command cmd, Boolean waitResult, CancellationToken cancellationToken)
   at NewLife.RocketMQ.ClusterClient.InvokeAsync(RequestCode request, Object body, Object extFields, Boolean ignoreError, CancellationToken cancellationToken)
   at NewLife.RocketMQ.Consumer.UpdateOffset(MessageQueue mq, Int64 commitOffset, CancellationToken cancellationToken)
22:52:05.068 22 Y T System.Threading.Tasks.TaskCanceledException: A task was canceled.
   at NewLife.Net.SessionBase.SendMessageAsync(Object message, CancellationToken cancellationToken)
   at NewLife.RocketMQ.ClusterClient.SendAsync(Command cmd, Boolean waitResult, CancellationToken cancellationToken)
   at NewLife.RocketMQ.ClusterClient.InvokeAsync(RequestCode request, Object body, Object extFields, Boolean ignoreError, CancellationToken cancellationToken)
   at NewLife.RocketMQ.Consumer.InitOffsetAsync(CancellationToken cancellationToken)
   at NewLife.RocketMQ.Consumer.Rebalance()
23:00:57.518 45 Y T NewLife.XException: [cn-qingdao-public-share-06-0]集群所有地址[1]连接失败!
   at NewLife.RocketMQ.ClusterClient.EnsureCreate()
   at NewLife.RocketMQ.ClusterClient.OnStart()
   at NewLife.RocketMQ.BrokerClient.OnStart()
   at NewLife.RocketMQ.ClusterClient.Start()
   at NewLife.RocketMQ.Client.MqBase.GetBroker(String name)
   at NewLife.RocketMQ.Consumer.InitOffsetAsync(CancellationToken cancellationToken)
   at NewLife.RocketMQ.Consumer.Rebalance()
   at NewLife.RocketMQ.Consumer.CheckGroup(Object state)

zhangruiwei commented 10 months ago

[Image attachment: A0F052A7-73DC-47c2-A271-3269EE3DF513]

nnhy commented 10 months ago

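When the network is poor, client A loses its connection to the broker. The whole consumer group then rebalances, and the other consumers take over that client's queues. If connections drop frequently, the group may rebalance frequently, which can scramble the queue assignments within the consumer group.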
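The rebalancing described above can be illustrated with a small simulation. This is a minimal sketch, assuming an "average" allocation strategy in which every consumer sorts the queue list and the consumer list and takes a contiguous slice. Whether NewLife.RocketMQ allocates queues exactly this way is an assumption, and the names `allocate_averagely` and `assignments` are hypothetical helpers, not part of any library.

```python
def allocate_averagely(queues, consumers, me):
    """Return the queues that consumer `me` would claim.

    Each consumer runs this locally with its own view of the group,
    so two consumers with different views can disagree.
    """
    consumers = sorted(consumers)
    queues = sorted(queues)
    if me not in consumers:
        return []
    index = consumers.index(me)
    base, extra = divmod(len(queues), len(consumers))
    # The first `extra` consumers each take one additional queue.
    size = base + 1 if index < extra else base
    start = index * base + min(index, extra)
    return queues[start:start + size]


def assignments(queues, consumers):
    """Allocation for a whole group when everyone shares the same view."""
    return {c: allocate_averagely(queues, consumers, c) for c in sorted(consumers)}


if __name__ == "__main__":
    queues = [0, 1, 2, 3]
    print("stable group :", assignments(queues, ["A", "B"]))
    print("A disconnects:", assignments(queues, ["B"]))
    print("A reconnects :", assignments(queues, ["A", "B"]))

    # Stale view: B still believes A is alive, but A is actually gone.
    claimed = allocate_averagely(queues, ["A", "B"], "B")
    unowned = [q for q in queues if q not in claimed]
    print("queues nobody consumes while B's view is stale:", unowned)
```

Every disconnect and reconnect moves queues between consumers, and while one consumer's view of the group is stale, some queues may be left unconsumed or claimed twice. This only illustrates the general mechanism and is not a diagnosis of the issue reported here.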

nnhy commented 6 months ago

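Alibaba Cloud RocketMQ has a somewhat unusual network topology: access over the public internet goes through an extra gateway layer. Accessing it over the VPC internal network is recommended wherever possible.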

nnhy commented 3 weeks ago

Tested and confirmed to support Apache RocketMQ v5.3!

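v2.6.2024.1004 supports RocketMQ v5.3 and has passed testing over the public internet. By default, internal broker addresses are replaced with public addresses. https://github.com/NewLifeX/NewLife.RocketMQ/commit/006817ea918d48c83b2fb0f1318a8a3b339f9528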
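The comment does not say how the address replacement is performed. One plausible approach, sketched here purely as an assumption, is to detect a private broker IP and substitute the host the client already uses to reach the name server, keeping the broker's port. The function names and the example addresses are hypothetical, and the linked commit may work differently.

```python
import ipaddress


def is_private_host(host):
    """True if `host` is an IP literal in a private range."""
    try:
        return ipaddress.ip_address(host).is_private
    except ValueError:
        # Not an IP literal (e.g. a DNS name); leave it untouched.
        return False


def to_public_address(broker_addr, name_server_addr):
    """Swap a private broker host for the name server's host.

    Both arguments are "host:port" strings (IPv4 or DNS names only;
    IPv6 literals are not handled in this sketch).
    """
    broker_host, _, broker_port = broker_addr.rpartition(":")
    ns_host, _, _ = name_server_addr.rpartition(":")
    if is_private_host(broker_host):
        return f"{ns_host}:{broker_port}"
    return broker_addr


if __name__ == "__main__":
    # Hypothetical addresses, for illustration only.
    print(to_public_address("10.0.0.5:10911", "mq.example.com:9876"))
    print(to_public_address("119.23.158.52:10911", "mq.example.com:9876"))
```

A broker that already advertises a public address passes through unchanged, so the same code path can serve both VPC and public-internet deployments.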