loadlj / blog

19 stars 6 forks source link

Topic及Channle #20

Open loadlj opened 6 years ago

loadlj commented 6 years ago

nsq Topic及Channle

Topic

type Topic struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms
    messageCount uint64

    sync.RWMutex

    name              string
    channelMap        map[string]*Channel // topic的channels
    backend           BackendQueue // 持久化
    memoryMsgChan     chan *Message // 消息管道
    exitChan          chan int
    channelUpdateChan chan int
    waitGroup         util.WaitGroupWrapper
    exitFlag          int32
    idFactory         *guidFactory

    ephemeral      bool
    deleteCallback func(*Topic)
    deleter        sync.Once

    paused    int32
    pauseChan chan bool

    ctx *context
}

Topic主要有三个goroutine:router、messagePump、DiskQueue router只要是用来接收http接口的topic消息的,调用PutMessage将msg放入memoryMsgChan和DiskQueue中。 messagePump主要是将msg分发到该topic的channelMap中去。 DiskQueue主要是将内存中存放不下的msg,存入磁盘中去,稍后再处理

Channel

type Channel struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms
    requeueCount uint64
    messageCount uint64
    timeoutCount uint64

    sync.RWMutex

    topicName string
    name      string
    ctx       *context

    backend BackendQueue // 磁盘持久化

    memoryMsgChan chan *Message //  消息管道
    exitFlag      int32
    exitMutex     sync.RWMutex

    // state tracking
    clients        map[int64]Consumer // 所有订阅改channel的消费者
    paused         int32
    ephemeral      bool
    deleteCallback func(*Channel)
    deleter        sync.Once

    // Stats tracking
    e2eProcessingLatencyStream *quantile.Quantile

    // TODO: these can be DRYd up
    deferredMessages map[MessageID]*pqueue.Item
    deferredPQ       pqueue.PriorityQueue
    deferredMutex    sync.Mutex
    inFlightMessages map[MessageID]*Message
    inFlightPQ       inFlightPqueue
    inFlightMutex    sync.Mutex
}

Channel比Topic更为复杂,设计的结构是single input and single output go-chan 设计的原理是input和output走的都是同一条Channel,不管有多少个client,一条msg确认只会被一个client消费掉。 另外在还会去维护一个inFlightPQ和deferredPQ,里面是按时间排序的,后续会有一个worker pool去维护这个两个queue。 NSQD在给client发送消息之后,会将该消息添加到该channel的一个叫inFlightPQ的优先级队列中。该优先级队列的个底层结构是数组,然后基于数组实现的小根堆。而权重则是发送消息时规定的timeOut的时长。 当NSQD在收到client发送过来的FIN确认消息之后,就会从inFlightPQ移除相应的消息。

nsqd中worker pool

nsqd中的worker pool模型,每隔一段时间会去动态的调整worker的数量,这部分代码是在queueScanLoop中实现的。

workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)

workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

channels := n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)

声明workCh和responseCh,是用来决定是否需要继续去work的,closCh是用来减少worker的。然后进行第一次resizePool,resizepool是用来调整queueScanWorker goroutines数量的,同时需要确保 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

第一次resizePool后进入for循环开始处理worker,先看3个case

select {
// wait until ticker
case <-workTicker.C:
    if len(channels) == 0 {
        // continue means do select operation
        continue
    }
case <-refreshTicker.C:
    // 这个是用来resize用的
    channels = n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)
    continue
case <-n.exitChan:
    goto exit
}

workTicker先判断channel是否为空,如果不为空的话则进入后面的loop,refreshTicker用来实时调整worker goroutine,exitChan用来推出程序。

loop:
    for _, i := range util.UniqRands(num, len(channels)) {
        // send work
        workCh <- channels[i]
    }

    numDirty := 0
    for i := 0; i < num; i++ {
        // response chan is true
        if <-responseCh {
            numDirty++
        }
    }

    if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
        goto loop
    }
}

loop里面的代码借鉴了redis的被动刷新过期key的算法,当超过一定比例的worker有工作的时候,则一直循环下去,直到worker工作部分的比例低于指定比例。这里会给workCh发送channel,用responseCh的返回值来判断一个worker是否有工作要做。 workCh的处理部分主要在queueScanWorker中。

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        // receive work chan
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            // process queue
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

收到workCh后会去根据时间戳在InFlightQueue和DeferredQueue里面找到过期的msg,然后将消息重放。若收到closeCh,则关闭该goroutine。