dushaoshuai / dushaoshuai.github.io

https://www.shuai.host
0 stars 0 forks source link

Go: epoll 与网络轮询器 #116

Open dushaoshuai opened 1 year ago

dushaoshuai commented 1 year ago

Go 的网络轮询器设计非常巧妙:它不单单说是简单地调用各个平台上的 I/O 多路复用机制,而是将 epoll、kqueue 等 I/O 事件通知机制紧密地整合到 Go 的运行时中,极大地增强了 I/O 操作的并发处理能力。文件 I/O、网络 I/O 和计时器都是由网络轮询器管理的。

一些闲话 考虑 goroutine 进行 I/O 的几种可能性: 阻塞 I/O:M 因为系统调用被阻塞,调度器将 P 分离出来,P 可能结合其他 M 去执行其他 G。使用这种模型,可能会有大量的 M 被阻塞,切换 M 的开销也是比较大的。 非阻塞 I/O:G 执行的 I/O 操作立即完成,但是可能返回 `EAGAIN` 的错误,该错误表明此次 I/O 将会阻塞。G 可以不停地重试 I/O 操作,直到成功。使用这种模型,G 可以选择在重试的间隙进行其他操作,提高 CPU 利用率。 I/O 事件通知机制:G 进行非阻塞 I/O,如果返回 `EAGAIN` 错误,则让出 P 和 M,等待操作的文件描述符就绪后再被调度执行。

(本文书写时使用的 Go 版本是 go1.20)

(关于 I/O 事件通知机制,本文只关注 Linux 平台上的 epoll)

epoll

epoll 是 Linux 内核的一种可伸缩的 I/O 事件通知机制。epoll 的作用是监听多个文件描述符,查看它们是否可进行 I/O。epoll 内部使用红黑树实现,在监听的文件描述符数量较多时,epoll 也有较好的性能表现,从用户空间的视角来说,epoll 操作的复杂度是 O(1),好于 select(O(n))和 poll(O(n))。

3 个系统调用

(查看帮助文档,如:man 2 epoll_create1

epoll_create1() 创建新的 epoll 实例并返回其文件描述符。

epoll_ctl() 注册要监听的文件描述符及要监听的事件。

epoll_wait() 阻塞或者非阻塞地等待 I/O 事件,阻塞或者非阻塞取决于 timeout 参数。

struct epoll_eventman 3type epoll_event):

重点提一下结构体 epoll_event,它是 epoll_ctl()epoll_wait() 中 event(s) 参数的类型,我们需要在调用 epoll_ctl() 时把它交给内核保存,在调用 epoll_wait() 时内核又会把它返回给我们,相当于是在一个文件描述符和一个 epoll_event 结构体之间建立了联系。

struct epoll_event {
    uint32_t      events;  /* Epoll events,事件类型 */
    epoll_data_t  data;    /* User data variable,我们想要和监听的文件描述符关联起来的数据 */
};

union epoll_data { /* 可以关联的数据类型,其实就是数字 */
    void     *ptr; /* 指向任意数据的指针 */
    int       fd;  /* int */
    uint32_t  u32; /* 32 位无符号整数 */
    uint64_t  u64; /* 64 位无符号整数 */
};

typedef union epoll_data  epoll_data_t;

epoll_event 中,用户可以保存自己需要的数据,这个数据一般都会包括要监听的文件描述符在内。那么 Go 在整合 epoll 时,给每个文件描述符关联的数据是什么呢?

事件触发模式

edge-triggered(ET,EPOLLET):只有当监听的文件描述符发生变化(变得可读或变得可写)时才触发事件。

level-triggered (LT):只要监听的文件描述符是可读或者可写的,就会触发事件。默认。

网络轮询器

Go 运行时中并没有定义具体的网络轮询器,而是由各方配合调度轮询。

常量/变量和数据结构

epfd

var epfd int32 = -1 是全局唯一 epoll 实例文件描述符。在 netpollinit() 中初始化。

netpollBreakRdnetpollBreakWrnetpollWakeSig

var (
    netpollBreakRd, netpollBreakWr uintptr // for netpollBreak

    netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
)

这几个变量用于打断网络轮询。网络轮询器管理计时器,当调度器发现某个计时器要早于网络轮询器等待的时间时,调度器会调用 netpollBreak() 打断当前阻塞的轮询。

netpollBreakRdnetpollBreakWrnetpollinit() 中初始化,它们是一个非阻塞 pipe(man 7 pipe,man 2 pipe2)的读端和写端(两个文件描述符)。epfd 会监听 netpollBreakRd,当需要打断网络轮询时,只需向 netpollBreakWr 中写入数据,就可唤醒因调用 epoll_wait() 而陷入阻塞的 goroutine。

netpollWakeSig 用于避免短时间内重复打断网络轮询。(TODO 再看看)

pollDesc

pollDesc 封装了操作系统文件描述符,为 Go 运行时提供必要的支持。pollDesc 为因读写当前文件描述符而被阻塞的 goroutine 提供了 park 的场所。事实上,在调用 epoll_ctl() 时,运行时会给每个操作系统文件描述符关联一个 *pollDesc

(这里删掉了和 timer 相关的字段,本文不打算过多涉及 timer)

// Network poller descriptor.
//
// No heap pointers.
type pollDesc struct {
    _    sys.NotInHeap
    link *pollDesc // in pollcache, protected by pollcache.lock
    fd   uintptr   // constant for pollDesc usage lifetime

    // ...

    // rg, wg are accessed atomically and hold g pointers.
    // (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
    rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
    wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

    lock    mutex // protects the following fields
    closing bool
    // ...
}

pollCache

pollCachepollDesc 的空闲链表,由 pollDesc.link 字段链接起来。

type pollCache struct {
    lock  mutex
    first *pollDesc
}

需要注意 pollDesc 是通过 persistentalloc() 分配的持久化数据。

EpollEvent

定义在 runtime/internal/syscall/defs_linux_amd64.go 中。是上文所提到的 epoll_event 在 Go 语言中的表示。

type EpollEvent struct {
    Events uint32
    Data   [8]byte // unaligned uintptr
}

操作 epoll

直接操作 epoll 的代码都在 runtime/netpoll_epoll.go 中。

netpollinit()

通过系统调用 epoll_create1() 创建全局 epoll 实例 epfd

通过系统调用 pipe2() 创建非阻塞 pipe,其读写端分别为 netpollBreakRdnetpollBreakWr

通过系统调用 epoll_ctl()netpollBreakRd 注册为 epfd 要监听的文件描述符,监听的事件是 EPOLLIN,即监听可读事件。注意:这里没有指明触发模式,因此是默认的 LT 模式,即 pipe 里有数据就会触发。

func netpollinit() {
    var errno uintptr
    epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
    // error handling ...
    r, w, errpipe := nonblockingPipe()
    // error handling ...
    ev := syscall.EpollEvent{
        Events: syscall.EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
    errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
    // error handling ...
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

netpollopen()

通过系统调用 epoll_ctl(),使 epfd 监听 fdfd 是操作系统文件描述符,监听的事件为 EPOLLINEPOLLOUT(可写)、EPOLLET(触发模式为 EF)、EPOLLRDHUPEPOLLHUP(文件描述符被关闭,其中 EPOLLHUP 是默认的,不用特别指定)。

正如前文所提到的,给 fd 所关联的数据是相应的 *pollDesc

func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    var ev syscall.EpollEvent
    ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

netpollclose()

通过系统调用 epoll_ctl(),移除 epfdfd 的监听。

func netpollclose(fd uintptr) uintptr {
    var ev syscall.EpollEvent
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)
}

netpollBreak()

唤醒(打断)正在睡眠(被阻塞)的网络轮询。通过 netpollWakeSig 在一定程度上保证不会有短时间内的重复唤醒。唤醒的方式是向 netpollBreakWr 中写入 (最多)1 字节的数据。

func netpollBreak() {
    // Failing to cas indicates there is an in-flight wakeup, so we're done here.
    if !netpollWakeSig.CompareAndSwap(0, 1) {
        return
    }

    for {
        var b byte
    n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
    if n == 1 { // 写入成功
        break
    }
    if n == -_EINTR { // 写操作被中断,继续重试
        continue
    }
    if n == -_EAGAIN { // 写操作将被阻塞,说明 pipe 满了,这些已有的数据就可以唤醒网络轮询,因此不需要再向 pipe 中写入数据了(虽然我感觉在这种场景下,pipe 根本不可能写满吧)
        return
    }
    println("runtime: netpollBreak write failed with", -n)
    throw("runtime: netpollBreak write failed")
    }
}

netpoll()

netpoll() 检查是否有就绪的 I/O,返回因为这些 I/O 被阻塞的 goroutine。

通过系统调用 epoll_wait() 等待 I/O 事件,是否是阻塞等待取决于 delay 参数:

如果这次网络轮询是被唤醒的,尝试从 netpollBreakRd 中最多读取 16 字节的数据,这里是基于这样一个假设^1:不会有超过 16 个 goroutine 同时想要唤醒网络轮询,因此最多读 16 字节就可以把 pipe 清空。读取 pipe 中所有数据是因为,这一次唤醒就可以满足此前所有尝试唤醒网络轮询的 goroutine。

对于每一个 I/O 事件,调用 netpollready() 将阻塞的 goroutine 释放出来,最终返回一个 goroutine 的链表 gList注意netpollready() 并不修改被阻塞的 goroutine 本身的状态,而是由 netpoll() 的调用方负责将 goroutine 的状态由 _Gwaiting 切换为 _Grunnable,并加入某些运行队列。

func netpoll(delay int64) gList {
    // ...
    var waitms int32
    // ...
    var events [128]syscall.EpollEvent // 每次最多处理 128 个 I/O 事件
retry:
    n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
    if errno != 0 {
        if errno != _EINTR {
            // error handling
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := events[i]
        if ev.Events == 0 {
            continue
        }

        if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd { // 这次网络轮询是被唤醒的
            if ev.Events != syscall.EPOLLIN {
                // error handling ...
            }
            if delay != 0 {
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte // 尝试读取 pipe 中所有数据
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                netpollWakeSig.Store(0) // 后面的 goroutine 可以调用 netpollBreak() 了
            }
            continue
        }

        var mode int32
        if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
            pd.setEventErr(ev.Events == syscall.EPOLLERR)
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

pollDesc 如何支持网络 I/O 和文件 I/O

internal/poll.pollDescruntimeCtx 字段是一个指向 runtime.pollDesc 的指针,在 runtime.pollDesc 的基础上又封装了一些方法。

internal/poll.FD 是在 pollDesc 基础上封装的更高一层抽象的文件描述符。net 包使用 internal/poll.FD 抽象出网络连接,os 包使用 internal/poll.FD 抽象出操作系统文件。

net.TCPConn.Write()runtime.gopark()

net.TCPConn 中嵌入了 net.conn

// package net

type TCPConn struct {
    conn
}

net.TCPConn 没有 Write 方法,因此调用的是 (*net.conn).Write()

// package net

type conn struct {
    fd *netFD
}

func (c *conn) Write(b []byte) (int, error) {
    // ...
    n, err := c.fd.Write(b)
    // error handling
}

进而调用 (*net.netFD).Write()

// package net

type netFD struct {
    pfd poll.FD
    // ...
}

func (fd *netFD) Write(p []byte) (nn int, err error) {
    nn, err = fd.pfd.Write(p)
    // ...
}

进而调用 (*internal/poll.FD).Write()。在这个方法中,如果写操作会阻塞,可能会调度当前 goroutine park 在要写的文件描述符上,等待被唤醒。

// package poll

type FD struct {
    // ...
    Sysfd int
    // ...
    pd pollDesc
    // ...
}

func (fd *FD) Write(p []byte) (int, error) {
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    if err := fd.pd.prepareWrite(fd.isFile); err != nil {
        return 0, err
    }
    var nn int
    for {
        max := len(p)
        if fd.IsStream && max-nn > maxRW {
            max = nn + maxRW
        }
        n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
        if n > 0 {
            nn += n
        }
        if nn == len(p) {
            return nn, err
        }
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitWrite(fd.isFile); err == nil {
                continue
            }
        }
        if err != nil {
            return nn, err
        }
        if n == 0 {
            return nn, io.ErrUnexpectedEOF
        }
    }
}

Write 方法中:

(*internal.poll.pollDesc).waitWrite() 方法最终会调用 runtime.poll_runtime_pollWait() 函数,该函数的作用是等待一个文件描述符的写或读的就绪状态:

// package runtime

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // ...
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // ...
    }
    return pollNoError
}

进一步调用 runtime.netpollblock() 函数尝试阻塞当前 goroutine:

// package runtime

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait
    for {
        // Consume notification if already ready.
        if gpp.CompareAndSwap(pdReady, pdNil) {
            return true
        }
        if gpp.CompareAndSwap(pdNil, pdWait) {
            break
        }

        // Double check that this isn't corrupt; otherwise we'd loop
        // forever.
        if v := gpp.Load(); v != pdReady && v != pdNil {
            throw("runtime: double wait")
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == pollNoError {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := gpp.Swap(pdNil)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

runtime.netpollblock() 函数做了 3 件事:

这样被写操作阻塞的 goroutine 就 park 在要操作的文件描述符上了,等待调度器调用 netpoll() 去唤醒他们。

参见

dushaoshuai commented 1 year ago
`runtime.pollDesc` 类图,github 无法渲染,留个备份 ```mermaid classDiagram internal-poll-pollDesc <..> runtime-pollDesc : identical internal-poll-FD *-- internal-poll-pollDesc net-netFD *-- internal-poll-FD os-file *-- internal-poll-FD net-conn *-- net-netFD net-rawConn *-- net-netFD net-TCPListener *-- net-netFD net-UnixListener *-- net-netFD net-IPConn *-- net-conn net-TCPConn *-- net-conn net-UDPConn *-- net-conn net-UnixConn *-- net-conn net-rawListener *-- net-rawConn os-File *-- os-file namespace runtime { class runtime-pollDesc["pollDesc"]{ ... } } namespace internal-poll { class internal-poll-pollDesc["pollDesc"]{ - runtimeCtx uintptr } class internal-poll-FD["FD"]{ - pd pollDesc ... } } namespace net { class net-netFD["netFD"]{ - pfd poll.FD ... } class net-conn["conn"]{ - fd *netFD } class net-rawConn["rawConn"]{ - fd *netFD } class net-TCPListener["TCPListener"]{ - fd *netFD ... } class net-UnixListener["UnixListener"]{ - fd *netFD ... } class net-IPConn["IPConn"]{ - conn } class net-TCPConn["TCPConn"]{ - conn } class net-UDPConn["UDPConn"]{ - conn } class net-UnixConn["UnixConn"]{ - conn } class net-rawListener["rawListener"]{ - rawConn } } namespace os { class os-file["file"]{ - pfd poll.FD } class os-File["File"]{ - *file } } ```