paust-team / pirius

Stream Delivery Network
GNU General Public License v3.0

Design Coordinator interface and implement zookeeper-implementation #166

Closed elon0823 closed 2 years ago

elon0823 commented 2 years ago

Related to #165. #164 must be merged before this one.

Design overview

In ca894e46045bb637c045604fd3da047a8b7d81a4, I designed the Coordinator interface as follows:

type Coordinator interface {
    IsClosed() bool
    Close()
    Create(path string, value []byte) CreateOperation
    Set(path string, value []byte) SetOperation
    Get(path string) GetOperation
    Delete(paths []string) DeleteOperation
    Children(path string) ChildrenOperation
    Lock(path string, do func()) LockOperation
    OptimisticUpdate(path string, update func(current []byte) []byte) OptimisticUpdateOperation
}

type CreateOperation interface {
    WithLock(string) CreateOperation
    AsEphemeral() CreateOperation
    AsSequential() CreateOperation
    Run() error
}

type SetOperation interface {
    WithLock(string) SetOperation
    Run() error
}

type GetOperation interface {
    WithLock(string) GetOperation
    OnEvent(func(WatchEvent)) GetOperation
    Run() ([]byte, error)
}

type DeleteOperation interface 
...

I then implemented the Coordinator interface on top of ZooKeeper as follows:

import (
    "github.com/paust-team/shapleq/coordinator"
    "github.com/samuel/go-zookeeper/zk"
)

type Coordinator struct {
    zkConn *zk.Conn
}

func (c Coordinator) Create(path string, value []byte) coordinator.CreateOperation {
    return NewZKCreateOperation(c.zkConn, path, value)
}
...

// operations for zk-impl
type SetOperation struct {
    conn           *zk.Conn
    path, lockPath string
    value          []byte
}

func (o SetOperation) WithLock(lockPath string) coordinator.SetOperation {
    o.lockPath = lockPath
    return o
}

func (o SetOperation) Run() error {
    // When a lock path was given, hold the distributed lock for the whole write.
    if o.lockPath != "" {
        lock := zk.NewLock(o.conn, o.lockPath, zk.WorldACL(zk.PermAll))
        if err := lock.Lock(); err != nil {
            return pqerror.ZKLockFailError{LockPath: o.lockPath, ZKErrStr: err.Error()}
        }
        defer lock.Unlock()
    }

    // Version -1 skips the version check, so the write is unconditional.
    if _, err := o.conn.Set(o.path, o.value, -1); err != nil {
        return pqerror.ZKRequestError{ZKErrStr: err.Error()}
    }

    return nil
}
...
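
The value-receiver builder above returns a modified copy from `WithLock`, so the original operation is left untouched. As a rough illustration of that behaviour without a ZooKeeper server, here is a minimal sketch that swaps the zk connection for a hypothetical in-memory `memStore`. The names `memStore`, `memSetOperation`, and `errNoNode` are assumptions made for this example and are not part of the commit.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// SetOperation mirrors the shape of the interface shown in this issue.
type SetOperation interface {
	WithLock(lockPath string) SetOperation
	Run() error
}

// errNoNode is a hypothetical error for writes to a missing path.
var errNoNode = errors.New("node does not exist")

// memStore is a hypothetical in-memory stand-in for a ZooKeeper connection.
type memStore struct {
	mu    sync.Mutex             // guards data and locks
	data  map[string][]byte      // path -> value
	locks map[string]*sync.Mutex // one mutex per lock path
}

func newMemStore() *memStore {
	return &memStore{data: map[string][]byte{}, locks: map[string]*sync.Mutex{}}
}

// lockFor returns the mutex for a lock path, creating it on first use.
func (s *memStore) lockFor(path string) *sync.Mutex {
	s.mu.Lock()
	defer s.mu.Unlock()
	l, ok := s.locks[path]
	if !ok {
		l = &sync.Mutex{}
		s.locks[path] = l
	}
	return l
}

type memSetOperation struct {
	store          *memStore
	path, lockPath string
	value          []byte
}

// WithLock has a value receiver: o is a copy, so the caller's operation is unchanged.
func (o memSetOperation) WithLock(lockPath string) SetOperation {
	o.lockPath = lockPath
	return o
}

// Run writes the value, holding the named lock for the duration if one was set.
func (o memSetOperation) Run() error {
	if o.lockPath != "" {
		l := o.store.lockFor(o.lockPath)
		l.Lock()
		defer l.Unlock()
	}
	o.store.mu.Lock()
	defer o.store.mu.Unlock()
	if _, ok := o.store.data[o.path]; !ok {
		return errNoNode
	}
	o.store.data[o.path] = o.value
	return nil
}

func main() {
	store := newMemStore()
	store.data["/topics/a"] = []byte("old")

	base := memSetOperation{store: store, path: "/topics/a", value: []byte("new")}
	locked := base.WithLock("/locks/a")

	fmt.Println("base still unlocked:", base.lockPath == "")
	fmt.Println("run error:", locked.Run())
	fmt.Println("stored value:", string(store.data["/topics/a"]))
}
```

Because every `With...` call returns a fresh value, a partially configured operation can be reused as a template for several variants without them interfering with each other.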

In a1aa4942a8306df1e5b7f458b485b5317477df5d, you can see how the Coordinator implementation is used in the coordinator-helper package. Functional tests for the zk coordinator were also added (3497f59d5308a847bec0ca8757f54328674c6069).

// helpers
type TopicManagingHelper struct {
    client coordinator.Coordinator
    logger *logger.QLogger
}

func (t *TopicManagingHelper) AddTopic(topicName string, topicFrame *common.FrameForTopic) error {...}
...

// define own Coordinator Wrapper (helper facade)
type ShapleQCoordinatorWrapper struct {
    *TopicManagingHelper
    *FragmentManagingHelper
    zkCoordinator *zk_impl.Coordinator
}

zkCoordinator := zk_impl.NewZKCoordinator(...)

coordinatorWrapper := ShapleQCoordinatorWrapper{
    TopicManagingHelper:    NewTopicManagerHelper(zkCoordinator),
    FragmentManagingHelper: NewFragmentManagingHelper(zkCoordinator),
    zkCoordinator:          zkCoordinator,
}
...
// and use
coordinatorWrapper.AddTopic(...)
coordinatorWrapper.DeleteTopicFragments(...)
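
The wrapper above relies on Go struct embedding: methods of the embedded helpers are promoted to the wrapper, which is why `AddTopic` can be called on it directly. A minimal, self-contained sketch of that mechanism follows. The trimmed-down `Coordinator` interface, `memCoordinator`, and the helper names here are hypothetical simplifications, not the code from the commit.

```go
package main

import "fmt"

// Coordinator is a trimmed-down, hypothetical version of the interface in this issue.
type Coordinator interface {
	Create(path string, value []byte) error
}

// memCoordinator is a hypothetical in-memory implementation used only for this demo.
type memCoordinator struct {
	nodes map[string][]byte
}

func (m *memCoordinator) Create(path string, value []byte) error {
	if _, ok := m.nodes[path]; ok {
		return fmt.Errorf("node %q already exists", path)
	}
	m.nodes[path] = value
	return nil
}

// TopicHelper groups topic-related operations on top of a Coordinator.
type TopicHelper struct {
	client Coordinator
}

func (t *TopicHelper) AddTopic(name string) error {
	return t.client.Create("/topics/"+name, nil)
}

// FragmentHelper groups fragment-related operations on top of a Coordinator.
type FragmentHelper struct {
	client Coordinator
}

func (f *FragmentHelper) AddFragment(topic string, id int) error {
	return f.client.Create(fmt.Sprintf("/topics/%s/fragments/%d", topic, id), nil)
}

// Wrapper embeds both helpers, so their exported methods are promoted to Wrapper.
type Wrapper struct {
	*TopicHelper
	*FragmentHelper
}

func NewWrapper(c Coordinator) *Wrapper {
	return &Wrapper{
		TopicHelper:    &TopicHelper{client: c},
		FragmentHelper: &FragmentHelper{client: c},
	}
}

func main() {
	w := NewWrapper(&memCoordinator{nodes: map[string][]byte{}})
	// Both calls resolve to the embedded helpers via method promotion.
	fmt.Println("AddTopic error:", w.AddTopic("orders"))
	fmt.Println("AddFragment error:", w.AddFragment("orders", 1))
}
```

Since the helpers depend only on the interface, another project could plug in its own helper, or a different Coordinator implementation, without touching the existing ones.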

elon0823 commented 2 years ago

I found that the ZooKeeper watch methods (GetW, ChildrenW, ...) fire only once. So the zk implementation has to re-register the watch after every event, for each of these methods.

elon0823 commented 2 years ago

The Apache Curator-style zk composition is good.

But isn't this over-designed? What happens to the helper code? How does this change make things more convenient for a developer?

Did you mean that the Coordinator interface is over-designed? If so, I agree that it is a little over-designed, but I think it is worth encapsulating the zk dependency and making it extensible for all the Shaple projects. Some Shaple projects need to implement extra recipes (optimistic lock, atomic counter, recursive watch, etc.) that could be reused throughout the Shaple projects, and I expect the Coordinator to support them the way Apache Curator does.

Developers of all the Go-based Shaple projects can cooperate easily by providing coordinator helpers for their own project and reusing the helpers of other projects.