apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

Reduce election time #3747

Closed Cczzzz closed 1 year ago

Cczzzz commented 2 years ago

I am trying to reduce the Raft election time. After modifying the heartbeat timeout, I found that under low TPS the election is very fast, about 2s, but under high TPS it is slow.

I looked at the code and found that the time is spent waiting for messageStore.dispatchBehindBytes() == 0.

This waits for the logical queue to be built. Would it make a difference if I did not wait? Is it necessary?

@Override public void handle(long term, MemberState.Role role) {
    Runnable runnable = new Runnable() {
        @Override public void run() {
            long start = System.currentTimeMillis();
            try {
                boolean succ = true;
                log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                switch (role) {
                    case CANDIDATE:
                        if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                        }
                        break;
                    case FOLLOWER:
                        brokerController.changeToSlave(dLedgerCommitLog.getId());
                        break;
                    case LEADER:
                        while (true) {
                            if (!dLegerServer.getMemberState().isLeader()) {
                                succ = false;
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()) {
                                // The original condition also required: && messageStore.dispatchBehindBytes() == 0
                                // It is commented out here. Is it necessary?
                                break;
                            }
                            Thread.sleep(100);
                        }
                        if (succ) {
                            messageStore.recoverTopicQueueTable();
                            brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                        }
                        break;
                    default:
                        break;
                }
                log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
            } catch (Throwable t) {
                log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
            }
        }
    };
    executorService.submit(runnable);
}
Cczzzz commented 2 years ago

This is in DLedgerRoleChangeHandler, line 77.

guyinyou commented 2 years ago

This is how I understand it: if a slave node is elected master and its ConsumeQueue position is behind that of the previous master, this may cause consistency problems, such as "repeatable read" problems.

Cczzzz commented 2 years ago

I think repeated consumption can be tolerated. After all, it cannot be avoided anyway, for example because consumption progress is acked asynchronously.

guyinyou commented 2 years ago

I do not mean "repeated consumption" but "repeatable read". For example, if a user has consumed the message at offset 2 and the maximum offset is 1 after the active/standby switchover, then pulling the message at offset 2 will give an inconsistent result.

Cczzzz commented 2 years ago

So the ultimate effect is repeated consumption, is that right?

guyinyou commented 2 years ago

The key point is that it may lead to consistency problems, not repeated consumption. Repeated consumption is allowed in RocketMQ.

hzh0425 commented 2 years ago

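For example, in a Raft leader transfer, the old leader must wait for the new leader to catch up with it before leadership can be switched; otherwise log entries will be lost. For example: oldLeader: 1 2 3 4 5 6, newLeader: 1 2 3 4. If leadership is switched to newLeader directly, messages 5 and 6 will be lost. Even if other followers hold messages 5 and 6, they will be forced to stay consistent with newLeader.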

Cczzzz commented 2 years ago

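This is not about the Raft log; the Raft log is guaranteed by the Raft protocol. What I am asking about is a step between the end of the Raft election and the report to the NameServer: waiting for the lag in building the logical queue (ConsumeQueue) to drop to 0.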
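
The step in question can be sketched as a small predicate. This is a minimal illustration modeled on the loop in the snippet at the top of the thread, not the actual RocketMQ code: the class `PromotionGate`, both method names, and the idea that the dispatch lag is the CommitLog end position minus the position already dispatched to the ConsumeQueue are assumptions made for the example.

```java
public class PromotionGate {

    /**
     * Hypothetical model of the dispatch lag: bytes written to the
     * CommitLog that have not yet been dispatched to the ConsumeQueue.
     */
    public static long dispatchBehindBytes(long commitLogMaxOffset, long dispatchedUpTo) {
        return commitLogMaxOffset - dispatchedUpTo;
    }

    /**
     * Mirrors the exit conditions of the LEADER loop in the snippet:
     * an empty ledger is ready immediately; otherwise every entry must be
     * committed AND the ConsumeQueue must have caught up with the CommitLog.
     */
    public static boolean readyToBecomeMaster(long ledgerEndIndex, long committedIndex, long behindBytes) {
        if (ledgerEndIndex == -1) {
            return true; // nothing stored yet, nothing to wait for
        }
        return ledgerEndIndex == committedIndex && behindBytes == 0;
    }

    public static void main(String[] args) {
        long behind = dispatchBehindBytes(1_000_000L, 996_000L);
        // Raft log fully committed, but the ConsumeQueue is still behind.
        System.out.println("behind=" + behind + " ready=" + readyToBecomeMaster(42, 42, behind));
        // Same Raft state once dispatch has caught up.
        System.out.println("behind=0 ready=" + readyToBecomeMaster(42, 42, 0));
    }
}
```

Under high TPS the second condition is the one that takes time: the Raft indexes can already match while the lag is still non-zero.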

dongeforever commented 2 years ago

If the new leader has a smaller offset of the queue, it causes an "out of range" problem, which will make the client reset the consumed offsets to the earliest or the latest.
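
The "out of range" case can be shown with a toy model. This is a sketch under stated assumptions, not RocketMQ's client or broker code: `OutOfRangeSketch`, `nextPullOffset`, and the `resetToEarliest` flag are hypothetical names, and the offsets are made-up numbers.

```java
public class OutOfRangeSketch {

    /**
     * Returns the offset a consumer continues from, in this toy model.
     * A requested offset inside [minOffset, maxOffset] is served as-is;
     * anything outside is treated as out of range and reset to either
     * the earliest or the latest offset the broker knows about.
     */
    public static long nextPullOffset(long requested, long minOffset, long maxOffset, boolean resetToEarliest) {
        if (requested >= minOffset && requested <= maxOffset) {
            return requested;
        }
        return resetToEarliest ? minOffset : maxOffset;
    }

    public static void main(String[] args) {
        long consumed = 1000; // progress reached against the old leader

        // Old leader: ConsumeQueue fully built, the request is in range.
        System.out.println(nextPullOffset(consumed, 0, 1000, true));

        // New leader promoted before dispatch caught up: its queue ends at 900,
        // so the same request is out of range and falls back to offset 0.
        System.out.println(nextPullOffset(consumed, 0, 900, true));
    }
}
```

In the second call the consumer does not just re-read the 100 missing entries; in this model it restarts from the minimum offset, which is why the reset is much more costly than ordinary repeated consumption.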

guyinyou commented 2 years ago

This may be the repeated consumption @Cczzzz mentioned. If it only leads to repeated consumption, it may be acceptable to sacrifice a little here, for example by putting a 30s timeout on waiting for the CQ to be built.
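
A bounded wait of that kind could look roughly like the following. It is only a sketch of the timeout idea: `BoundedDispatchWait` and `waitUntil` are hypothetical names, and the `AtomicLong` in `main` stands in for the real dispatch lag, which a broker would read from the message store instead.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

public class BoundedDispatchWait {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition became true in time, false on
     * timeout or if the waiting thread was interrupted.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the caller decides whether to promote anyway
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated dispatch lag that shrinks by 100 bytes on every poll.
        AtomicLong behindBytes = new AtomicLong(300);
        boolean caughtUp = waitUntil(() -> behindBytes.addAndGet(-100) <= 0, 30_000, 100);
        System.out.println("caughtUp=" + caughtUp);
    }
}
```

With a timeout, the worst-case election time is capped, at the price of possibly promoting a node whose ConsumeQueue is still behind.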

Cczzzz commented 2 years ago

I want to keep the election time as short as possible. Is it okay to skip this part?

dongeforever commented 2 years ago

@Cczzzz Out of range is a serious problem: it will cause abnormal repeated consumption from the min offset.

So it is not OK to skip it.

A better way is to improve the dispatch performance.

In fact, 5.0 will use multiple threads to do the dispatch, which will improve performance considerably.

Cczzzz commented 2 years ago

So the offset submitted by the client would be greater than the ConsumeQueue max offset, the broker would treat it as illegal, and consumption would restart from the min offset?

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

github-actions[bot] commented 1 year ago

This issue was closed because it has been inactive for 3 days since being marked as stale.