loadlj / blog

19 stars 6 forks source link

nsqd入口分析 #17

Open loadlj opened 6 years ago

loadlj commented 6 years ago

Main入口分析

nsqd的入口调用了go-svc的Run方法,这里的go-svc有点类似于watchdog监测用户signal的行为,可以很好的兼容Linux和Windows。svc_other里面声明的signalNotify是从signal.Notify这里拿到的,这个函数的主要作用是监听用户输入的signal。 Main还声明了一个tcpListener和一个httpListener,tcpListener是用来处理跟client的连接的。

    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
    })

tcpServer有一个handle方法,用来处理tcp的连接的。里面只会接受" V2"的protocol,然后对每个连接进行IOLoop。 IOLoop会对每个client先进行messagePump

messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan

messagePump主要做的事:发送心跳包给客户端,获取channel中的消息,发送给客户端。

case <-heartbeatChan:
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
case b := <-backendMsgChan:
case msg := <-memoryMsgChan:

然后在IOLoop中,有新的for循环去监听客户端的消息并进行处理。这里大概理一下,后面详细分析

回到Main中,后面新开了两个goroutine,一个queueScanLoop处理消息,一个lookupLoop处理和nsqlookupd的连接的。

n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.lookupLoop() })

queueScanLoop

随机挑选若干个key,删除所有过期的,如果过期的key的占比大于25%,则继续循环删除 在nsqd里面的定义是如果一个channel还有work在处理则认为是"dirty",判断dirty的占比如果大于QueueScanDirtyPercent就会继续循环给workCh发送channel。

resizePool

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

resizePool是用来动态调整queueScanWorker的,idealPoolSize是当前的num(channels) / 4,会跟n.poolSize去做比较,这里表示的是queueScanWorker的数量。如果idealPoolSize要比当前的workpool小,再去新起一个queueScanWorker

queueScanWorker

在nsq中inFlight指的是正在投递但还没确认投递成功的消息,defferred指的是投递失败,等待重新投递的消息。 initPQ创建的字典和队列主要用于索引和存放这两类消息。其中两个字典使用消息ID作索引。