6174 / 6174.github.io

keep calm & carry on
https://6174.github.io
19 stars 0 forks source link

syncthing 源码阅读:events pubsub 模型 #11

Open 6174 opened 7 years ago

6174 commented 7 years ago

syncthing/lib/events/events.go

syncthing 事件管理

入口函数部分看到了 apiSub 和 diskSub

apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected), 1000)
diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected), 1000)

syncthing 的 pubsub 模块是通过 events 包来管理的,下面咱来完整的看看 events 的设计

events 包结构

EventType

定义 event 的类型常量和对应的字符串

type EventType int

const (
    Ping EventType = 1 << iota
    Starting
    ....

    AllEvents = (1 << iota) - 1
)

这里通过 iota 表达式技巧来定义常量,对应

 Ping       00000000
 Starting   00000010
 ...
 AllEvents  0111...11

常量采用类位运算的方式

1 << iota 表示向左移动 iota 位

mask & eventType != 0 来判断 mask 时候包好对应的事件类型

mask = Starting : Starting 事件 mask = Starting & Ping : Staring 和 Ping 事件 mask = AllEvents&^Starting : Starting 除外的其他事件 (&^ 表示后者先取反再执行 & 运算)

EventType String 方法返回对应常量的字符串 slice

Event

每次时间对应一个 Event 实例 ,结构

type Event struct {
    // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
    // 一个订阅者创建后,后面所有订阅者订阅的事件都会传递给订阅者-对应事件序列,SubscriptionID 是这个序列中事件对应的 ID
    SubscriptionID int `json:"id"`

    // 事件全局递增 ID
    GlobalID int         `json:"globalID"`

    // 事件创建时间
    Time     time.Time   `json:"time"`

    // 事件类型
    Type     EventType   `json:"type"`

    // 时间附带数据 (interface{} 为万能类型)
    Data     interface{} `json:"data"`
}

Logger

Logger 管理了事件的创建和订阅,结构:

type Logger struct {

    // 订阅者数组
    subs                []*Subscription

    // 用于产生下次事件的 e.SubscriptionId ,对应每一个 sub 一个 int 值
    nextSubscriptionIDs []int

    // 事件的全局 ID 管理
    nextGlobalID        int

    // Logger 可能会被多个 goroutine 调用,所以必须要设置互斥
    mutex               sync.Mutex
}

// Log 创建事件
func (l *Logger) Log(t EventType, data interface{}){}

// Subscribe 方法创建对应事件的订阅者
func (l *Logger) Subscribe(mask EventType) *Subscription {}

// 取消事件的订阅
func (l *Logger) Unsubscribe(s *Subscription) {}

Default

对外暴露的默认 Logger ,调用 Default.Log 创建事件,Default.Subscribe 创建订阅者

Subscription

订阅者

const BufferSize = 64
type Subscription struct {
    // 订阅的事件 mask
    mask    EventType
    // 事件 channel , 对应一个有缓冲区的通道,缓冲区大小为 BufferSize
    events  chan Event
    // 接收事件设置一个 timeout ,在 timeout 之内不会重复接收
    timeout *time.Timer
}

// Poll returns an event from the subscription or an error if the poll times
// out of the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
// Poll 方法将本 goroutine 停住,等待一个事件产生,或者超时事件产生
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {}

// 返回事件通道
func (s *Subscription) C() <-chan Event {}

BufferedSubscription

相当于一个订阅者的事件缓冲区(感觉名字应该叫 SubscriptionEventBuffer)

type bufferedSubscription struct {
    // 订阅者
    sub  *Subscription
    // 事件缓冲数组
    buf  []Event
    // 缓冲区是有大小限制的,next 每次获得一个事件过后要放在缓冲区的什么位置
    next int
    // 当前的 event 对应的 SubscriptionID
    cur  int // Current SubscriptionID
    // 多个 goroutine 场景要使用互斥和信号量来
    mut  sync.Mutex
    cond *stdsync.Cond
}

// 创建缓冲区并启动一个 goroutine 运行 pollingLoop
func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
    bs := &bufferedSubscription{
        sub: s,
        buf: make([]Event, size),
        mut: sync.NewMutex(),
    }
    bs.cond = stdsync.NewCond(bs.mut)
    go bs.pollingLoop()
    return bs
}

// pollingLoop 方法会启动一个循环,不断的调用 sub.Poll 方法获取事件
func (s *bufferedSubscription) pollingLoop() {}

// Since 方法会获取 id 后缓冲区内发生的所有事件
func (s *bufferedSubscription) Since(id int, into []Event) []Event {
    s.mut.Lock()
    defer s.mut.Unlock()

    // 无线循环直到等到有发生在 id 后的事件产生
    // Q: 为什么没有非要一直等呢,万一一直没有事件发生不是卡死了么
    for id >= s.cur {
        s.cond.Wait()
    }

    for i := s.next; i < len(s.buf); i++ {
        if s.buf[i].SubscriptionID > id {
            into = append(into, s.buf[i])
        }
    }
    for i := 0; i < s.next; i++ {
        if s.buf[i].SubscriptionID > id {
            into = append(into, s.buf[i])
        }
    }

    return into
}

完整流程

events 模块中最重要的就是 Default 和 BufferedSubscription 了,通过 go 插件可以看到调用方:

events_default_usage
  1. sub = events.Default.Subscribe 创建订阅者
  2. bsub = events.NewBufferedSubscription 创建一个事件订阅缓冲区
  3. events.Default.Log 产生事件
  4. bsub.Since 方法获取缓冲区内的事件列表
amozz commented 7 years ago

请问这是用的什么go插件?

6174 commented 7 years ago

@amozz https://github.com/go-lang-plugin-org/go-lang-idea-plugin