apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0
21.19k stars 11.67k forks source link

MQClientInstance has a risk of blocking during persistAllConsumerOffset() #3143

Closed ozw999 closed 2 years ago

ozw999 commented 3 years ago

BUG REPORT

  1. Please describe the issue you observed: I found a problem : When a network exception occurs on my cluster(2m-2s), the task( updateTopicRouteInfoFromNameServer() ) was not performed on time. Why MQClientInstance used a Executors.newSingleThreadScheduledExecutor() to do lots of things? such as

    updateTopicRouteInfoFromNameServer();
    sendHeartbeatToAllBrokerWithLock();
    persistAllConsumerOffset();

    Through my observation, when a network exception occurred while executing persistAllConsumerOffset() :

    
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }
    
    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);
    
        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    }

    
    At this moment, `this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());` will not be null,because used single-thread, task `updateTopicRouteInfoFromNameServer();` is waiting.
    In fact  the `findBrokerResult` doesn't exist anymore, every queue will cost 5 seconds in `findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);`. 
    In the meanwhile, consumer cannot execute  the `rebalance` correctly, because topicRouteInfo is wrong.
  2. Please tell us about your environment: RocketMQ version is 4.4.0.

  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc): Why MQClientInstance used a Executors.newSingleThreadScheduledExecutor() to do lots of things? Whether multiple threads can be used to execute them?

zongtanghu commented 3 years ago

So, and what's your advices ?

ozw999 commented 3 years ago

So, and what's your advices ?

When you want to speed up disaster recovery switching,you will find that the problem is holding you back. Your MQ will not work properly for two minutes. My suggestion, is used another thread to do persistAllConsumerOffset() But I need to find out why the developers designed it in this way.

caigy commented 3 years ago

There're 5 tasks in org.apache.rocketmq.client.impl.factory.MQClientInstance#scheduledExecutorService and they're executed in a 'sequential' way just as described in question. After reading related code, I found that those tasks would read and write some shared fields, including but not limited to:

More precise work should be spent on how to make them work concurrently in a safe and efficient way.

ozw999 commented 3 years ago

I think that maybe my English is not good enough to express myself fully. Forgive me for using Chinese next.

@caigy

  • org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable
  • org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteTable

以上两个map是 ConcurrentMap ,应该不存在并发问题。

  • persistAllConsumerOffset() also calling updateTopicRouteInfoFromNameServer()

我想你指的是:

    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

达成这个条件的前提时当前brokerAddrTable内的数据是正确的。比如在执行persistAllConsumerOffset()时若是broker-b正好出现网络故障,此时由于没有去更细brokerAddrTable,findBrokerResult仍然能取到值,从而不会进入if条件。

我现在遇到的问题是,我在使用双主双从的RocketMQ集群做容灾测试的时候发现,在拔掉一台服务器的网线后(A主和B从),MQ大约需要两分钟以后才能够恢复正常的生产和消费。这在企业级应用中是无法容忍的,因此我尝试缩短了一些源码中的超时判断时间,例如:

但是接踵而来的问题是我发现容灾恢复的时间忽快忽慢,于是我增加了一些调试信息找到了导致MQ恢复慢的问题主要在于: MQClientInstance里的单线程池在执行定时任务时如果正好执行到persistAllConsumerOffset()时发生了服务器断网,由于我有大量的queue他需要一个一个的去持久化,会导致其他定时任务,例如updateTopicRouteInfoFromNameServer()在很长的时间内(大约2分钟)不会去执行。在此期间consumer的rebalance无法正确执行,从而不能恢复消费。 源码中的这种执行方式很大程度上限制了RocketMQ的容灾恢复速度。

所以,如果MQClientInstance中的这几个定时任务能够拆分执行的话我建议把persistAllConsumerOffset()拆出来单独执行。

lizhimins commented 3 years ago

从代码看确实会这样,单次位点提交失败的造成的最坏结果是少量消息冲突,如果使用异步的方式并发提交位点是不是不会有这个问题

ozw999 commented 3 years ago

是的。所有的消费持久化最后都会依次通过 org.apache.rocketmq.remoting.netty#invokeSync() 去执行,且超时时间是5s。这将造成非常恐怖的阻塞。

    if (isOneway) {
        this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
        findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }

改用异步调用似乎是一个更好的解决方案。 org.apache.rocketmq.remoting.netty#invokeAsync() @duhenglucky 我觉得这不仅仅是一个”question“而已呀

github-actions[bot] commented 2 years ago

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

github-actions[bot] commented 2 years ago

This issue was closed because it has been inactive for 3 days since being marked as stale.

Slideee commented 1 year ago

这应该是一个必现的问题,用BenchmarkTest(1024条队列)去压测的同时,让Broker挂掉在恢复,客户端至少要几个小时才能恢复可用。