lesismal / nbio

Pure Go 1000k+ connections solution, support tls/http1.x/websocket and basically compatible with net/http, with high-performance and low memory cost, non-blocking, event-driven, easy-to-use.
MIT License
2.11k stars 151 forks source link

AsyncRead 代码疑惑 #401

Closed yicixin closed 4 months ago

yicixin commented 4 months ago

https://github.com/lesismal/nbio/blob/5bf91e67b31f0d6587adc3ecbda9956a2d788990/conn_unix.go#L133 这里是不是应该return了呢?如果继续执行可能会在https://github.com/lesismal/nbio/blob/5bf91e67b31f0d6587adc3ecbda9956a2d788990/conn_unix.go#L145 处创建出第二个异步读的协程吧

lesismal commented 4 months ago

对的,感谢指正!要不要来个PR?

yicixin commented 4 months ago

我对下面这部分代码有些不太理解,希望作者能解答一下

在非oneShot的情况下,c.readEvents 最大只会保持为2(cnt 超过2时会立即减回),cnt 为2时直接return,这次已执行的 c.readEvents+=1 需要由下面 g.IOExecute 中正在运行的读协程减掉。

但什么时候会有cnt为2的情况呢, AsyncRead只有在ET模式下使用,ET模式在本次未处理完数据前不会进行第二次事件触发吧。 是为了兜底 "本次处理的协程刚好处理完数据还未退出,第二次事件立马触发"这种场景吗?

// AsyncRead hands reading of c over to the I/O executor (g.IOExecute) rather
// than reading inline. This excerpt shows the non-oneShot path discussed in
// the issue.
//
// c.readEvents counts read notifications that still need servicing and is
// held at no more than 2:
//   - 1: a reader closure has been submitted and is (or will be) running;
//   - 2: another notification arrived while that reader was running, so the
//     reader must make one more full pass before it exits.
//
// Notifications beyond that are dropped: an extra pass is already pending
// and will read the socket again anyway, so servicing them separately would
// only cost extra read syscalls.
func (c *Conn) AsyncRead() {
    // (preceding part of the function is elided in this excerpt)
    ...
    cnt := atomic.AddInt32(&c.readEvents, 1)
    if cnt > 2 {
        // An extra pass is already queued; undo this increment and drop
        // the notification.
        atomic.AddInt32(&c.readEvents, -1)
        return
    }
    if cnt > 1 {
        // A reader is already running. It will see this extra count when it
        // decrements at the end of its pass and loop again, so do not submit
        // a second reader. The +1 taken here is retired by that reader.
        return
    }

    g.IOExecute(func(buffer []byte) {
        for {
            // One pass: at most g.MaxConnReadTimesPerEventLoop reads.
            for i := 0; i < g.MaxConnReadTimesPerEventLoop; i++ {
                rc, n, err := c.ReadAndGetConn(buffer)
                if n > 0 {
                    g.onData(rc, buffer[:n])
                }
                if errors.Is(err, syscall.EINTR) {
                    // Interrupted by a signal; try again (this still uses up
                    // one iteration of the per-pass budget).
                    continue
                }
                if errors.Is(err, syscall.EAGAIN) {
                    // Nothing more to read right now.
                    break
                }
                if err != nil {
                    // Any other error closes the connection. Note that
                    // readEvents is not decremented on this path.
                    c.closeWithError(err)
                    return
                }
                if n < len(buffer) {
                    // Short read: presumably the socket buffer is drained, so
                    // skip a read that would most likely just return EAGAIN.
                    break
                }
                // NOTE(review): if the pass ends because i reached
                // g.MaxConnReadTimesPerEventLoop (no EAGAIN, no short read),
                // unread data may remain; with edge-triggered notifications
                // no new event may arrive for it. Confirm this is handled.
            }
            // Retire the notification this pass serviced. A non-zero result
            // means another notification arrived during the pass, so run
            // another pass; zero means nothing is pending and the reader exits.
            if atomic.AddInt32(&c.readEvents, -1) == 0 {
                return
            }
        }
    })
}
lesismal commented 4 months ago

但什么时候会有cnt为2的情况呢, AsyncRead只有在ET模式下使用,ET模式在本次未处理完数据前不会进行第二次事件触发吧。

如果我没记错的话,ET对于read,是每次读缓冲区有新的数据都会触发可读(包括收到EOF),即使之前已经触发但是没读完、只要发生变化就会触发。逆向思考下,如果有变化不触发、那就跟ONESHOT一样了也就没必要再单独支持ONESHOT事件了

lesismal commented 4 months ago

写了个简单例子你看下日志就懂了,别相信我的日志,你自己跑下才印象深刻:

package main

import (
    "fmt"
    "net"
    "os"
    "syscall"
    "time"
)

// Epoll flag sets used by the demo program.
const (
	// EPOLLET is spelled out locally rather than taken from syscall.
	// NOTE(review): the syscall package declares EPOLLET as a negative
	// constant on Linux, which cannot be assigned to the uint32
	// EpollEvent.Events field — confirm this is the reason it is redefined.
	EPOLLET = 0x80000000

	// MaxEpollEvents is the capacity of the EpollWait result buffer.
	MaxEpollEvents = 32

	// epollEventsError: hang-up and error conditions (including the peer
	// shutting down its write side).
	epollEventsError = syscall.EPOLLRDHUP | syscall.EPOLLHUP | syscall.EPOLLERR
	// epollEventsRead: edge-triggered readability, including priority data.
	epollEventsRead = syscall.EPOLLIN | syscall.EPOLLPRI | EPOLLET
	// epollEventsWrite: writability.
	epollEventsWrite = syscall.EPOLLOUT
)

// main is a self-contained Linux demo of how edge-triggered epoll (EPOLLET
// without EPOLLONESHOT) reports a TCP connection becoming readable. It listens
// on 127.0.0.1:2000. After one second it dials itself, writes "hello", then
// "world", then closes. The server side deliberately leaves the data unread
// until the third readable notification, so the log shows one notification
// per change in the socket's state rather than a single one.
func main() {
	var event syscall.EpollEvent
	var events [MaxEpollEvents]syscall.EpollEvent

	// Non-blocking mode is set explicitly with SetNonblock below.
	// O_NONBLOCK is an open(2) flag, not a socket type flag.
	fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer syscall.Close(fd)

	if err = syscall.SetNonblock(fd, true); err != nil {
		fmt.Println("setnonblock1: ", err)
		os.Exit(1)
	}

	addr := syscall.SockaddrInet4{Port: 2000}
	copy(addr.Addr[:], net.ParseIP("127.0.0.1").To4())

	if err = syscall.Bind(fd, &addr); err != nil {
		fmt.Println("bind: ", err)
		os.Exit(1)
	}
	if err = syscall.Listen(fd, 10); err != nil {
		fmt.Println("listen: ", err)
		os.Exit(1)
	}

	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		fmt.Println("epoll_create1: ", err)
		os.Exit(1)
	}
	defer syscall.Close(epfd)

	// The listener is registered level-triggered (no EPOLLET).
	event.Events = syscall.EPOLLIN
	event.Fd = int32(fd)
	if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
		fmt.Println("epoll_ctl: ", err)
		os.Exit(1)
	}

	// Client side: two writes half a second apart, then close.
	time.AfterFunc(time.Second, func() {
		c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", addr.Port))
		if err != nil {
			panic(err)
		}
		c.Write([]byte("hello"))
		time.Sleep(time.Second / 2)
		c.Write([]byte("world"))
		time.Sleep(time.Second / 2)
		c.Close()
	})

	readCnt := 0
	for {
		nevents, waitErr := syscall.EpollWait(epfd, events[:], -1)
		if waitErr != nil {
			// epoll_wait is never restarted after a signal handler runs.
			// The Go runtime itself sends signals (e.g. for goroutine
			// preemption), so EINTR just means "wait again".
			if waitErr == syscall.EINTR {
				continue
			}
			fmt.Println("epoll_wait: ", waitErr)
			break
		}

		for _, evt := range events[:nevents] {
			if int(evt.Fd) == fd {
				connFd, _, err := syscall.Accept(fd)
				if err != nil {
					fmt.Println("accept: ", err)
					continue
				}
				// The accepted socket, not the listener, must be made
				// non-blocking before it is registered edge-triggered.
				if err = syscall.SetNonblock(connFd, true); err != nil {
					fmt.Println("setnonblock2: ", err)
					syscall.Close(connFd)
					continue
				}
				event.Events = epollEventsRead | epollEventsError
				event.Fd = int32(connFd)
				if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, connFd, &event); err != nil {
					fmt.Println("epoll_ctl: ", connFd, err)
					os.Exit(1)
				}
				continue
			}

			// Connection fd. The log shows this point is reached three
			// times in total.
			fmt.Println("--- 000 on event, fd:", evt.Fd, evt.Events)

			if evt.Events&epollEventsWrite != 0 {
				fmt.Println("--- 111 on write, fd:", evt.Fd)
			}

			// The log shows three readable notifications. There is one for
			// each of the peer's two writes, plus one when the peer closes.
			// Because the earlier data is still unread, the close
			// notification carries the readable bit together with the
			// EOF-related error bits.
			if evt.Events&epollEventsRead != 0 {
				fmt.Println("--- 222 on read, fd:", evt.Fd)
				readCnt++
				if readCnt == 3 {
					buf := make([]byte, 1024)
					n, err := syscall.Read(int(evt.Fd), buf)
					fmt.Println("+++ read data:", n, err)
				}
			}

			// The log shows a single error notification, delivered together
			// with the third readable notification.
			if evt.Events&epollEventsError != 0 {
				fmt.Println("--- 333 on close, fd:", evt.Fd)
				// Release the accepted fd. Once no other descriptor refers to
				// it, it also leaves the epoll set. The demo is complete, so
				// return and let the deferred closes of epfd and the listener
				// run.
				syscall.Close(int(evt.Fd))
				return
			}
		}
	}
}

output:

--- 000 on event, fd: 9 1
--- 222 on read, fd: 9
--- 000 on event, fd: 9 1
--- 222 on read, fd: 9
--- 000 on event, fd: 9 8193
--- 222 on read, fd: 9
+++ read data: 10 <nil>
--- 333 on close, fd: 9
lesismal commented 4 months ago

其实仔细想想,好像应该是不管什么事件触发了,触发后都收到了当前这个fd的所有事件,比如上面例子中的:之前未读取所以仍然可读时、被close触发、则同时收到可读和错误事件

lesismal commented 4 months ago

因为ET非ONESHOT会重复触发,所以用了atomic计数,尽量避免当前正在读尚未完成时、再收到读事件导致不必要的重复读取(读不到数据)浪费syscall

yicixin commented 4 months ago

的确是我对ET理解有误,感谢解答

yicixin commented 4 months ago

已提交pr修复return的问题

lesismal commented 4 months ago

好的,合并了,感谢!