apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink that allows a flink job to either write messages into a topic or read from topics in a flink job.
https://rocketmq.apache.org/
Apache License 2.0
139 stars 88 forks source link

rocketmq-flink only can consume from master? #90

Closed daigoopautoy closed 11 months ago

daigoopautoy commented 1 year ago

I see the steps of RocketMQSource are:

  1. long offset = getMessageQueueOffset(mq);
  2. offset = consumer.searchOffset(mq); (maxOffset, minOffset as well)
  3. org.apache.rocketmq.client.impl.MQAdminImpl#searchOffset
  4. String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

In findBrokerAddressInPublish , will only get master address or null.

    public String findBrokerAddressInPublish(final String brokerName) {
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            return map.get(MixAll.MASTER_ID);
        }

        return null;
    }

When master is down, the result of step4 is null, and then step3 will throw MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null), and then RocketMQSource will crash.

So, if master down, not flink consumer will consume from slave, flink consumer will crash, is this properly?