The `netpoll` package of `cloudwego` #12


gh-liu commented 6 months ago

netpoll

1. Listening on the `net.Listener`'s fd

// Create an EventLoop: an event-driven scheduler
// used for connection management, event scheduling, and so on.
eventLoop, _ := netpoll.NewEventLoop(onRequest OnRequest, ops ...Option)

// Serve by binding a net.Listener
eventLoop.Serve(net.Listener)

// Shut the server down gracefully
eventLoop.Shutdown(context.Context)
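
A minimal end-to-end sketch of wiring these three calls together as an echo server (the address and handler logic are illustrative, not from this note):

package main

import (
    "context"

    "github.com/cloudwego/netpoll"
)

func main() {
    ln, err := netpoll.CreateListener("tcp", "127.0.0.1:8080")
    if err != nil {
        panic(err)
    }

    var onRequest netpoll.OnRequest = func(ctx context.Context, conn netpoll.Connection) error {
        reader, writer := conn.Reader(), conn.Writer()
        defer reader.Release() // recycle the read buffers once handled

        // ReadBinary copies the bytes out, so they stay valid after Release
        // even if the outgoing write has not been sent yet.
        buf, err := reader.ReadBinary(reader.Len())
        if err != nil {
            return err
        }
        _, _ = writer.WriteBinary(buf) // echo the payload back
        return writer.Flush()
    }

    eventLoop, err := netpoll.NewEventLoop(onRequest)
    if err != nil {
        panic(err)
    }
    _ = eventLoop.Serve(ln) // blocks until Shutdown or an error
}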
  1. First, look at the `EventLoop` returned by the first step:
// eventloop.go

// Provides serving and graceful shutdown.
//
// An EventLoop is a network server.
type EventLoop interface {
    // Serve registers a listener and runs blockingly to provide services, including listening to ports,
    // accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
    // Serve will return an error which describes the specific reason.
    Serve(ln net.Listener) error

    // Shutdown is used to graceful exit.
    // It will close all idle connections on the server, but will not change the underlying pollers.
    //
    // Argument: ctx set the waiting deadline, after which an error will be returned,
    // but will not force the closing of connections in progress.
    Shutdown(ctx context.Context) error
}
  2. The `Serve` method of the `eventLoop` struct, which implements the `EventLoop` interface:
// netpoll.go

type eventLoop struct {
    sync.Mutex
    opts *options
    svr  *server
    stop chan error
}

// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
    npln, err := ConvertListener(ln) // convert the listener
    // ...
    // create a server and run it
    evl.svr = newServer(npln, evl.opts, evl.quit)
    evl.svr.Run()
    // ...
}

// Shutdown signals a shutdown and begins server closing.
func (evl *eventLoop) Shutdown(ctx context.Context) error {}
  3. The `server` created by `newServer`, and its `(*server).Run` method:
// netpoll_server.go

type server struct {
    operator    FDOperator
    ln          Listener
    opts        *options
    onQuit      func(err error)
    connections sync.Map // key=fd, value=connection
}

// Run this server.
func (s *server) Run() (err error) {
    s.operator = FDOperator{             // FDOperator: a set of operations on an fd
        FD:     s.ln.Fd(),               // the Listener's fd
        OnRead: s.OnRead,
        OnHup:  s.OnHup,
    }
    s.operator.poll = pollmanager.Pick() // pollmanager (initialized in an init function) manages multiple pollers (one per 20 CPUs); pick one poll here to monitor the fd
    err = s.operator.Control(PollReadable)
    // ...
}
  4. The `Control` method of the `FDOperator` struct, called with `PollReadable`:
// fd_operator.go

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
    // ...
}

func (op *FDOperator) Control(event PollEvent) error {
    if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
        return nil
    }
    return op.poll.Control(op, event) // delegate to the poll's Control method; poll is the Poll interface, implemented by the defaultPoll struct
}
  5. The `Control` method of `defaultPoll` (the concrete `Poll` implementation), with the `FDOperator` and `PollReadable` as arguments:
// poll.go

// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
    // ...

    // Control the event of file descriptor and the operations is defined by PollEvent.
    Control(operator *FDOperator, event PollEvent) error

    // ...
}
// poll_default_linux.go

type defaultPoll struct {
    // ...
}

// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
    // ...
    p.setOperator(unsafe.Pointer(&evt.data), operator)                                              /// store the FDOperator as the event's user data, retrievable when the event fires
    switch event {
    case PollReadable: // server accept a new connection and wait read
        operator.inuse()                                                                            /// mark the FDOperator as in use
        op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR /// epoll_ctl arguments: add the fd, watching readable, peer close, and error events
    // ...
    }
    return EpollCtl(p.fd, op, operator.FD, &evt)                                                    /// issue the epoll_ctl system call
}
// sys_epoll_linux.go

// Wrapper around the SYS_EPOLL_CTL system call.
//
// EpollCtl implements epoll_ctl.
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
    _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
    if err == syscall.Errno(0) {
        err = nil
    }
    return err
}
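
For comparison, the same registration can be sketched with the maintained wrappers in golang.org/x/sys/unix (equivalent in effect, not netpoll's code):

// import "golang.org/x/sys/unix"
func watchReadable(epfd, fd int) error {
    ev := unix.EpollEvent{
        Events: unix.EPOLLIN | unix.EPOLLRDHUP | unix.EPOLLERR,
        Fd:     int32(fd),
    }
    return unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, fd, &ev)
}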

2. Running the polls: polling for ready events

Point 3 of the previous section showed that `(*server).Run` selects a poll via `pollmanager.Pick()`; by the time `Pick` returns, all the polls have already been started:

// poll_manager.go

// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
    numLoops int32
    status   int32       // 0: uninitialized, 1: initializing, 2: initialized
    balance  loadbalance // load balancing method
    polls    []Poll      // all the polls
}

// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
START:
    // fast path
    if atomic.LoadInt32(&m.status) == managerInitialized {                                 /// already initialized: pick a poll and return it
        return m.balance.Pick()
    }
    // slow path
    // try to get initializing lock failed, wait others finished the init work, and try again
    if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) { /// try to claim the initializing state; on failure, retry
        runtime.Gosched()
        goto START
    }
    // adjust polls
    // m.Run() will finish very quickly, so will not many goroutines block on Pick.
    _ = m.Run()                                                                            /// run the manager, which starts every poll it manages

    if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) {   /// try to mark initialization as finished
        // SetNumLoops called during m.Run() which cause CAS failed
        // The polls will be adjusted next Pick
    }
    return m.balance.Pick()                                                                /// pick a poll and return it
}

// Run all pollers.
func (m *manager) Run() (err error) {
    defer func() {
        if err != nil {
            _ = m.Close()
        }
    }()

    numLoops := int(atomic.LoadInt32(&m.numLoops))      /// numLoops was set by newManager, which is called from an init function
    if numLoops == len(m.polls) {
        return nil
    }
    var polls = make([]Poll, numLoops)                  /// the target number of polls
    if numLoops < len(m.polls) {                        /// fewer than the current polls: shrink
        // shrink polls
        copy(polls, m.polls[:numLoops])                 /// keep the first numLoops polls
        for idx := numLoops; idx < len(m.polls); idx++ {
            // close redundant polls
            if err = m.polls[idx].Close(); err != nil { /// close the redundant ones
                logger.Printf("NETPOLL: poller close failed: %v\n", err)
            }
        }
    } else {                                            /// more than the current polls: grow
        // growth polls
        copy(polls, m.polls)                            /// carry over all existing polls
        for idx := len(m.polls); idx < numLoops; idx++ {/// create the missing polls
            var poll Poll
            poll, err = openPoll()                      /// open a new poll
            if err != nil {
                return err
            }
            polls[idx] = poll
            go poll.Wait()                              /// run the poll
        }
    }
    m.polls = polls

    // LoadBalance must be set before calling Run, otherwise it will panic.
    m.balance.Rebalance(m.polls)
    return nil
}
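
Applications can resize the poller pool themselves; a sketch assuming the exported SetNumLoops option (API as of netpoll v0.x — check your version):

// import "github.com/cloudwego/netpoll"
func init() {
    // The default is roughly one poller per 20 CPU cores; force 4 pollers here.
    if err := netpoll.SetNumLoops(4); err != nil {
        panic(err)
    }
}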

Creating a poll:

// poll_default_linux.go

func openPoll() (Poll, error) {
    return openDefaultPoll()
}

func openDefaultPoll() (*defaultPoll, error) {
    var poll = new(defaultPoll)
    // ...
    var p, err = EpollCreate(0)                                     /// create an epoll instance
    // ...
    poll.fd = p                                                     /// save the epoll fd

    var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)  /// create an eventfd for wakeup notifications; r0 is the event fd
    // ...

    // ...
    poll.Handler = poll.handler                                     /// register (p *defaultPoll) handler as the ready-event handler
    poll.wop = &FDOperator{FD: int(r0)}                             /// used to wake up epoll_wait

    if err = poll.Control(poll.wop, PollReadable); err != nil {     /// as analyzed above: start watching the event fd
        // ...
        return nil, err
    }

    // ...
    return poll, nil
}
// sys_epoll_linux.go

// Create an epoll instance.
// 
// EpollCreate implements epoll_create1.
func EpollCreate(flag int) (fd int, err error) {
    var r0 uintptr
    r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
    if err == syscall.Errno(0) {
        err = nil
    }
    return int(r0), err
}
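
The eventfd registered above is how the poller wakes itself out of a blocking epoll_wait (e.g. for Close). A self-contained sketch of that wakeup trick using golang.org/x/sys/unix (same mechanism, not netpoll's code):

// import ("fmt"; "golang.org/x/sys/unix")
epfd, _ := unix.EpollCreate1(0)
efd, _ := unix.Eventfd(0, 0)
_ = unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, efd,
    &unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(efd)})

go func() {
    // Writing any non-zero 8-byte counter makes the eventfd readable,
    // forcing epoll_wait to return even with no socket activity.
    _, _ = unix.Write(efd, []byte{0, 0, 0, 0, 0, 0, 0, 1})
}()

events := make([]unix.EpollEvent, 8)
n, _ := unix.EpollWait(epfd, events, -1) // unblocked by the eventfd write
fmt.Println("ready events:", n)          // 1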

Running a poll:

// poll.go

// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
    // Wait will poll all registered fds, and schedule processing based on the triggered event.
    // The call will block, so the usage can be like:
    //
    //  go wait()
    //
    Wait() error

    //...
}
// poll_default_linux.go

type defaultPoll struct {
    // ...
}

// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
    // ...
    // wait
    for {
        // ...
        n, err = EpollWait(p.fd, p.events, msec) /// epoll_wait: ready events land in p.events; msec controls blocking (-1 blocks indefinitely); n is the number of ready events
        // ...
        if n <= 0 {                              /// no ready events, or an error
            msec = -1
            runtime.Gosched()
            continue
        }
        msec = 0
        if p.Handler(p.events[:n]) {             /// handle the ready events; the handler was registered at poll creation, i.e. (p *defaultPoll) handler
            return nil
        }
        // ...
    }
}
// sys_epoll_linux.go

// Wrapper around the EPOLL_WAIT system call.
// The msec argument decides whether the call blocks: a blocking call must go through Syscall6 (rather than RawSyscall6), which tells the runtime the current P may block and can be rescheduled.
//
// EpollWait implements epoll_wait.
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
    var r0 uintptr
    var _p0 = unsafe.Pointer(&events[0])
    if msec == 0 {
        r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
    } else {
        r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
    }
    if err == syscall.Errno(0) {
        err = nil
    }
    return int(r0), err
}

3. Handling ready events

The "Running a poll" part above showed that `(*defaultPoll).Wait` handles ready events through the `Handler` function, which was registered when the poll was created, i.e. `(p *defaultPoll) handler`:

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
    var triggerRead, triggerWrite, triggerHup, triggerError bool
    var err error
    for i := range events {
        operator := p.getOperator(0, unsafe.Pointer(&events[i].data)) /// fetch the FDOperator that (*defaultPoll).Control stored as the event's user data
        // ...

        var totalRead int
        evt := events[i].events                                       /// inspect the event flags:
        triggerRead = evt&syscall.EPOLLIN != 0                        /// readable
        triggerWrite = evt&syscall.EPOLLOUT != 0                      /// writable
        triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0   /// hangup or peer close
        triggerError = evt&syscall.EPOLLERR != 0                      /// error

        // trigger or exit gracefully
        if operator.FD == p.wop.FD {                                  /// the wakeup event fd
            // must clean trigger first
            syscall.Read(p.wop.FD, p.buf)
            atomic.StoreUint32(&p.trigger, 0)
            // if closed & exit
            if p.buf[0] > 0 {
                syscall.Close(p.wop.FD)
                syscall.Close(p.fd)                                   /// close the poll itself
                operator.done()
                return true
            }
            operator.done()
            continue
        }

        if triggerRead {                                                 /// 1. readable
            if operator.OnRead != nil {
                // for non-connection                                    /// not a connection (e.g. the listener)
                operator.OnRead(p)                                       /// invoke OnRead
            } else if operator.Inputs != nil {
                // for connection                                        /// a connection
                bs := operator.Inputs(p.barriers[i].bs)                  /// get read buffers via Inputs
                if len(bs) > 0 {
                    n, err := ioread(operator.FD, bs, p.barriers[i].ivs) /// read the data
                    operator.InputAck(n)                                 /// acknowledge how much was read
                    totalRead += n
                    if err != nil {
                        p.appendHup(operator)
                        continue
                    }
                }
            } else {
                logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
            }
        }
        if triggerHup {                                               /// 2. hangup / peer close
            if triggerRead && operator.Inputs != nil {
                var leftRead int
                // read all left data if peer send and close
                if leftRead, err = readall(operator, p.barriers[i]); err != nil && !errors.Is(err, ErrEOF) {
                    logger.Printf("NETPOLL: readall(fd=%d)=%d before close: %s", operator.FD, totalRead, err.Error())
                }
                totalRead += leftRead
            }
            // only close connection if no further read bytes
            if totalRead == 0 {
                p.appendHup(operator)
                continue
            }
        }
        if triggerError {                                             /// 3. error
            // Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
            // So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
            if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
                p.appendHup(operator)
            } else {
                operator.done()
            }
            continue
        }
        if triggerWrite {                                                                          /// 4. writable
            if operator.OnWrite != nil {
                // for non-connection                                                              /// not a connection
                operator.OnWrite(p)                                                                /// invoke OnWrite
            } else if operator.Outputs != nil {
                // for connection                                                                  /// a connection
                bs, supportZeroCopy := operator.Outputs(p.barriers[i].bs)                          /// get pending output buffers via Outputs
                if len(bs) > 0 {
                    // TODO: Let the upper layer pass in whether to use ZeroCopy.
                    n, err := iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) /// send the data
                    operator.OutputAck(n)                                                          /// acknowledge how much was sent
                    if err != nil {
                        p.appendHup(operator)
                        continue
                    }
                }
            } else {
                logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
            }
        }
        operator.done()
    }
    // hup conns together to avoid blocking the poll.
    p.onhups()
    return false
}

Let's revisit the `FDOperator` struct:

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
    // FD is file descriptor, poll will bind when register.
    FD int

    // The FDOperator provides three operations of reading, writing, and hanging.
    // The poll actively fires the FDOperator when the fd changes, without checking the FDOperator's return value.
    OnRead  func(p Poll) error
    OnWrite func(p Poll) error
    OnHup   func(p Poll) error

    // The following is the required fn, which must exist when used, or directly panic.
    // Fns are only called by the poll when handles connection events.
    Inputs   func(vs [][]byte) (rs [][]byte)
    InputAck func(n int) (err error)

    // Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
    Outputs   func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
    OutputAck func(n int) (err error)

    // poll is the registered location of the file descriptor.
    poll Poll

    // protect only detach once
    detached int32

    // private, used by operatorCache
    next  *FDOperator
    state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
    index int32 // index in operatorCache
}
OnRead
// netpoll_server.go

type server struct {
    // ...
}

// Run this server.
func (s *server) Run() (err error) {
    s.operator = FDOperator{
        FD:     s.ln.Fd(),
        OnRead: s.OnRead,           /// register the OnRead callback
        OnHup:  s.OnHup,
    }
    // ...
}

// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
    // accept socket
    conn, err := s.ln.Accept()      /// accept a new connection
    if err == nil {
        if conn != nil {
            s.onAccept(conn.(Conn)) /// handle it: store it in the connection map and start serving it
        }
        // EAGAIN | EWOULDBLOCK if conn and err both nil
        return nil
    }
    logger.Printf("NETPOLL: accept conn failed: %v", err)

    // delay accept when too many open files
    // ...

    // shut down
    // ...

    return err
}
OnWrite
type netFD struct {
    // ...
}

func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {
    // ...
    c.pd = newPollDesc(c.fd) /// initialize pd, registering pd.onwrite as the OnWrite callback; it is invoked during event handling and signals completion through pd.writeTrigger
    for {
        // Performing multiple connect system calls on a
        // non-blocking socket under Unix variants does not
        // necessarily result in earlier errors being
        // returned. Instead, once runtime-integrated network
        // poller tells us that the socket is ready, get the
        // SO_ERROR socket option to see if the connection
        // succeeded or failed. See issue 7474 for further
        // details.
        if err := c.pd.WaitWrite(ctx); err != nil { /// blocks; woken up through pd.writeTrigger
            return nil, err
        }
        // ...
    }
}
Inputs, InputAck; Outputs, OutputAck

(See the `LinkBuffer` analysis in the next comment first.)

// connection_impl.go

type connection struct {
    //... 
}

func (c *connection) init(conn Conn, opts *options) (err error) {
    // ...
    c.initFDOperator()
    // ...
}

func (c *connection) initFDOperator() {
    // ...
    op.Inputs, op.InputAck = c.inputs, c.inputAck
    op.Outputs, op.OutputAck = c.outputs, c.outputAck
    // ...
}
// connection_reactor.go

// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
    vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
    return vs[:1]
}

// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
    if n <= 0 {
        c.inputBuffer.bookAck(0)
        return nil
    }

    // Auto size bookSize.
    if n == c.bookSize && c.bookSize < mallocMax {
        c.bookSize <<= 1
    }

    length, _ := c.inputBuffer.bookAck(n)
    if c.maxSize < length {
        c.maxSize = length
    }
    if c.maxSize > mallocMax {
        c.maxSize = mallocMax
    }

    var needTrigger = true
    if length == n { // first start onRequest
        needTrigger = c.onRequest()
    }
    if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
        c.triggerRead(nil)
    }
    return nil
}

// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
    if c.outputBuffer.IsEmpty() {
        c.rw2r()
        return rs, c.supportZeroCopy
    }
    rs = c.outputBuffer.GetBytes(vs)
    return rs, c.supportZeroCopy
}

// outputAck implements FDOperator.
func (c *connection) outputAck(n int) (err error) {
    if n > 0 {
        c.outputBuffer.Skip(n)
        c.outputBuffer.Release()
    }
    if c.outputBuffer.IsEmpty() {
        c.rw2r()
    }
    return nil
}
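
The Inputs/InputAck pair above is a two-phase read: the poller first books writable space from the connection's input LinkBuffer, readv fills it, and the ack commits only what actually arrived (possibly firing onRequest). A generic sketch of the same pattern, using a hypothetical twoPhaseBuffer rather than netpoll's zero-copy implementation:

// import "syscall"
type twoPhaseBuffer struct {
    data  []byte // committed, readable bytes
    spare []byte // booked but not yet acknowledged space
}

// Book reserves n writable bytes (netpoll: inputBuffer.book, called via Inputs).
func (b *twoPhaseBuffer) Book(n int) []byte {
    b.spare = make([]byte, n)
    return b.spare
}

// BookAck commits the filled prefix (netpoll: inputBuffer.bookAck, called via InputAck).
func (b *twoPhaseBuffer) BookAck(n int) int {
    b.data = append(b.data, b.spare[:n]...)
    b.spare = nil
    return len(b.data)
}

func readOnce(fd int, b *twoPhaseBuffer) (int, error) {
    p := b.Book(4096)             // 1. book space
    n, err := syscall.Read(fd, p) // 2. let the syscall fill it
    if n < 0 {
        n = 0
    }
    return b.BookAck(n), err      // 3. only the filled prefix becomes readable
}
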
gh-liu commented 6 months ago

LinkBuffer

The nocopy buffer is implemented on a linked list of arrays: each []byte array is abstracted as a block, and blocks are chained together to form the nocopy buffer. On top of this it introduces reference counting, an object pool, and nocopy APIs:

  1. Lock-free parallel reads and writes with nocopy streaming: reads and writes operate on the head and tail pointers respectively and do not interfere with each other
  2. Efficient growing and shrinking: to grow, append a block at the tail pointer with no copy; to shrink, release exhausted blocks from the head pointer (each block is reference-counted and reclaimed once unreferenced)
  3. Blocks are pooled and reused, reducing GC pressure
  4. Reads exploit slicing: sub-slices are handed out flexibly and reclaimed via reference counting
  5. Writes exploit the linked list: new blocks are spliced in after the tail pointer, with no copy

linkBufferNode

// nocopy_linkbuffer.go

// A linked-list node
type linkBufferNode struct {
    buf    []byte          // buffer                             /// the byte buffer
    off    int             // read-offset                        /// read offset; the readable range is buf[off:len(buf)]
    malloc int             // write-offset                       /// write offset; the writable range is buf[len(buf):malloc]
    refer  int32           // reference count                    /// reference count
    mode   uint8           // mode store all bool bit status     /// mode bits: readonlyMask (read-only), nocopyReadMask (no-copy read)
    origin *linkBufferNode // the root node of the extends       /// the root node this node was sliced from
    next   *linkBufferNode // the next node of the linked buffer /// the next node in the list
}

// buf: -------------------------------------
//            │           │          │
//      off (read)     len(buf)   malloc (write)

// The node pool
var linkedPool = sync.Pool{
    New: func() interface{} {
        return &linkBufferNode{
            refer: 1, // starts with one reference
        }
    },
}

// Initialize a node.
// newLinkBufferNode create or reuse linkBufferNode.
// Nodes with size <= 0 are marked as readonly, which means the node.buf is not allocated by this mcache.
func newLinkBufferNode(size int) *linkBufferNode {
    var node = linkedPool.Get().(*linkBufferNode)                                 /// take a node from the pool
    // reset node offset
    node.off, node.malloc, node.refer, node.mode = 0, 0, 1, defaultLinkBufferMode /// reset the read/write offsets, reference count, and mode
    if size <= 0 {                                                                /// node size <= 0
        node.setMode(readonlyMask, true)                                          /// mark it read-only
        return node
    }
    if size < LinkBufferCap {
        size = LinkBufferCap                                                      /// apply the default size (4 KB)
    }
    node.buf = malloc(0, size)                                                    /// allocate the buffer
    return node
}

// nocopy.go
// 
// Allocate a byte slice.
// malloc limits the cap of the buffer from mcache.
func malloc(size, capacity int) []byte {
    if capacity > mallocMax {                 /// mallocMax is 8 MB
        return dirtmake.Bytes(size, capacity) /// allocated via runtime.mallocgc under the hood
    }
    return mcache.Malloc(size, capacity)      /// mcache adds a caching layer underneath
}
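
mcache here is the size-class allocator from bytedance/gopkg: conceptually, a set of sync.Pools indexed by the next power of two. A rough sketch of the idea (assumes 0 < capacity, and is not the real implementation):

// import ("math/bits"; "sync")
var classes [32]sync.Pool // one pool per power-of-two size class (sketch)

func cachedMalloc(size, capacity int) []byte {
    c := bits.Len(uint(capacity - 1)) // index of the next power of two >= capacity
    if v := classes[c].Get(); v != nil {
        return v.([]byte)[:size]
    }
    return make([]byte, size, 1<<c)
}

func cachedFree(buf []byte) {
    c := bits.Len(uint(cap(buf) - 1))
    if 1<<c == cap(buf) { // only recycle exact size-class buffers
        classes[c].Put(buf[:0])
    }
}
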
// Some of the APIs provided by linkBufferNode

// Length of the remaining readable bytes
func (node *linkBufferNode) Len() (l int) {
    return len(node.buf) - node.off
}

// Reports whether there is no readable data left
func (node *linkBufferNode) IsEmpty() (ok bool) {
    return node.off == len(node.buf)
}

// Reset the node
func (node *linkBufferNode) Reset() {
    // a sub-slice node, or one whose reference count is not 1, must not be reset
    if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 {
        return
    }
    node.off, node.malloc = 0, 0
    node.buf = node.buf[:0]
    return
}

// Read the next n bytes and advance the read pointer.
// The caller checks n against Len beforehand.
func (node *linkBufferNode) Next(n int) (p []byte) {
    off := node.off
    node.off += n
    return node.buf[off:node.off]
}

// Read the next n bytes without advancing the read pointer
func (node *linkBufferNode) Peek(n int) (p []byte) {
    return node.buf[node.off : node.off+n]
}

// Reserve n bytes for writing and advance the write pointer
func (node *linkBufferNode) Malloc(n int) (buf []byte) {
    malloc := node.malloc
    node.malloc += n
    return node.buf[malloc:node.malloc]
}

// Like Next, reads n bytes, but returns them wrapped in a new node.
// The returned node has its origin field set to the root node.
//
// Refer holds a reference count at the same time as Next, and releases the real buffer after Release.
// The node obtained by Refer is read-only.
func (node *linkBufferNode) Refer(n int) (p *linkBufferNode) {
    p = newLinkBufferNode(0)            /// create a read-only node
    p.buf = node.Next(n)                /// the new node points at buf[off:off+n]

    if node.origin != nil {             /// this node is itself a sub-node, so reference its root
        p.origin = node.origin
    } else {
        p.origin = node
    }
    atomic.AddInt32(&p.origin.refer, 1) /// bump the root node's reference count
    return p
}

// Release a node: a sub-node first releases its root node; once this node's own
// reference count reaches 0, reset its fields, free the buf memory, and put the node back into the pool.
//
// Release consists of two parts:
// 1. reduce the reference count of itself and origin.
// 2. recycle the buf when the reference count is 0.
func (node *linkBufferNode) Release() (err error) {
    if node.origin != nil {
        node.origin.Release()                            /// release the root node
    }
    // release self
    if atomic.AddInt32(&node.refer, -1) == 0 {
        // readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache.
        if node.reusable() {                             /// reusable: free the buffer memory
            free(node.buf)
        }
        node.buf, node.origin, node.next = nil, nil, nil /// reset the fields
        linkedPool.Put(node)                             /// return the node to the pool
    }
    return nil
}
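
The origin/refer pair implements shared ownership of one underlying buffer: sub-nodes only ever add references to the root, and the root recycles the buffer once the count drops to zero. A stripped-down sketch of the same scheme (hypothetical refNode type, not netpoll's code):

// import "sync/atomic"
type refNode struct {
    buf    []byte
    refer  int32
    origin *refNode
}

func newRefNode(buf []byte) *refNode { return &refNode{buf: buf, refer: 1} }

// slice mirrors Refer: a sub-node always references the root node.
func (n *refNode) slice(lo, hi int) *refNode {
    root := n
    if n.origin != nil {
        root = n.origin
    }
    atomic.AddInt32(&root.refer, 1)
    return &refNode{buf: n.buf[lo:hi], refer: 1, origin: root}
}

// release mirrors Release: drop the self and root counts; only the root recycles.
func (n *refNode) release(recycle func([]byte)) {
    if n.origin != nil {
        n.origin.release(recycle)
    }
    if atomic.AddInt32(&n.refer, -1) == 0 && n.origin == nil {
        recycle(n.buf) // the root owns the real buffer
    }
}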

UnsafeLinkBuffer

// nocopy_linkbuffer_norace.go
// nocopy_linkbuffer_race.go

type LinkBuffer = UnsafeLinkBuffer
// nocopy_linkbuffer.go

// UnsafeLinkBuffer implements ReadWriter.
type UnsafeLinkBuffer struct {
    length     int64                        /// amount of readable data
    mallocSize int                          /// amount of written but not yet flushed data

    head  *linkBufferNode // release head   /// node up to which read data has been released
    read  *linkBufferNode // read head      /// current readable node
    flush *linkBufferNode // malloc head    /// node of the last commit (flush)
    write *linkBufferNode // malloc tail    /// current writable node

    caches [][]byte // buf allocated by Next when cross-package, which should be freed when release /// a read may span several nodes, which forces a copy; those copies are kept here
}
// Initialize a LinkBuffer: create a single node.
//
// NewLinkBuffer size defines the initial capacity, but there is no readable data.
func NewLinkBuffer(size ...int) *LinkBuffer {
    var buf = &LinkBuffer{}
    var l int
    if len(size) > 0 {
        l = size[0]
    }
    var node = newLinkBufferNode(l)
    buf.head, buf.read, buf.flush, buf.write = node, node, node, node
    return buf
}

// The two interfaces implemented by UnsafeLinkBuffer; as the names suggest, they define how data is read and written
var _ Reader = &LinkBuffer{}
var _ Writer = &LinkBuffer{}

First, how the Reader and Writer interfaces are defined:

// nocopy.go

// Reader is a collection of operations for nocopy reads.
//
// For ease of use, it is recommended to implement Reader as a blocking interface,
// rather than simply fetching the buffer.
// For example, the return of calling Next(n) should be blocked if there are fewer than n bytes, unless timeout.
// The return value is guaranteed to meet the requirements or an error will be returned.
type Reader interface {                        /// a collection of reads that avoid memory copies
    // Next returns a slice containing the next n bytes from the buffer,
    // advancing the buffer as if the bytes had been returned by Read.
    //
    // If there are fewer than n bytes in the buffer, Next returns will be blocked
    // until data enough or an error occurs (such as a wait timeout).
    //
    // The slice p is only valid until the next call to the Release method.
    // Next is not globally optimal, and Skip, ReadString, ReadBinary methods
    // are recommended for specific scenarios.
    //
    // Return: len(p) must be n or 0, and p and error cannot be nil at the same time.
    Next(n int) (p []byte, err error)          /// read n bytes and advance the read pointer

    // Peek returns the next n bytes without advancing the reader.
    // Other behavior is the same as Next.
    Peek(n int) (buf []byte, err error)        /// read n bytes without advancing the read pointer

    // Skip the next n bytes and advance the reader, which is
    // a faster implementation of Next when the next data is not used.
    Skip(n int) (err error)                    /// skip n bytes, advancing the read pointer

    // Until reads until the first occurrence of delim in the input,
    // returning a slice stops with delim in the input buffer.
    // If Until encounters an error before finding a delimiter,
    // it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
    // Until returns err != nil only if line does not end in delim.
    Until(delim byte) (line []byte, err error) /// read until the given byte is met, advancing the read pointer

    // ReadString is a faster implementation of Next when a string needs to be returned.
    // It replaces:
    //
    //  var p, err = Next(n)
    //  return string(p), err
    //
    ReadString(n int) (s string, err error)    /// read n bytes, returned as a string

    // ReadBinary is a faster implementation of Next when it needs to
    // return a copy of the slice that is not shared with the underlying layer.
    // It replaces:
    //
    //  var p, err = Next(n)
    //  var b = make([]byte, n)
    //  copy(b, p)
    //  return b, err
    //
    ReadBinary(n int) (p []byte, err error)    /// read n bytes, returned as a copied byte slice

    // ReadByte is a faster implementation of Next when a byte needs to be returned.
    // It replaces:
    //
    //  var p, err = Next(1)
    //  return p[0], err
    //
    ReadByte() (b byte, err error)             /// read a single byte

    // Slice returns a new Reader containing the Next n bytes from this Reader.
    //
    // If you want to make a new Reader using the []byte returned by Next, Slice already does that,
    // and the operation is zero-copy. Besides, Slice would also Release this Reader.
    // The logic pseudocode is similar:
    //
    //  var p, err = this.Next(n)
    //  var reader = new Reader(p) // pseudocode
    //  this.Release()
    //  return reader, err
    //
    Slice(n int) (r Reader, err error)         /// read n bytes (zero-copy), returned as a new Reader

    // Release the memory space occupied by all read slices. This method needs to be executed actively to
    // recycle the memory after confirming that the previously read data is no longer in use.
    // After invoking Release, the slices obtained by the method such as Next, Peek, Skip will
    // become an invalid address and cannot be used anymore.
    Release() (err error)                      /// free the slices that have been read; confirm the data is no longer in use first

    // Len returns the total length of the readable data in the reader.
    Len() (length int)                         /// length of the readable data
}

// Writer is a collection of operations for nocopy writes.
//
// The usage of the design is a two-step operation, first apply for a section of memory,
// fill it and then submit. E.g:
//
//  var buf, _ = Malloc(n)
//  buf = append(buf[:0], ...)
//  Flush()
//
// Note that it is not recommended to submit self-managed buffers to Writer.
// Since the writer is processed asynchronously, if the self-managed buffer is used and recycled after submission,
// it may cause inconsistent life cycle problems. Of course this is not within the scope of the design.
type Writer interface {                        /// a collection of writes that avoid memory copies
    // Malloc returns a slice containing the next n bytes from the buffer,
    // which will be written after submission(e.g. Flush).
    //
    // The slice p is only valid until the next submit(e.g. Flush).
    // Therefore, please make sure that all data has been written into the slice before submission.
    Malloc(n int) (buf []byte, err error)      /// allocate an n-byte writable slice

    // WriteString is a faster implementation of Malloc when a string needs to be written.
    // It replaces:
    //
    //  var buf, err = Malloc(len(s))
    //  n = copy(buf, s)
    //  return n, err
    //
    // The argument string s will be referenced based on the original address and will not be copied,
    // so make sure that the string s will not be changed.
    WriteString(s string) (n int, err error)   /// write a string

    // WriteBinary is a faster implementation of Malloc when a slice needs to be written.
    // It replaces:
    //
    //  var buf, err = Malloc(len(b))
    //  n = copy(buf, b)
    //  return n, err
    //
    // The argument slice b will be referenced based on the original address and will not be copied,
    // so make sure that the slice b will not be changed.
    WriteBinary(b []byte) (n int, err error)   /// write a byte slice

    // WriteByte is a faster implementation of Malloc when a byte needs to be written.
    // It replaces:
    //
    //  var buf, _ = Malloc(1)
    //  buf[0] = b
    //
    WriteByte(b byte) (err error)              /// write a single byte

    // WriteDirect is used to insert an additional slice of data on the current write stream.
    // For example, if you plan to execute:
    //
    //  var bufA, _ = Malloc(nA)
    //  WriteBinary(b)
    //  var bufB, _ = Malloc(nB)
    //
    // It can be replaced by:
    //
    //  var buf, _ = Malloc(nA+nB)
    //  WriteDirect(b, nB)
    //
    // where buf[:nA] = bufA, buf[nA:nA+nB] = bufB.
    WriteDirect(p []byte, remainCap int) error /// insert p into the write stream, keeping remainCap more bytes allocated after it

    // MallocAck will keep the first n malloc bytes and discard the rest.
    // The following behavior:
    //
    //  var buf, _ = Malloc(8)
    //  buf = buf[:5]
    //  MallocAck(5)
    //
    // equivalent as
    //  var buf, _ = Malloc(5)
    //
    MallocAck(n int) (err error)               /// keep the first n malloced bytes and discard the rest

    // Append the argument writer to the tail of this writer and set the argument writer to nil,
    // the operation is zero-copy, similar to p = append(p, w.p).
    Append(w Writer) (err error)               /// append another writer to the tail of this one (zero-copy)

    // Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
    // Its behavior is equivalent to the io.Writer that already has parameters(slice b).
    Flush() (err error)                        /// commit all written data

    // MallocLen returns the total length of the writable data that has not yet been submitted in the writer.
    MallocLen() (length int)                   /// length of written data not yet committed
}

Let's focus on:

  1. The Reader interface: obtaining readable byte slices / releasing data that has been read
  2. The Writer interface: obtaining writable byte slices / committing data that has been written
type Reader interface {
    // ...
    Next(n int) (p []byte, err error)          /// read n bytes and advance the read pointer
    // ...
    Release() (err error)                      /// free the slices that have been read; confirm the data is no longer in use first
    // ...
    Slice(n int) (r Reader, err error)         /// read n bytes (zero-copy), returned as a new Reader
    // ...
}

type Writer interface {
    // ...
    Malloc(n int) (buf []byte, err error)      /// allocate an n-byte writable slice
    // ...
    Flush() (err error)                        /// commit all written data
    // ...
}
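
A short sketch of how these pairs work together, using the exported LinkBuffer directly (the byte values are illustrative):

// import ("fmt"; "github.com/cloudwego/netpoll")
buf := netpoll.NewLinkBuffer()
w, _ := buf.Malloc(8)     // Writer: reserve space...
copy(w, "abcdefgh")       // ...fill it...
_ = buf.Flush()           // ...then commit: the 8 bytes only become readable now

head, _ := buf.Next(4)    // Reader: "abcd", zero-copy inside a single node
fmt.Println(string(head)) // use the slice before the next Release
view, _ := buf.Slice(4)   // "efgh" as a read-only Reader; Slice also Releases buf
s, _ := view.ReadString(4)
fmt.Println(s)            // "efgh"
_ = view.Release()        // drop the reference on the shared node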

// Two cases exist:
// 1. the data sits in a single node: zero-copy
// 2. the data spans several nodes: not zero-copy
//
// Next implements Reader.
func (b *UnsafeLinkBuffer) Next(n int) (p []byte, err error) {
    // ...
    b.recalLen(-n) // re-cal length    /// decrease the readable length

    // single node
    if b.isSingleNode(n) {             /// single-node read
        return b.read.Next(n), nil     /// take the bytes straight from the node
    }
    // multiple nodes                  /// cross-node read
    var pIdx int
    if block1k < n && n <= mallocMax { /// allocate the result slice p
        p = malloc(n, n)
        b.caches = append(b.caches, p)
    } else {
        p = dirtmake.Bytes(n, n)
    }
    var l int
    for ack := n; ack > 0; ack = ack - l {            /// walk the nodes, copying their data into p
        l = b.read.Len()                              /// readable length of the current node
        if l >= ack {                                 /// at least as much as still needed
            pIdx += copy(p[pIdx:], b.read.Next(ack))  /// copy
            break                                     /// done
        } else if l > 0 {                             /// less than needed, but non-empty
            pIdx += copy(p[pIdx:], b.read.Next(l))    /// copy
        }
        b.read = b.read.next                          /// move to the next readable node
    }
    _ = pIdx
    return p, nil
}
}

// Release the nodes whose data has already been read.
//
// Release the node that has been read.
// b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice
func (b *UnsafeLinkBuffer) Release() (err error) {
    for b.read != b.flush && b.read.Len() == 0 { /// the read node is before the flush node and has no readable data left
        b.read = b.read.next                     /// advance the read pointer
    }
    for b.head != b.read {                       /// release every node from the release pointer up to the read pointer
        node := b.head
        b.head = b.head.next
        node.Release()
    }
    for i := range b.caches {                    /// free the copies produced by cross-node reads
        free(b.caches[i])
        b.caches[i] = nil
    }
    b.caches = b.caches[:0]                      /// reset the length
    return nil
}

// Returns a new LinkBuffer that is a zero-copy, read-only slice of this one.
//
// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer,
// and only holds the ability of Reader.
//
// Slice will automatically execute a Release.
func (b *UnsafeLinkBuffer) Slice(n int) (r Reader, err error) {
    if n <= 0 {                              /// return an empty read-only buffer
        return NewLinkBuffer(0), nil
    }
    // check whether enough or not.
    if b.Len() < n {
        return r, fmt.Errorf("link buffer readv[%d] not enough", n)
    }
    b.recalLen(-n) // re-cal length          /// recalculate the buffer's length

    // just use for range
    p := new(LinkBuffer)                     /// the new LinkBuffer
    p.length = int64(n)

    defer func() {
        // set to read-only
        p.flush = p.flush.next
        p.write = p.flush
    }()

    // single node
    if b.isSingleNode(n) {                   /// single node: Refer gives a zero-copy, read-only node
        node := b.read.Refer(n)
        p.head, p.read, p.flush = node, node, node
        return p, nil
    }
    // multiple nodes                        /// cross-node
    l := b.read.Len()
    node := b.read.Refer(l)                  /// node becomes the new buffer's head (a reference to the current read node)
    b.read = b.read.next                     /// advance the read pointer

    p.head, p.read, p.flush = node, node, node
    for ack := n - l; ack > 0; ack = ack - l {
        l = b.read.Len()
        if l >= ack {                        /// the current node holds at least what is still needed
            p.flush.next = b.read.Refer(ack) /// reference it
            p.flush = p.flush.next           /// advance the flush pointer: the data becomes readable
            break
        } else if l > 0 {                    /// the current node holds less than needed, but is non-empty
            p.flush.next = b.read.Refer(l)   /// reference it
            p.flush = p.flush.next           /// advance the flush pointer: the data becomes readable
        }
        b.read = b.read.next
    }
    return p, b.Release()                    /// release the nodes that have been read
}
}
// Allocate writable bytes; the written data becomes readable only after it is flushed.
//
// Malloc pre-allocates memory, which is not readable, and becomes readable data after submission(e.g. Flush).
func (b *UnsafeLinkBuffer) Malloc(n int) (buf []byte, err error) {
    // ...
    b.mallocSize += n             /// grow the written-but-unflushed size
    b.growth(n)                   /// if the current write node lacks capacity, move to the next one, creating a new node if none exists
    return b.write.Malloc(n), nil /// hand out writable bytes from the write node
}

// Commit the written data.
//
// Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
func (b *UnsafeLinkBuffer) Flush() (err error) {
    b.mallocSize = 0                                              /// reset the written-but-unflushed size
    // FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
    if cap(b.write.buf) > pagesize {                              /// if the write node is larger than 8 KB
        b.write.next = newLinkBufferNode(0)                       /// create a read-only node
        b.write = b.write.next                                    /// and make it the current write node
    }
    var n int
    for node := b.flush; node != b.write.next; node = node.next { /// from the last flushed node up to and including the current write node
        delta := node.malloc - len(node.buf)                      /// bytes written into this node since the last flush
        if delta > 0 {                                            /// if any
            n += delta                                            /// accumulate, to recalculate the buffer length
            node.buf = node.buf[:node.malloc]                     /// extend the node's readable range, committing the data
        }
    }
    b.flush = b.write                                             /// record where the next flush starts
    // re-cal length
    b.recalLen(n)                                                 /// recalculate the buffer length
    return nil
}