apache / kvrocks-controller

Apache Kvrocks Controller is a cluster management tool for Apache Kvrocks.
Apache License 2.0

Add support for the embedded storage #183

Open · ptyin opened 3 months ago

ptyin commented 3 months ago


The kvrocks-controller has so far depended on an external storage system, such as Apache ZooKeeper or etcd, for metadata management and leader election. Users therefore have to deploy and operate that extra service alongside the controller, which adds operational complexity. This proposal removes the dependency by integrating an embedded, Raft-replicated storage into the controller itself.

Implementation Overview

The detailed design can be reviewed in the proposal document.

Key components include:

Embedded System

The Embedded struct holds the metadata and the application logic that manipulates it:

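type Embedded struct {
    kv             map[string][]byte // in-memory metadata, replicated through Raft
    kvMu           sync.RWMutex      // guards kv
    snapshotter    *snap.Snapshotter // reads and writes snapshot files
    node           *raftNode         // the underlying Raft node
    myID           string
    PeerIDs        []string
    quitCh         chan struct{}
    leaderChangeCh <-chan bool            // true/false when this node gains/loses leadership
    proposeCh      chan string            // encoded entries sent to raftNode
    confChangeCh   chan raftpb.ConfChange // membership changes sent to raftNode
}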
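The proposal does not show the read path. One plausible option, sketched here as an assumption, is to serve reads straight from the local kv map under the read lock, accepting that a follower may briefly lag behind the leader. The `store` type and its `Get`/`set` methods are hypothetical stand-ins for the relevant parts of Embedded.

```go
package main

import "sync"

// store is a hypothetical, trimmed-down stand-in for Embedded:
// just the replicated map and the lock that guards it.
type store struct {
	kv   map[string][]byte
	kvMu sync.RWMutex
}

func newStore() *store {
	return &store{kv: make(map[string][]byte)}
}

// Get serves a read from local state under the read lock. It does not go
// through Raft, so on a follower the value may be slightly stale.
func (s *store) Get(key string) ([]byte, bool) {
	s.kvMu.RLock()
	defer s.kvMu.RUnlock()
	v, ok := s.kv[key]
	if !ok {
		return nil, false
	}
	// Return a copy so callers cannot mutate the replicated state.
	out := make([]byte, len(v))
	copy(out, v)
	return out, true
}

// set is what the commit loop does for a committed write.
func (s *store) set(key string, value []byte) {
	s.kvMu.Lock()
	defer s.kvMu.Unlock()
	s.kv[key] = value
}
```

Returning a copy costs an allocation per read but keeps callers from mutating the replicated state outside the lock.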

The kv map is the primary data structure and plays the role that the key-value store of etcd or ZooKeeper would otherwise play. Writes are not applied to kv directly. They are encoded and proposed to Raft:

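func (e *Embedded) Propose(k string, v []byte) {
    var buf strings.Builder
    if err := gob.NewEncoder(&buf).Encode(persistence.Entry{Key: k, Value: v}); err != nil {
        logger.Get().With(zap.Error(err)).Error("Failed to propose changes")
        return // do not propose a partially encoded entry
    }
    // Hand the encoded entry to raftNode; it reaches kv only after it is committed.
    e.proposeCh <- buf.String()
}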
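Each proposal is a gob-encoded entry carried as a string. The round trip below is a sketch that uses a local `Entry` type and assumes persistence.Entry has just a `Key string` and a `Value []byte` field. It also shows a gob property that matters for this design: gob omits zero-length slices, so an empty value decodes as nil and would be applied as a delete by the commit loop.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"strings"
)

// Entry mirrors the assumed shape of persistence.Entry.
type Entry struct {
	Key   string
	Value []byte
}

// encodeEntry produces the string that Propose sends on proposeCh.
func encodeEntry(e Entry) (string, error) {
	var buf strings.Builder
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// decodeEntry reverses encodeEntry, as the commit loop does for each item.
func decodeEntry(data string) (Entry, error) {
	var e Entry
	err := gob.NewDecoder(bytes.NewBufferString(data)).Decode(&e)
	return e, err
}
```

If empty values ever need to be stored, they would need a separate delete marker instead of relying on `Value == nil`.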

A background goroutine continuously reads from the commitCh channel, on which raftNode publishes committed entries.

for c := range commitCh {
    if c == nil {
        // A nil commit asks the state machine to reload from the latest snapshot.
        snapshot, err := e.loadSnapshot()
        if err != nil {
            logger.Get().With(zap.Error(err)).Error("Failed to load snapshot")
        }
        if snapshot != nil {
            logger.Get().Sugar().Infof("Loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
            if err := e.recoverFromSnapshot(snapshot.Data); err != nil {
                logger.Get().With(zap.Error(err)).Error("Failed to recover snapshot")
            }
        }
        continue
    }

    for _, data := range c.data {
        var entry persistence.Entry
        dec := gob.NewDecoder(bytes.NewBufferString(data))
        if err := dec.Decode(&entry); err != nil {
            logger.Get().With(zap.Error(err)).Error("Failed to decode message")
            continue // skip the entry instead of applying a zero value
        }
        e.kvMu.Lock()
        if entry.Value == nil {
            delete(e.kv, entry.Key) // a nil value means delete
        } else {
            e.kv[entry.Key] = entry.Value
        }
        e.kvMu.Unlock()
    }
    // Tell raftNode this batch has been applied.
    close(c.applyDoneC)
}
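The apply step can be tested on its own. The sketch below assumes the same two-field Entry shape and uses a hypothetical `applyBatch` helper that mirrors the inner loop above. It decodes each item, skips the ones that fail to decode, and applies puts and deletes under the write lock.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"strings"
	"sync"
)

type Entry struct {
	Key   string
	Value []byte
}

type store struct {
	kv   map[string][]byte
	kvMu sync.RWMutex
}

// applyBatch applies one committed batch; a nil Value means delete.
// It returns how many items could not be decoded.
func (s *store) applyBatch(batch []string) (skipped int) {
	s.kvMu.Lock()
	defer s.kvMu.Unlock()
	for _, data := range batch {
		var e Entry
		if err := gob.NewDecoder(bytes.NewBufferString(data)).Decode(&e); err != nil {
			skipped++
			continue
		}
		if e.Value == nil {
			delete(s.kv, e.Key)
		} else {
			s.kv[e.Key] = e.Value
		}
	}
	return skipped
}

// encode is a helper that produces what Propose would send.
func encode(e Entry) string {
	var buf strings.Builder
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		panic(err)
	}
	return buf.String()
}
```

Holding the lock for the whole batch means readers never observe a half-applied batch.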

Embedded and raftNode communicate through channels. Embedded sends encoded entries to raftNode on proposeCh, and raftNode publishes committed entries back to Embedded on commitCh.
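To see how the two channels fit together without a real Raft group, the sketch below replaces raftNode with a hypothetical single-node loopback that commits every proposal immediately. The `commit` struct with `data` and `applyDoneC` fields is an assumption based on the `c.data` and `applyDoneC` names used in this proposal. Real Raft would replicate each proposal to a quorum before publishing it.

```go
package main

// commit is a batch of committed entries; the consumer closes applyDoneC
// once the batch has been applied to its state.
type commit struct {
	data       []string
	applyDoneC chan struct{}
}

// loopbackNode stands in for raftNode: every proposal is "committed" at once.
func loopbackNode(proposeCh <-chan string, commitCh chan<- *commit) {
	defer close(commitCh)
	for p := range proposeCh {
		c := &commit{data: []string{p}, applyDoneC: make(chan struct{})}
		commitCh <- c
		<-c.applyDoneC // wait until the state machine has applied the batch
	}
}

// consume applies commits in order and signals completion for each batch.
func consume(commitCh <-chan *commit) []string {
	var applied []string
	for c := range commitCh {
		applied = append(applied, c.data...)
		close(c.applyDoneC)
	}
	return applied
}
```

Waiting on applyDoneC gives the node a point at which it knows the state machine has caught up, which is when it is safe to take a snapshot.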

Raft Node

raftNode is implemented in raft.go. When it starts, it first prepares its on-disk state:

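// Create the snapshot directory on first start.
if !fileutil.Exist(rc.snapDir) {
    if err := os.Mkdir(rc.snapDir, 0750); err != nil {
        logger.Get().With(zap.Error(err)).Fatal("Cannot create directory for snapshot")
    }
}
rc.snapshotter = snap.New(logger.Get(), rc.snapDir)

// Record whether a WAL already exists *before* replaying it, since opening
// the WAL creates it when missing. oldwal then tells a restart from a fresh start.
oldwal := wal.Exist(rc.walDir)
rc.wal = rc.replayWAL()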

State is recovered before normal operation begins: the node loads the latest snapshot and then replays the WAL entries recorded after that snapshot.
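The recovery order can be expressed as a small model. This is a simplified sketch, not the etcd/raft code path: the `snapshot` and `walEntry` types are hypothetical, and only WAL entries with an index above the snapshot's index are reapplied, because earlier ones are already reflected in the snapshot.

```go
package main

// snapshot is a hypothetical point-in-time copy of the kv map.
type snapshot struct {
	Index uint64
	Data  map[string][]byte
}

// walEntry is a hypothetical logged write; a nil Value means delete.
type walEntry struct {
	Index uint64
	Key   string
	Value []byte
}

// recoverState starts from the snapshot, then replays newer WAL entries in order.
func recoverState(snap *snapshot, wal []walEntry) map[string][]byte {
	kv := make(map[string][]byte)
	var applied uint64
	if snap != nil {
		for k, v := range snap.Data {
			kv[k] = v
		}
		applied = snap.Index
	}
	for _, e := range wal {
		if e.Index <= applied {
			continue // already contained in the snapshot
		}
		if e.Value == nil {
			delete(kv, e.Key)
		} else {
			kv[e.Key] = e.Value
		}
		applied = e.Index
	}
	return kv
}
```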

After recovery, the node sets up network communication with its peers:

rc.transport = &rafthttp.Transport{
    Logger:             logger.Get(),
    ID:                 types.ID(rc.id),
    ClusterID:          0x1000,
    Raft:               rc,
    ServerStats:        stats.NewServerStats("", ""),
    LeaderStats:        stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
    ErrorC:             make(chan error),
    DialRetryFrequency: 1,
}

if err := rc.transport.Start(); err != nil {
    logger.Get().With(zap.Error(err)).Panic("Failed to start raft HTTP server")
}

// Peer IDs are 1-based positions in rc.peers; skip this node's own entry.
for i := range rc.peers {
    if i+1 != rc.id {
        rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
    }
}

go rc.serveRaft()
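The AddPeer loop gives each peer the ID equal to its 1-based position in rc.peers. It follows that every node must be started with the same ordered peer list and with rc.id set to its own position. A small sketch of that mapping, using a hypothetical `peerIDs` helper:

```go
package main

// peerIDs maps Raft node IDs to peer URLs the way the AddPeer loop does:
// the ID of peers[i] is i+1, and the local node (selfID) is skipped.
func peerIDs(peers []string, selfID int) map[uint64]string {
	out := make(map[uint64]string, len(peers))
	for i, url := range peers {
		if i+1 == selfID {
			continue
		}
		out[uint64(i+1)] = url
	}
	return out
}
```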

Finally, go rc.serveChannels() starts the main event loop, which is split across two goroutines:

Receiving proposals from proposeCh. This goroutine forwards incoming proposals, and configuration changes from confChangeC, to the Raft state machine.

go func() {
    confChangeCount := uint64(0)

    for rc.proposeC != nil && rc.confChangeC != nil {
        select {
        case prop, ok := <-rc.proposeC:
            if !ok {
                rc.proposeC = nil
            } else {
                // blocks until accepted by raft state machine
                rc.node.Propose(context.TODO(), []byte(prop))
            }

        case cc, ok := <-rc.confChangeC:
            if !ok {
                rc.confChangeC = nil
            } else {
                confChangeCount++ // give each configuration change a unique ID
                cc.ID = confChangeCount
                rc.node.ProposeConfChange(context.TODO(), cc)
            }
        }
    }
    // client closed channel; shutdown raft if not already
    close(rc.stopCh)
}()
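The loop above stops reading a channel once it is closed by setting the variable to nil. Because the loop condition uses &&, closing either channel ends the loop and triggers shutdown. The sketch below reproduces that control flow with plain channels. The `forward` function and its `propose`/`confChange` callbacks are hypothetical stand-ins for rc.node.Propose and rc.node.ProposeConfChange.

```go
package main

// forward drains proposals and configuration changes until either input
// channel is closed, then closes stop, mirroring the proposal goroutine.
func forward(proposeC <-chan string, confChangeC <-chan string,
	propose func(string), confChange func(id uint64, cc string), stop chan<- struct{}) {
	confChangeCount := uint64(0)
	for proposeC != nil && confChangeC != nil {
		select {
		case p, ok := <-proposeC:
			if !ok {
				proposeC = nil // closed: the loop condition now fails
			} else {
				propose(p)
			}
		case cc, ok := <-confChangeC:
			if !ok {
				confChangeC = nil
			} else {
				confChangeCount++ // unique, increasing ID per change
				confChange(confChangeCount, cc)
			}
		}
	}
	close(stop)
}
```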

Event loop on Raft state machine updates. This loop drives the Raft clock, persists each batch of updates (a Ready) to the snapshot and WAL storage, and publishes committed entries to Embedded.

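for {
    select {
    case <-ticker.C:
        // Advance Raft's logical clock (drives elections and heartbeats).
        rc.node.Tick()

    // store raft entries to wal, then publish over commit channel
    case rd := <-rc.node.Ready():
        // Notify Embedded through leaderChangeCh only when leadership flips.
        if rd.SoftState != nil {
            isLeader := rd.RaftState == raft.StateLeader
            if rc.isLeader.CAS(!isLeader, isLeader) {
                rc.leaderChangeCh <- isLeader
            }
        }
        // Must save the snapshot file and WAL snapshot entry before saving any other entries
        // or hardstate to ensure that recovery after a snapshot restore is possible.
        if !raft.IsEmptySnap(rd.Snapshot) {
            // ... save the snapshot file and the WAL snapshot entry
        }
        rc.wal.Save(rd.HardState, rd.Entries)
        if !raft.IsEmptySnap(rd.Snapshot) {
            // ... load the snapshot into memory and notify Embedded to load it
        }
        // ... append rd.Entries to the in-memory Raft log
        // ... send outgoing Raft messages (rd.Messages) to peers over rc.transport
        // Publish committed entries to Embedded over commitCh.
        applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
        if !ok {
            // ... the node is stopping: shut down and return
        }
        // ... possibly trigger a snapshot once applyDoneC is closed
        // Tell the Raft node this Ready has been fully processed.
        rc.node.Advance()

    case err := <-rc.transport.ErrorC:
        // ... report err and stop the node

    case <-rc.stopCh:
        // ... stop the node and return
    }
}
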
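The leader check uses a compare-and-swap so that leaderChangeCh receives a value only when leadership actually flips, not on every Ready that carries a SoftState. The snippet above calls `CAS` (the name suggests go.uber.org/atomic). The sketch below uses the standard library's `atomic.Bool.CompareAndSwap` (Go 1.19+) instead, with a hypothetical `leaderTracker` type and a buffered channel.

```go
package main

import "sync/atomic"

// leaderTracker is a hypothetical reduction of the leader-change logic.
type leaderTracker struct {
	isLeader atomic.Bool
	changes  chan bool
}

// observe is called for each Ready that carries a SoftState. The swap
// CompareAndSwap(!now, now) succeeds only if the stored value differs
// from now, so repeated identical states produce no notification.
func (t *leaderTracker) observe(now bool) {
	if t.isLeader.CompareAndSwap(!now, now) {
		t.changes <- now
	}
}
```
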
caicancai commented 1 week ago

I think this should be a plugin: users could choose etcd, ZooKeeper, Raft, or even MySQL or another external storage. For reference, see https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins

ptyin commented 1 week ago

I see. However, DolphinScheduler uses pom.xml profiles to define which code is included or excluded. If we really want this behaviour, we would probably need to modify the Makefile and use Go build tags.

@git-hulk What do you think?

git-hulk commented 1 week ago

It should be fine to compile all the plugins into the binary and let users choose which engine to use via the configuration file. Personally, though, I would prefer to encourage users to use Raft with the embedded storage rather than an external service once it is ready.
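A minimal sketch of that approach: every engine is compiled in and registers a factory under a name, and the configuration selects one at startup. The `Engine` interface, `Register`, `Open`, and the engine names are hypothetical and are not the controller's actual API.

```go
package main

import (
	"fmt"
	"sort"
)

// Engine is a hypothetical metadata-storage interface implemented by each backend.
type Engine interface {
	Name() string
}

type factory func(cfg map[string]string) (Engine, error)

var registry = map[string]factory{}

// Register would typically be called from each backend's init function.
func Register(name string, f factory) {
	if _, dup := registry[name]; dup {
		panic("storage engine registered twice: " + name)
	}
	registry[name] = f
}

// Open picks the engine named in the configuration.
func Open(name string, cfg map[string]string) (Engine, error) {
	f, ok := registry[name]
	if !ok {
		names := make([]string, 0, len(registry))
		for n := range registry {
			names = append(names, n)
		}
		sort.Strings(names)
		return nil, fmt.Errorf("unknown storage engine %q (available: %v)", name, names)
	}
	return f(cfg)
}

type namedEngine string

func (e namedEngine) Name() string { return string(e) }

func init() {
	// Each backend would live in its own package; they are inlined here.
	for _, n := range []string{"embedded", "etcd", "zookeeper"} {
		n := n
		Register(n, func(map[string]string) (Engine, error) { return namedEngine(n), nil })
	}
}
```

Compared with build tags, this keeps a single binary and moves the choice to configuration, at the cost of shipping client code for every backend.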

caicancai commented 1 week ago

@ptyin This requirement doesn't need to be completed in this PR. I may send a separate proposal once I have written up a complete plan. Thank you for your suggestion.