candostyavuz opened this issue 5 months ago
Data synchronization is asynchronous. You will know that it's complete when you can read an index on the shard from the new node. I use a function like this to block until the shard is ready after calling StartOnDiskReplica.
```go
// readIndex blocks until it can read from a shard, indicating that the local
// replica is up to date.
func (a *Agent) readIndex(ctx context.Context, shardID uint64) (err error) {
	var rs *dragonboat.RequestState
	for {
		rs, err = a.host.ReadIndex(shardID, time.Second)
		if err != nil || rs == nil {
			a.log.Infof(`[%05x] Error reading shard index: %s: %v`, shardID, a.HostID(), err)
			select {
			case <-ctx.Done():
				return
			case <-a.clock.After(time.Second):
			}
			continue
		}
		res := <-rs.ResultC()
		rs.Release()
		if !res.Completed() {
			a.log.Infof(`[%05x] Waiting for other nodes`, shardID)
			select {
			case <-ctx.Done():
				return
			case <-a.clock.After(time.Second):
			}
			continue
		}
		break
	}
	return
}
```
Dragonboat v4 has a new WaitReady replica config option that effectively does the same thing.
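If you go the v4 route, a minimal config sketch might look like the following. This is untested and assumes the option is the `WaitReady` field on `config.Config`; the other field values are placeholders, not recommendations:

```go
cfg := config.Config{
	ReplicaID:          replicaID,
	ShardID:            shardID,
	ElectionRTT:        10,
	HeartbeatRTT:       1,
	SnapshotEntries:    1000,
	CompactionOverhead: 100,
	// WaitReady: the start call reports the replica ready only once it has
	// caught up, replacing a hand-rolled readIndex loop.
	WaitReady: true,
}
// nh.StartOnDiskReplica(initialMembers, join, factory, cfg)
```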
Thank you so much for the response!
So is this the general flow? Can you confirm, please?
1. SyncRequestAddNonVoting
2. StartReplica (or StartOnDiskReplica in my case) with join=true and an empty initialMemberList
3. readIndex (or WaitReady in v4)
4. SyncRequestAddReplica
5. SyncGetShardMembership on each of the existing initial nodes now

Am I correct in the steps above? Thanks in advance!
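For my own clarity, here is my understanding of those steps in code form. This is an untested sketch, not working code: the method names are the v4 `NodeHost` APIs already mentioned in this thread, `existing` is a NodeHost on a current member, `nh` is the new node's NodeHost, and `newSM`/`cfg` stand in for the state machine factory and replica config:

```go
// 1. On an existing member: propose the new replica as a non-voting member.
if err := existing.SyncRequestAddNonVoting(ctx, shardID, newReplicaID, newRaftAddr, 0); err != nil {
	panic(err)
}

// 2. On the new node: start with join=true and an empty initial member list.
if err := nh.StartOnDiskReplica(map[uint64]string{}, true /* join */, newSM, cfg); err != nil {
	panic(err)
}

// 3. Block until the replica can serve a linearizable read
//    (the readIndex loop above, or WaitReady in v4).

// 4. Promote the caught-up replica to a full voting member.
if err := existing.SyncRequestAddReplica(ctx, shardID, newReplicaID, newRaftAddr, 0); err != nil {
	panic(err)
}

// 5. SyncGetShardMembership on any member should now reflect the new replica.
```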
I haven't explored the transition from non-voting to member but that sounds about right. It should be a quick membership transition with no extra state to replicate.
Hey @kevburnsjr ,
I tried your solution, and the new node got stuck in the loop where it waits for a reply from ReadIndex.
Here is how I tested it on a raft cluster with 3 initial members:
1. Added the new node as a non-voting member.
2. One of the initial nodes acknowledged that the non-voting member was added and that it was waiting to connect to that node host.
3. Created a separate docker-compose.yaml for the new node and started it with the following operations:
```go
err = nh.StartOnDiskReplica(initialMembers, !isInitialMember, func(clusterId, nodeId uint64) statemachine.IOnDiskStateMachine {
	log.Infof("creating disk state machine for cluster=%d and node=%d", clusterId, nodeId)
	if clusterId != s.config.ClusterID {
		log.Fatalf("not compatible cluster version %d != %d", clusterId, s.config.ClusterID)
	}
	return s
}, nhc)
if err != nil {
	return errors.Wrapf(err, "failed to start on-disk raft cluster")
}
if err := s.readIndex(context.Background(), s.config.ClusterID); err != nil {
	return errors.Wrap(err, "failed to sync new node to the current state of the cluster")
}
```
The connection is established at the networking layer; I pinged the new node from all other nodes and it was successful. But synchronization was still not happening.
Here is my readIndex func:
```go
func (s *Service) readIndex(ctx context.Context, shardID uint64) error {
	for {
		rs, err := s.nh.ReadIndex(shardID, time.Second)
		fmt.Println("ReadIndex - rs: ", rs)
		if err != nil || rs == nil {
			log.Infof("[%05x] Error reading shard index: %v", shardID, err)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
			continue
		}
		res := <-rs.ResultC()
		fmt.Println("readIndex - res: ", res)
		rs.Release()
		if !res.Completed() {
			log.Infof("[%05x] Waiting for other nodes", shardID)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
			continue
		}
		break
	}
	return nil
}
```
Here is the response from new node:
```
readIndex - res:  {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:12Z" level=info msg="[00000] Waiting for other nodes"
ReadIndex - rs:  &{0 0 0 0 367 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7440 0x40010aa000 0x40003e6870 false <nil>}
time="2024-06-09T16:02:14Z" level=info msg="[00000] Waiting for other nodes"
readIndex - res:  {0 {0 []} [] {0 0} false false}
ReadIndex - rs:  &{0 0 0 0 378 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
time="2024-06-09T16:02:16Z" level=info msg="[00000] Waiting for other nodes"
readIndex - res:  {0 {0 []} [] {0 0} false false}
ReadIndex - rs:  &{0 0 0 0 388 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
readIndex - res:  {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:18Z" level=info msg="[00000] Waiting for other nodes"
ReadIndex - rs:  &{0 0 0 0 399 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
readIndex - res:  {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:21Z" level=info msg="[00000] Waiting for other nodes"
```
This loop kept going on forever with the same message.
Is there anything I might be missing? Do you have a repository where I can see how this process is handled from start to end?
Thanks in advance 🙌
It's hard to tell what might be wrong without a full example. Is the rest of this code in a repository that I can view?
The readIndex example I gave you was copy/pasted from my project Zongzi, which adds a registry. It is designed to simplify these sorts of multi-node operations. See (*Agent).Start(), where all startup scenarios are covered.
@candostyavuz
Once the replica is added to the shard, you can start it on the designated NodeHost. After that, everything else is transparent to your application, meaning raft logs will be appended to the new node and a snapshot will be sent if necessary.
In general, there is no extra step required. Some would add the replica as a non-voter first and upgrade it to a full member later; this is extensively discussed in the raft thesis, please read the thesis for more details.
To make sure the recently added replica is working as expected, you can issue a ReadIndex on that node. If you can successfully complete it, then it means all previous logs have been appended and applied on your new replica. This is a pretty strong guarantee, as it also implies that the new replica is recognized by the leader replica, which in turn requires a consensus from all replicas in the shard.
@lni @kevburnsjr Thank you so much for your explanations and examples, I managed to resolve most of my issues. For now, it seems that the new node has successfully synchronized and joined the cluster. I haven't tested adding more than one node yet, but hopefully the process goes smoothly.
I really appreciate it!
@lni @kevburnsjr Sometimes issues occur during the state sync of a newly added node; do you know the main reason for this?
```
panic: not initial recovery but snapshot shrunk

2024-06-11 16:29:59.163950 I | rsm: [00000:70460] opened disk SM, index 0
2024-06-11 16:29:59.164460 C | rsm: [00000:70460], ss.OnDiskIndex (106) > s.onDiskInitIndex (0)
panic: [00000:70460], ss.OnDiskIndex (106) > s.onDiskInitIndex (0)
goroutine 423 [running]:
github.com/lni/goutils/logutil/capnslog.(*PackageLogger).Panicf(0x1400128c060, {0x1039581df?, 0x10?}, {0x14001412630?, 0x14000591070?, 0x14000054dd0?})
	/Users/candostyavuz/go/pkg/mod/github.com/lni/goutils@v1.4.0/logutil/capnslog/pkg_logger.go:88 +0xb4
github.com/lni/dragonboat/v4/logger.(*capnsLog).Panicf(0x14000591050?, {0x1039581df?, 0x10?}, {0x14001412630?, 0x103cefc01?, 0x140003e0098?})
	/Users/candostyavuz/go/pkg/mod/github.com/lni/dragonboat/v4@v4.0.0-20231222133740-1d6e2d76cd57/logger/capnslogger.go:74 +0x28
github.com/lni/dragonboat/v4/logger.(*dragonboatLogger).Panicf(0x140003e00b0?, {0x1039581df, 0x30}, {0x14001412630, 0x3, 0x3})
	/Users/candostyavuz/go/pkg/mod/github.com/lni/dragonboat/v4@v4.0.0-20231222133740-1d6e2d76cd57/logger/logger.go:135 +0x54
github.com/lni/dragonboat/v4/internal/rsm.(*StateMachine).checkPartialSnapshotApplyOnDiskSM(0x14000572400, {{0x140001f4d20, 0xd1}, 0x0, 0x6a, 0x2, {0x65, 0x14001412390, 0x0, 0x0, ...}, ...}, ...)
```
or sometimes:
```
panic: invalid commitTo index 105, lastIndex() 0
```
I am attempting to add a new node to an existing Raft cluster to increase the number of nodes in the network. My process involves using Dragonboat's functions like SyncGetShardMembership, SyncRequestAddNonVoting, and SyncRequestAddReplica. However, I'm facing issues with correctly syncing and propagating the new node's information across the existing cluster. I see no information about this very important concept in the docs, examples, or discussions, so I would really be grateful if you could provide a flow that allows me to dynamically add/remove nodes in a running Raft network.

When I call SyncRequestAddReplica and then start the node with the related Dragonboat start method, does it handle state sync for the new node automatically, or what else should I do to ensure the new node successfully synchronizes with the network and connects to the existing leader?

Current Approach and Problems

1. I call SyncGetShardMembership to fetch the current members of the cluster.
2. I add the new node as a non-voting member with SyncRequestAddNonVoting. The nodeID and raftAddress for the new node are derived from our internal configurations and passed appropriately. In this scenario, the membership config has not been updated when SyncGetShardMembership is called and logged again afterwards.
3. When I call SyncRequestAddReplica directly, the membership is updated and the new node info is added, but only on one node, which probably processed that operation. How should I propagate this updated member list to the other nodes? Isn't SyncRequestAddReplica handling this consensus automatically?

In both scenarios, when starting the new node with the join flag = true and an empty initial members list using the StartOnDiskReplica function, the new node has absolutely no info about who the leader is, and it can't fetch state info about the existing network. It feels like there's no automatic synchronization mechanism when we register & start the new node.

Here are some functions I am using to implement the logic described above:
Leader logic which runs in a goroutine with a time ticker:

To be more specific, the new node gives a "can't find leader in config, that's not normal" error after being registered & started with the steps described above... I appreciate any guidance or insights into how to correctly manage node additions in a Raft cluster using Dragonboat, ensuring consistency and stability throughout the cluster. Thanks in advance 🙌