apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

Local transaction messages are rolled back when the slave broker is down #3885

Status: Closed (zergduan closed this issue 1 year ago)

zergduan commented 2 years ago

BUG REPORT

  1. Please describe the issue you observed:

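2 masters + 2 slaves, synchronous replication + synchronous flush: when a slave node goes down, transaction messages do not follow the local transaction logic and are forcibly rolled back.
2 masters + 2 slaves, asynchronous replication + synchronous flush: when a slave node goes down, transaction messages follow the local transaction logic.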

Setup: 2 masters + 2 slaves, synchronous replication, synchronous flush, RocketMQ 4.9.2

The four nodes are:

broker-a-master broker-a-slave broker-b-master broker-b-slave

The following code is used to verify the transaction message feature:

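```java
// TransProducer.java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransProducer {
    public static void main(String[] args) throws Exception {
        // Arguments: namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic
        TransactionMQProducer producer = new TransactionMQProducer(null, "My-Producer-YYY", null, true, null);
        producer.setNamesrvAddr("10.177.96.111:19876;10.177.96.112:19876");
        producer.setTransactionListener(new TransactionListenerImpl());
        // Thread pool that runs the transaction check-back (checkLocalTransaction) requests
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.start();
        System.out.println("Producer started");
        // TAGA -> commit, TAGB -> rollback, TAGC -> unknown (see TransactionListenerImpl)
        String[] tags = { "TAGA", "TAGB", "TAGC" };
        for (int i = 0; i < tags.length; i++) {
            Message msg = new Message("TP-E-APP-YYY", tags[i], ("Hello xuzhu" + i).getBytes(StandardCharsets.UTF_8));
            SendResult result = producer.sendMessageInTransaction(msg, "hello-xuzhu_transaction");
            SendStatus status = result.getSendStatus();
            System.out.println("Send result: " + result);
            System.out.println("Send status: " + status);
            TimeUnit.SECONDS.sleep(2);
        }
        producer.shutdown();
        System.out.println("Producer finished");
    }
}
```

```java
// TransactionListenerImpl.java
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // Invoked by the client once the half message has been sent successfully
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("Executing local transaction----");
        if (StringUtils.equals("TAGA", message.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TAGB", message.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TAGC", message.getTags())) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    // Invoked when the broker checks back on a half message whose state is still unknown
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("Message tag: " + messageExt.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
```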

Scenario 1: synchronous master-slave replication with all four nodes running normally. Running the code above prints:

```
Producer started
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001E28318B4AAC273CA85360000, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=1], queueOffset=232]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001E28318B4AAC273CA8D210004, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=2], queueOffset=233]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001E28318B4AAC273CA94F50008, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=3], queueOffset=234]
Send status: SEND_OK
Producer finished
```

In rocketmq-dashboard, using the message trace (msgTrace) feature on RMQ_SYS_TRANS_HALF_TOPIC, I looked up each msgId from the output above to check its state and related details:

| Message Id | Tag | Timestamp | TransactionState | FromTransactionCheck | ClientHost | StoreHost |
|---|---|---|---|---|---|---|
| 7F000001E28318B4AAC273CA85360000 | TAGA | 2022-02-23 11:37:32.239 | COMMIT_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |
| 7F000001E28318B4AAC273CA8D210004 | TAGB | 2022-02-23 11:37:34.244 | ROLLBACK_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |
| 7F000001E28318B4AAC273CA94F50008 | TAGC | 2022-02-23 11:37:36.247 | UNKNOW | false | 10.177.96.117 | 10.177.96.115:22922 |

Conclusion: matches the code logic.

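Scenario 2: synchronous master-slave replication, with the broker-a-master and broker-b-slave nodes shut down to simulate a failure and verify high availability. Running the code above prints: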

```
Producer started
Send result: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F000001EA3918B4AAC273ED20440000, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=1], queueOffset=250]
Send status: SLAVE_NOT_AVAILABLE
Send result: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F000001EA3918B4AAC273ED28330004, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=2], queueOffset=251]
Send status: SLAVE_NOT_AVAILABLE
Send result: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F000001EA3918B4AAC273ED30070008, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=3], queueOffset=252]
Send status: SLAVE_NOT_AVAILABLE
Producer finished
```

Again, looking up each msgId in RMQ_SYS_TRANS_HALF_TOPIC with the msgTrace feature of rocketmq-dashboard:

| Message Id | Tag | Timestamp | TransactionState | FromTransactionCheck | ClientHost | StoreHost |
|---|---|---|---|---|---|---|
| 7F000001EA3918B4AAC273ED20440000 | TAGA | 2022-02-23 12:15:20.161 | ROLLBACK_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |
| 7F000001EA3918B4AAC273ED28330004 | TAGB | 2022-02-23 12:15:22.167 | ROLLBACK_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |
| 7F000001EA3918B4AAC273ED30070008 | TAGC | 2022-02-23 12:15:24.171 | ROLLBACK_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |

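Conclusion: does not match the code logic; every message was rolled back. Tracing the code shows that the local transaction method `public LocalTransactionState executeLocalTransaction` was never executed, and all half messages were forcibly rolled back.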

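Scenario 3: asynchronous master-slave replication (i.e., the brokerRole parameter on the master nodes changed to ASYNC_MASTER), with the broker-a-master and broker-b-slave nodes shut down to simulate a failure and verify high availability. Running the code above prints: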

```
Producer started
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001EC5618B4AAC273F5C2B70000, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=1], queueOffset=253]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001EC5618B4AAC273F5CAA00004, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=2], queueOffset=254]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001EC5618B4AAC273F5D2730008, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=3], queueOffset=255]
Send status: SEND_OK
Producer finished
```

Again, looking up each msgId in RMQ_SYS_TRANS_HALF_TOPIC with the msgTrace feature of rocketmq-dashboard:

| Message Id | Tag | Timestamp | TransactionState | FromTransactionCheck | ClientHost | StoreHost |
|---|---|---|---|---|---|---|
| 7F000001EC5618B4AAC273F5C2B70000 | TAGA | 2022-02-23 12:24:46.030 | COMMIT_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |
| 7F000001EC5618B4AAC273F5CAA00004 | TAGB | 2022-02-23 12:24:48.035 | ROLLBACK_MESSAGE | false | 10.177.96.117 | 10.177.96.115:22922 |
| 7F000001EC5618B4AAC273F5D2730008 | TAGC | 2022-02-23 12:24:50.038 | UNKNOW | false | 10.177.96.117 | 10.177.96.115:22922 |

Conclusion: matches the code logic.

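In a synchronously replicated master-slave cluster, a slave going down should not change how transaction messages are processed: the client should not skip the local transaction code and force every message to roll back.

In an asynchronously replicated master-slave cluster, transaction messages are processed normally when a slave goes down.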
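The difference between scenarios 2 and 3 starts on the broker side. Below is a simplified model of what a master reports when its slave is down, assuming RocketMQ 4.x behavior in which the master appends the message locally first and only a SYNC_MASTER then requires a healthy slave before answering SEND_OK. `BrokerReplyModel` and its nested enums are hypothetical, and other outcomes such as FLUSH_SLAVE_TIMEOUT (slave connected but too slow to acknowledge) are deliberately left out.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified model of the status a master returns after storing a message.
 * Assumption: the message is appended on the master first; only a SYNC_MASTER then
 * requires an available slave before reporting SEND_OK.
 */
final class BrokerReplyModel {
    /** Mirrors the brokerRole values used in the scenarios above. */
    enum Role { SYNC_MASTER, ASYNC_MASTER }

    /** Subset of send statuses relevant here; other outcomes are omitted. */
    enum Status { SEND_OK, SLAVE_NOT_AVAILABLE }

    private BrokerReplyModel() {}

    static Status reply(Role role, boolean slaveAvailable) {
        Objects.requireNonNull(role, "role");
        if (role == Role.SYNC_MASTER && !slaveAvailable) {
            // Stored on the master, but synchronous replication cannot be honored.
            return Status.SLAVE_NOT_AVAILABLE;
        }
        // An ASYNC_MASTER replicates in the background, so the caller sees SEND_OK.
        return Status.SEND_OK;
    }
}
```

In this model `reply(Role.SYNC_MASTER, false)` gives SLAVE_NOT_AVAILABLE, as in scenario 2, while `reply(Role.ASYNC_MASTER, false)` gives SEND_OK, as in scenario 3.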

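  2. Please tell us about your environment:
     - OS: Oracle Linux 8.4
     - JDK: openjdk version "1.8.0_322"
     - RocketMQ: rocketmq-all-4.9.2-bin-release.zip (https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip)
     - Deployment: 2 masters + 2 slaves with synchronous flush. With synchronous replication, transaction messages are forcibly rolled back when a slave goes down; with asynchronous replication, they follow the local transaction logic.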

  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc.):

zergduan commented 2 years ago

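In a synchronously replicated master-slave cluster with a slave node down, when the send status of a transaction message (producer.sendMessageInTransaction(...).getSendStatus()) is SLAVE_NOT_AVAILABLE, why is the local transaction code in TransactionListenerImpl ignored, so that every half message is forcibly rolled back regardless of the code logic?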

panzhi33 commented 2 years ago

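See org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction [screenshot of the method omitted]. With synchronous master-slave replication, a SLAVE_NOT_AVAILABLE status means the message was not sent successfully, so the client forces a rollback.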
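The branch referred to above can be modeled roughly as follows. This is a simplified, self-contained sketch, assuming the 4.9.x client logic in which the local transaction callback runs only when the half message returns SEND_OK, and FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE are mapped straight to ROLLBACK_MESSAGE. `TxDecisionModel` is a hypothetical name; its nested enums only mirror the names of RocketMQ's `SendStatus` and `LocalTransactionState`, and the supplier stands in for `executeLocalTransaction`.

```java
import java.util.function.Supplier;

/**
 * Hypothetical model of how the client picks the end-transaction state
 * from the half-message send status (not the RocketMQ source itself).
 */
final class TxDecisionModel {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private TxDecisionModel() {}

    /** Returns the state the client would report to the broker for this half message. */
    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTx) {
        switch (status) {
            case SEND_OK:
                // Only on SEND_OK is the application's local transaction executed.
                LocalTransactionState state = localTx.get();
                return state == null ? LocalTransactionState.UNKNOW : state;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // Treated as a failed half-message send: roll back without calling the listener.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW;
        }
    }
}
```

With SLAVE_NOT_AVAILABLE the supplier is never called, which matches the observation in scenario 2 that `executeLocalTransaction` was not executed and every half message was marked ROLLBACK_MESSAGE.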

zergduan commented 2 years ago

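Thanks for the explanation. Why is this restriction applied only to transaction messages?

In a synchronously replicated master-slave cluster with a slave node down, normal messages, ordered messages, and delayed messages can all still be published to the topic and are not affected by "SLAVE_NOT_AVAILABLE".

What is the reasoning behind forcing a rollback on "SLAVE_NOT_AVAILABLE" in the transaction message code?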

zergduan commented 2 years ago

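I think "SLAVE_NOT_AVAILABLE" should be handled consistently. If normal messages, ordered messages, and delayed messages all treat "SLAVE_NOT_AVAILABLE" as a status meaning the message has been published (i.e., the producer does not need to resend it), then transaction messages should be handled the same way (treated as sent successfully), instead of having the half message specially forced to roll back.

Or do transaction messages have some special requirement that calls for treating "SLAVE_NOT_AVAILABLE" differently (a forced rollback)?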

panzhi33 commented 2 years ago

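Only SEND_OK means success; for non-transaction messages, the other statuses have to be handled by the application itself. Transaction messages additionally involve a transaction status check-back, so the SDK rolls them back proactively.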

ShannonDing commented 2 years ago

In the case of synchronous master-slave replication, MQ regards this as the highest level of persistence and requires the master and slave data to be strictly consistent. If the slave broker is down, sending a message returns SLAVE_NOT_AVAILABLE, and the application needs to decide how to proceed based on its own situation. When the SDK internally processes the send result of a transaction message, it strictly treats any status other than SEND_OK as a failure by default, to keep the master and slave brokers strictly consistent, and directly rolls back the transaction (half) message.

Any other suggestions for handling this?
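For non-transaction messages, the point that only SEND_OK means success leaves the decision to the application. A minimal sketch of such a policy, assuming the application distinguishes "stored with the configured guarantees confirmed" from "accepted by the master, guarantees not confirmed": `SendResultPolicy`, `Outcome` and `shouldResend` are hypothetical names, and the nested `SendStatus` only mirrors the names of RocketMQ's enum.

```java
/**
 * Hypothetical application-side policy for a normal (non-transaction) send result.
 * SendStatus mirrors the names of RocketMQ's enum; Outcome and the policy are illustrative.
 */
final class SendResultPolicy {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum Outcome {
        /** Stored with the configured flush/replication guarantees confirmed. */
        CONFIRMED,
        /** Accepted by the master, but the configured guarantee was not confirmed (in this model). */
        NOT_CONFIRMED
    }

    private SendResultPolicy() {}

    static Outcome classify(SendStatus status) {
        return status == SendStatus.SEND_OK ? Outcome.CONFIRMED : Outcome.NOT_CONFIRMED;
    }

    /** Resend only if the caller insists on confirmed durability; otherwise accept the result. */
    static boolean shouldResend(SendStatus status, boolean requireConfirmedDurability) {
        return requireConfirmedDurability && classify(status) == Outcome.NOT_CONFIRMED;
    }
}
```

Since, as reported in this thread, a normal message that returned SLAVE_NOT_AVAILABLE can still be consumed, resending it may deliver a duplicate; a policy like this is therefore usually paired with idempotent consumers.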

zergduan commented 2 years ago

ShannonDing & panzhi33, thank you both very much for the explanations.

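I agree with your point that "for synchronous replication, SLAVE_NOT_AVAILABLE is defined as a failed publish". This can be confirmed through message tracing: messages whose sendStatus is SLAVE_NOT_AVAILABLE are marked status: failed in the trace.

But I still don't understand why RocketMQ handles different message types inconsistently when they get status: failed (sendStatus: SLAVE_NOT_AVAILABLE).

I tested normal messages (synchronous and asynchronous send), ordered messages, and scheduled messages. With status: failed (sendStatus: SLAVE_NOT_AVAILABLE), all of them can still be consumed normally. In other words, when the producer receives status: failed (sendStatus: SLAVE_NOT_AVAILABLE), it does not need to do anything special, and the message is still consumed successfully.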

For transaction messages, however, when status: failed (sendStatus: SLAVE_NOT_AVAILABLE) occurs, the message is forcibly rolled back, so it is never copied from RMQ_SYS_TRANS_HALF_TOPIC to the target topic and cannot be consumed.
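The asymmetry comes from where a transaction message lives before it is resolved. Here is a toy in-memory model, assuming the flow visible in the dashboard traces above: the half message is held in RMQ_SYS_TRANS_HALF_TOPIC, becomes consumable in its real topic only on COMMIT_MESSAGE, never reaches it on ROLLBACK_MESSAGE, and stays pending on UNKNOW until a check-back. `HalfMessageModel` and its methods are hypothetical, and the model ignores how the real broker stores and tracks resolved half messages.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of half-message resolution; not the broker's implementation. */
final class HalfMessageModel {
    enum Resolution { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // msgId -> real topic, for half messages that are still unresolved
    private final Map<String, String> halfTopic = new HashMap<>();
    // real topic -> msgIds that consumers can see
    private final Map<String, List<String>> topics = new HashMap<>();

    /** Stores a half message; it is not yet visible to consumers of realTopic. */
    void putHalf(String msgId, String realTopic) {
        halfTopic.put(msgId, realTopic);
    }

    /** Applies the producer's (or check-back's) decision for one half message. */
    void endTransaction(String msgId, Resolution resolution) {
        String realTopic = halfTopic.get(msgId);
        if (realTopic == null) {
            return; // unknown or already resolved
        }
        switch (resolution) {
            case COMMIT_MESSAGE:
                // Copied into the real topic, where consumers can now read it.
                topics.computeIfAbsent(realTopic, t -> new ArrayList<>()).add(msgId);
                halfTopic.remove(msgId);
                break;
            case ROLLBACK_MESSAGE:
                // Resolved without ever reaching the real topic.
                halfTopic.remove(msgId);
                break;
            case UNKNOW:
                // Stays pending; the broker would later trigger checkLocalTransaction.
                break;
        }
    }

    List<String> consumable(String topic) {
        return Collections.unmodifiableList(topics.getOrDefault(topic, Collections.emptyList()));
    }

    boolean pending(String msgId) {
        return halfTopic.containsKey(msgId);
    }
}
```

A normal message that returned SLAVE_NOT_AVAILABLE is already in its real topic on the master, whereas a rolled-back half message in this model never leaves the half topic, which is why only the transaction message becomes unconsumable.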

So I still have the following two questions:

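  1. If, as you say, "when status: failed (sendStatus: SLAVE_NOT_AVAILABLE) is received, the application should handle it itself", wouldn't the producer then have to handle SLAVE_NOT_AVAILABLE differently depending on the type of message it sent (checking whether it is a transaction message)? Why design the handling to be inconsistent across message types?

  2. When status: failed (sendStatus: SLAVE_NOT_AVAILABLE) is received, what should the producer do? I tried resending the message: in a 2M-2S cluster with one slave down, the resend is not guaranteed to go to the healthy master-slave pair. It still follows the load-balancing policy and round-robins across brokers, so it may be sent to the pair whose slave is down several times in a row.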
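One way to steer a manual resend away from the affected pair is to skip that broker's queues when choosing the next queue. A minimal sketch, assuming the application keeps the broker name from the failed result's MessageQueue (for example `broker-b` in scenario 2): `ExcludingQueuePicker` and its `Queue` type are hypothetical, the latter standing in for RocketMQ's `MessageQueue`, which carries a broker name and a queue ID.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical round-robin queue picker that skips queues on one excluded broker. */
final class ExcludingQueuePicker {
    /** Stand-in for a message queue: broker name plus queue ID. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = Objects.requireNonNull(brokerName, "brokerName");
            this.queueId = queueId;
        }
    }

    private int next; // round-robin cursor

    /** Returns the next queue not on excludedBroker, or null if every queue is on that broker. */
    Queue pick(List<Queue> queues, String excludedBroker) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(next + i, queues.size()));
            if (!q.brokerName.equals(excludedBroker)) {
                next = next + i + 1; // continue after the chosen queue next time
                return q;
            }
        }
        return null;
    }
}
```

For normal messages, `DefaultMQProducer.send(Message, MessageQueueSelector, Object)` lets the application choose the queue itself, so a selector could apply the same filter; the 4.x `DefaultMQProducer` also has a `retryAnotherBrokerWhenNotStoreOK` setting for synchronous sends, whose exact behavior should be checked against the version in use. Whether a comparable choice is possible for `sendMessageInTransaction` is not covered by this sketch.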

Thanks

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

github-actions[bot] commented 1 year ago

This issue was closed because it has been inactive for 3 days since being marked as stale.