gh-liu / myNote

0 stars 0 forks source link

`go runtime` 中的`netpoll` #7

Open gh-liu opened 5 months ago

gh-liu commented 5 months ago

network poller

所有网络操作都以网络描述符 netFD 为中心,同时netFD 与底层 pollDesc 结构绑定, 当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 G 存储到这个 netFD 对应的 pollDesc 中,然后调用 gopark,将当前 G 设置为等待状态; 直到这个 netFD 上再次发生读写事件,调用 goready,将当前 G 设置为可运行状态;

// net/fd_posix.go
type netFD struct {
    pfd poll.FD

    // ...
}

// internal/poll/fd_unix.go
type FD struct {
    // ...

    // Platform dependent state of the file descriptor. // 真正的系统文件描述符
    SysFile

    // I/O poller.                                      // 底层事件的封装,netFD 通过它完成各种 I/O 相关操作
    pd pollDesc

    // ...
}

// internal/poll/fd_poll_runtime.go
// 
// 只是一个指针,但是可以通过 init 方法 中 调用的 runtime_pollOpen 函数
// 得到 指针指向 runtime.pollDesc
type pollDesc struct {
    runtimeCtx uintptr
}

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

// runtime/netpoll.go
//
// 编译器将 poll_runtime_pollOpen 函数链接到 runtime_pollOpen 函数
//
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {}

// runtime/netpoll.go
// 
// Network poller descriptor.
//
// No heap pointers.
type pollDesc struct {
    // ...
    link  *pollDesc      // in pollcache, protected by pollcache.lock              // 用以实现链表
    fd    uintptr        // constant for pollDesc usage lifetime                   // 文件描述符

    // ...

    // rg, wg are accessed atomically and hold g pointers.                         // 取值分别可能是 pdReady、pdWait、等待 fd 可读或可写的 G 、nil
    // (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
    rg atomic.Uintptr                                                              // pdReady, pdWait, G waiting for read or pdNil
    wg atomic.Uintptr                                                              // pdReady, pdWait, G waiting for write or pdNil
    // ...
    rseq    uintptr   // protects from stale read timers                           // rseq/wseq 表示fd被重用或者计时器被重置
    rt      timer     // read deadline timer (set if rt.f != nil)                  // rt/wt 等待文件描述符的计时器
    rd      int64     // read deadline (a nanotime in the future, -1 when expired) // rd/wd 等待fd可读或可写的截至日期
    wseq    uintptr   // protects from stale write timers                          // 上面提到
    wt      timer     // write deadline timer                                      // 上面提到
    wd      int64     // write deadline (a nanotime in the future, -1 when expired)// 上面提到
    // ...
}
// pollDesc 缓存:包含一个用于保护轮询数据的互斥锁和链表头
// 运行时包中的全局变量 pollcache 
//
// runtime/netpoll.go
type pollCache struct {
    lock  mutex
    first *pollDesc
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}

// 初次调用,批量初始化 pollDesc
func (c *pollCache) alloc() *pollDesc {}
// 回收 pollDesc,结构体被重复利用时才会由 runtime.poll_runtime_pollOpen 函数重置
func (c *pollCache) free(pd *pollDesc) {}

实现

网络轮询器,与平台无关,具体实现需要定义如下函数:

  1. netpollinit(): 初始化网络轮询器,仅调用一次
  2. netpollopen(fd uintptr, pd *pollDesc) int32: 为文件描述符 fd 启用边缘触发通知,pd 参数用于在 fd 就绪时传递给 netpollready,返回一个 errno 值
  3. netpollclose(fd uintptr) int32: 禁用文件描述符 fd 的通知,返回一个 errno 值
  4. netpoll(delta int64) (gList, int32): 进行网络轮询,如果 delta < 0,则无限期阻塞,如果 delta == 0,则不阻塞,如果 delta > 0,则阻塞至多 delta 纳秒,返回通过调用 netpollready 构建的 goroutine 列表,以及要添加到 netpollWaiters 的 delta。这永远不会返回一个非空列表和一个非零的 delta。
  5. netpollBreak(): 唤醒网络轮询器,假设它在 netpoll 中被阻塞
  6. netpollIsPollDescriptor(fd uintptr) bool: 报告 fd 是否是轮询器使用的文件描述符

epoll 实现

runtime/netpoll_epoll.go

  1. 网络轮询器的初始化
  2. 向网络轮询器加入待监控的任务
  3. 从网络轮询器获取触发的事件
// 1. 网络轮询器的`初始化`
// 
// 调用链 poll.runtime_pollServerInit -> runtime.poll_runtime_pollServerInit -> runtime.netpollGenericInit -> runtime.netpollinit
func netpollinit() {
    var errno uintptr                                         // 系统调用返回的错误码
    epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC) // 创建一个 epoll 示例 epfd,作为 runtime 的唯一 event-loop 使用
    // ...
    r, w, errpipe := nonblockingPipe()                        // 创建一个非阻塞管道,用于和 epoll 实例通信
    // ...
    ev := syscall.EpollEvent{
        Events: syscall.EPOLLIN,                              // 表示读事件
    }
    *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd  // 将netpollBreakRd 通知信号量,封装成EpollEvent,注册进 epoll 实例,从而监听读事件
    errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
    // ...
    netpollBreakRd = uintptr(r)                               // 保存管道的读文件描述符
    netpollBreakWr = uintptr(w)                               // 保存管道的写文件描述符
}

// 2. 向网络轮询器`加入待监控`的任务
//
// 调用链 poll.runtime_pollOpen -> runtime.poll_runtime_pollOpen -> runtime.netpollopen
// 调用 poll_runtime_pollOpen(fd), 注册 fd 到 epoll 实例,pd 从 pollcache 中获取
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    var ev syscall.EpollEvent
    ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET // 可读/可写事件、对端断开连接事件、边缘触发模式
    tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())                          // 打包 pd 数据
    *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)                  // 将 fd 加入 epoll 进行监听
}

func netpollclose(fd uintptr) uintptr {
    var ev syscall.EpollEvent
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)                  // 将文件描述符 fd 从 epoll 实例中移除监听
}

// 3. 从网络轮询器`获取触发`的事件
// 
// 1. 根据参数 delay,设置对应的调用 epollwait 的 timeout 值
// 2. 调用 epollwait 等待发生了可读/可写事件的 fd
// 3. 循环 epollwait 返回的事件列表,处理对应的事件类型,组装可运行的 goroutine 链表并返回。
// 随后调用 `runtime.injectglist(glist *gList)` 加入到全局队列或 P 本地队列
// 
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) (gList, int32) { // 参数 delay 表等待的时间,返回一个 gList,也就是可运行的G列表 
    // ...
    var waitms int32 // 根据 delay 设置`epollwait` 等待的时间:无限制阻塞、不阻塞只轮询、阻塞1ms、阻塞 delay ns、阻塞约11.5天
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }

    var events [128]syscall.EpollEvent  // 声明长度为 128 的 EpollEvent 数组,用于储存 epoll 事件
retry:
    n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
    if errno != 0 {                     // 是否发生了错误
        if errno != _EINTR {
            // ...
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}, 0
        }
        goto retry                      // 进行重试
    }

    var toRun gList
    delta := int32(0)
    for i := int32(0); i < n; i++ { // 遍历 epoll 事件数组
        ev := events[i]
        if ev.Events == 0 { // 如果事件为 0,表示没有发生事件,跳过当前循环
            continue
        }

        if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd { // 是否是唤醒网络轮询的事件
            if ev.Events != syscall.EPOLLIN { // 如果事件类型不是 EPOLLIN,则打印错误信息并抛出异常
                // ...
            }
            if delay != 0 { // 不等于 0,表示需要阻塞
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) // 从 netpollBreakRd 读取一个字节,清除唤醒标志
                netpollWakeSig.Store(0)
            }
            continue
        }

        var mode int32                                                                             // 表示事件的模式
        if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 { // 发生读取相关的事件
            mode += 'r'
        }
        if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {                   // 发生写入相关的事件
            mode += 'w'
        }
        if mode != 0 {                                                                             // 模式不为 0
            tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))                                      // 获取与事件相关联的 pollDesc 结构体(在 netpollopen 时储存的)
            pd := (*pollDesc)(tp.pointer())
            tag := tp.tag()
            if pd.fdseq.Load() == tag {
                pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
                delta += netpollready(&toRun, pd, mode)                                            // 处理准备好的网络连接(不同的模式:读/写)
            }
        }
    }
    return toRun, delta
}

// 往通信管道里写入信号去唤醒 epollwait
//
// netpollBreak interrupts an epollwait.
func netpollBreak() {
    // Failing to cas indicates there is an in-flight wakeup, so we're done here. // 避免重复的唤醒信号
    if !netpollWakeSig.CompareAndSwap(0, 1) {
        return
    }

    for {
        // ...
    }
}

无关实现

runtime/netpoll.go

// 调用链 netpoll -> netpollready
// 调用 netpollunblock,返回就绪 pd 对应的 G
//
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This returns a delta to apply to netpollWaiters.
//
// This may run while the world is stopped, so write barriers are not allowed.
//
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
    // ...
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true, &delta)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true, &delta)
    }
    // ...
}

// 根据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的 G
//
// netpollunblock moves either pd.rg (if mode == 'r') or
// pd.wg (if mode == 'w') into the pdReady state.
// This returns any goroutine blocked on pd.{rg,wg}.
// It adds any adjustment to netpollWaiters to *delta;
// this adjustment should be applied after the goroutine has
// been marked ready.
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {}
// 在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写
// 调用链 poll.runtime_pollWait -> runtime.poll_runtime_pollWait -> netpollblock
//
// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg 
    if mode == 'w' { // 根据模式获取到 G
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait            // 循环等待 IO ready 或者 IO wait
    for {
        // Consume notification if already ready. // IO ready
        if gpp.CompareAndSwap(pdReady, pdNil) {
            return true                           // 直接返回并执行相关操作
        }
        if gpp.CompareAndSwap(pdNil, pdWait) {    // 没有 IO 事件发生
            break
        }

        // Double check that this isn't corrupt; otherwise we'd loop
        // forever.
        if v := gpp.Load(); v != pdReady && v != pdNil {
            throw("runtime: double wait")
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == pollNoError {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5) // 将 G 设为等待状态,通过 netpollblockcommit 将 G 存入 pd 中
    }
    // be careful to not lose concurrent pdReady notification
    old := gpp.Swap(pdNil)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}
gh-liu commented 5 months ago

usage: 网络I/O、文件I/O

type pollDesc struct {
    runtimeCtx uintptr
}

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)             // 在初始化 pollDesc 时,初始化网络轮询器(通过sync.Once仅执行一次)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) // 加入轮询事件
    if errno != 0 {
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

net/fd_posix.go

// Network file descriptor.
type netFD struct {
    pfd poll.FD

    // ...
}

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            // ...
        },
        // ...
    }
    return ret, nil
}

func (fd *netFD) init() error {
    return fd.pfd.Init(fd.net, true)                      // 初始化 pfd,也就是 poll.FD
}

func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) {
        // ...
        if err := fd.pfd.Init(fd.net, true); err != nil { // 初始化 pfd,也就是 poll.FD
            return nil, err
        }
        // ...
}

os/types.go

// File represents an open file descriptor.
type File struct {
    *file // os specific
}

// file is the real representation of *File.
// The extra level of indirection ensures that no clients of os
// can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor.
type file struct {
    pfd         poll.FD
    // ...
}

func newFile(fd int, name string, kind newFileKind) *File {
    f := &File{&file{
        pfd: poll.FD{
            Sysfd:         fd,
            // ...
        },
        // ...
    }}
    // ...
    if pollErr := f.pfd.Init("file", pollable); pollErr != nil && clearNonBlock {} // 初始化 pfd,也就是 poll.FD
    // ...
    return f
}

usage: 计时器

// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {
    // Timers rely on the network poller, so make sure the poller
    // has started.
    if netpollInited.Load() == 0 {
        netpollGenericInit()  // 初始化网络轮询器
    }
    // ...
}

网络轮询器负责处理文件和网络 I/O 的截止日期

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
    // ...
    // I/O poller.
    pd pollDesc
    // ...
}

// SetDeadline sets the read and write deadlines associated with fd.
func (fd *FD) SetDeadline(t time.Time) error {
    return setDeadlineImpl(fd, t, 'r'+'w')
}

// SetReadDeadline sets the read deadline associated with fd.
func (fd *FD) SetReadDeadline(t time.Time) error {
    return setDeadlineImpl(fd, t, 'r')
}

// SetWriteDeadline sets the write deadline associated with fd.
func (fd *FD) SetWriteDeadline(t time.Time) error {
    return setDeadlineImpl(fd, t, 'w')
}

func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
    var d int64
    // ...
    runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
    return nil
}

// 链接到 poll_runtime_pollSetDeadline 
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int){}

// 设置超时时间
// 1. 先使用截止日期计算出过期的时间点
// 2. 根据 pd 的状态做出不同处理
// 2.1 计时器没有设置执行函数,则设置函数并调用 resettimer 重置计时器
// 2.2 截至日期发生改变
// 2.2.1 改变大于0,则调用 modtimer 修改计时器
// 2.2.2 改变小于0,则调用 deltimer 删除计时器
// 3. 最后会重新检查截止日期,决定是否唤醒 G
// 
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    // ...
    rd0, wd0 := pd.rd, pd.wd
    combo0 := rd0 > 0 && rd0 == wd0
    if d > 0 {
        d += nanotime()
        if d <= 0 {
            // If the user has a deadline in the future, but the delay calculation
            // overflows, then set the deadline to the maximum possible value.
            d = 1<<63 - 1
        }
    }
    if mode == 'r' || mode == 'r'+'w' {
        pd.rd = d
    }
    if mode == 'w' || mode == 'r'+'w' {
        pd.wd = d
    }
    pd.publishInfo()
    combo := pd.rd > 0 && pd.rd == pd.wd
    rtf := netpollReadDeadline
    if combo {
        rtf = netpollDeadline
    }
    if pd.rt.f == nil {
        if pd.rd > 0 {
            pd.rt.f = rtf
            // Copy current seq into the timer arg.
            // Timer func will check the seq against current descriptor seq,
            // if they differ the descriptor was reused or timers were reset.
            pd.rt.arg = pd.makeArg()
            pd.rt.seq = pd.rseq
            resettimer(&pd.rt, pd.rd)
        }
    } else if pd.rd != rd0 || combo != combo0 {
        pd.rseq++ // invalidate current timers
        if pd.rd > 0 {
            modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
        } else {
            deltimer(&pd.rt)
            pd.rt.f = nil
        }
    }
    if pd.wt.f == nil {
        if pd.wd > 0 && !combo {
            pd.wt.f = netpollWriteDeadline
            pd.wt.arg = pd.makeArg()
            pd.wt.seq = pd.wseq
            resettimer(&pd.wt, pd.wd)
        }
    } else if pd.wd != wd0 || combo != combo0 {
        pd.wseq++ // invalidate current timers
        if pd.wd > 0 && !combo {
            modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
        } else {
            deltimer(&pd.wt)
            pd.wt.f = nil
        }
    }
    // If we set the new deadline in the past, unblock currently pending IO if any.
    // Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
    delta := int32(0)
    var rg, wg *g
    if pd.rd < 0 {
        rg = netpollunblock(pd, 'r', false, &delta)
    }
    if pd.wd < 0 {
        wg = netpollunblock(pd, 'w', false, &delta)
    }
    unlock(&pd.lock)
    if rg != nil {
        netpollgoready(rg, 3)
    }
    if wg != nil {
        netpollgoready(wg, 3)
    }
    netpollAdjustWaiters(delta)
}