lesismal / nbio

Pure Go 1000k+ connections solution, support tls/http1.x/websocket and basically compatible with net/http, with high-performance and low memory cost, non-blocking, event-driven, easy-to-use.
MIT License
2.11k stars 151 forks source link

使用udp发送数据,会存在丢包! #415

Closed Dganzh closed 1 month ago

Dganzh commented 3 months ago

实验代码如下: server:

package main

import (
    "fmt"
    "log"
    "sync/atomic"
    "time"

    "github.com/lesismal/nbio"
)

func main() {
    engine := nbio.NewEngine(nbio.Config{
        Network:            "udp",
        Addrs:              []string{"127.0.0.1:8080"},
        MaxWriteBufferSize: 6 * 1024 * 1024,
        UDPReadTimeout:     3 * time.Second,
    })
    var total int64
    var totalSize int64
    // For the same socket connection(same LocalAddr() and RemoteAddr()),
    // all the *nbio.Conn passed to users in OnOpen/OnData/OnClose handler is the same pointer
    engine.OnOpen(func(c *nbio.Conn) {
        //log.Printf("onOpen: [%p, %v]", c, c.RemoteAddr().String())
    })
    engine.OnData(func(c *nbio.Conn, data []byte) {
        atomic.AddInt64(&total, 1)
        atomic.AddInt64(&totalSize, int64(len(data)))
        log.Printf("onData: [%p, %v], data size:%d", c, c.RemoteAddr().String(), len(data))
        c.Write(data)
    })
    engine.OnClose(func(c *nbio.Conn, err error) {
        log.Printf("onClose: [%p, %v], %v", c, c.RemoteAddr().String(), err)
    })

    err := engine.Start()
    if err != nil {
        fmt.Printf("nbio.Start failed: %v\n", err)
        return
    }
    defer engine.Stop()
    go func() {
        for {
            time.Sleep(3 * time.Second)
            fmt.Println("============response num ", atomic.LoadInt64(&total), atomic.LoadInt64(&totalSize))
        }
    }()
    <-make(chan int)
}

client:

package main

import (
    "fmt"
    "github.com/lesismal/nbio"
    "log"
    "net"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    var (
        addr    = "127.0.0.1:8080"
        request = []byte(strings.Repeat("1", 250))
    )
    connNum := 10
    reqPerConn := 1000
    totalReqNum := int64(connNum * reqPerConn)
    var finishNum int64
    var totalSize int64
    wg := sync.WaitGroup{}
    wg.Add(1)
    g := nbio.NewEngine(nbio.Config{})
    g.OnClose(func(c *nbio.Conn, err error) {
        fmt.Println("OnClose", err)
    })
    g.OnData(func(c *nbio.Conn, response []byte) {
        atomic.AddInt64(&finishNum, 1)
        atomic.AddInt64(&totalSize, int64(len(response)))
        if atomic.LoadInt64(&finishNum) == totalReqNum {
            wg.Done()
        }
    })
    go func() {
        for {
            time.Sleep(3 * time.Second)
            fmt.Println("============response num ", atomic.LoadInt64(&finishNum), atomic.LoadInt64(&totalSize))
        }
    }()
    err := g.Start()
    if err != nil {
        log.Panic(err)
    }
    defer g.Stop()
    start := time.Now()
    for i := 0; i < connNum; i++ {
        go func(clientId int) {
            clientStart := time.Now()
            c, err := net.Dial("udp", addr)
            if err != nil {
                log.Panic(err)
            }
            nbc, err := g.AddConn(c)
            if err != nil {
                log.Panic(err)
            }
            for j := 0; j < reqPerConn; j++ {
                nbc.Write(request)
            }
            fmt.Println("client", c.LocalAddr().String(), clientId, "request send finish", reqPerConn, "cost", time.Since(clientStart))
        }(i)
    }

    wg.Wait()
    fmt.Println("total cost==========", time.Since(start))
}

总共发出的请求数应该是10*1k=10k 但是服务器响应的数量只有几百到1k出头(多次尝试每次不一样),客户端收到的数量和服务器响应数量一致。 server输出: 2024/04/14 23:30:32.566 [INF] NBIO Engine[NB] start with [2 eventloop], listen on: ["udp@127.0.0.1:8080"], MaxOpenFiles: 1048576 2024/04/14 23:44:16 onData: [0xc0000e66c0, 127.0.0.1:56060], data size:250 .... ============response num 614 153500 ============response num 614 153500 ....

client输出: 2024/04/14 23:44:16.718 [INF] NBIO Engine[NB] start with [2 eventloop, MaxOpenFiles: 1048576] client 127.0.0.1:56481 9 request send finish 1000 cost 1.091347ms client 127.0.0.1:53981 1 request send finish 1000 cost 1.061504ms client 127.0.0.1:42646 4 request send finish 1000 cost 962.166µs client 127.0.0.1:44508 7 request send finish 1000 cost 822.528µs client 127.0.0.1:39290 6 request send finish 1000 cost 1.287669ms client 127.0.0.1:50386 0 request send finish 1000 cost 2.585575ms client 127.0.0.1:33655 5 request send finish 1000 cost 1.413943ms client 127.0.0.1:43577 8 request send finish 1000 cost 765.022µs client 127.0.0.1:56060 2 request send finish 1000 cost 2.114236ms client 127.0.0.1:35688 3 request send finish 1000 cost 1.870757ms ============response num 614 153500 ....

Dganzh commented 3 months ago

大佬帮忙看看,测试代码有问题吗?(本意想试试,udp作为client时往server发送大批量包,是用一个Conn效率高还是多个效率高)

lesismal commented 3 months ago

感谢关注和使用nbio!

第一, udp不保证送达, 可能丢包, 所以不管测试中是否丢包, 应用层自己应该做包送达的保证或者确认业务上允许丢包, 否则都是存在线上隐患的.

第二, 这个丢包的应该是因为发包频率高, 读取方处理慢, 然后发送方的包到达太多超过了udp接收缓冲区就被丢弃了. 根据你的例子, 可以考虑如下:

  1. OnData默认是在 epoll eventloop 里读到数据后回调的, 如果这里阻塞, epoll eventloop 里也会阻塞, 所以应该尽量避免这里的 IO 操作, 所以可以把OnData里的fmt print去掉, 对data进行拷贝然后传递给另外的逻辑协程池去处理和回写, 尽量保持OnData里只有cpu消耗, 当然, 如果业务不复杂, 直接回写也能满足性能需求, 那可以不用额外的逻辑协程池, 但仍然应该避免fmt print这些IO操作. 如果需要, 也可以自己设置 Engine.OnRead, 设置后 nbio自己就不会自动读取数据也不会回调 OnData 了, 然后自己实现读取和处理的方案
  2. 设置较大的UDP接收缓冲区size. udp listener也是个fd, 对应一个 nbio.Conn, 但它是server, 如果放到OnOppen里回调就容易跟client conn混淆, 所以目前nbio Engine的udp listener没有回调OnOpen, 所以用户不能直接从nbio Engine里拿到udp listener设置参数. 但是可以用户自己创建udp listner再加入到Engine里, 调大server side udp listener的接收缓冲区到10M后, 我本地macos试了下, client/server两端能够完整收发10k个包

另外, 你的client代码里打印日志是单独协程里sleep 循环, 但是收到所有响应后程序立刻退出了未必能来得及打印, 所以把日志放到wg.Done()合理些

修改后的完整例子如下:

package main

import (
    "fmt"
    "log"
    "net"
    "sync/atomic"
    "time"

    "github.com/lesismal/nbio"
)

func main() {
    engine := nbio.NewEngine(nbio.Config{
        // Network:            "udp",
        // Addrs:              []string{"127.0.0.1:8080"},
        MaxWriteBufferSize: 6 * 1024 * 1024,
        UDPReadTimeout:     3 * time.Second,
    })

    var total int64
    var totalSize int64
    // For the same socket connection(same LocalAddr() and RemoteAddr()),
    // all the *nbio.Conn passed to users in OnOpen/OnData/OnClose handler is the same pointer
    engine.OnOpen(func(c *nbio.Conn) {
        log.Printf("onOpen: [%p, %v]", c, c.RemoteAddr().String())
    })
    engine.OnData(func(c *nbio.Conn, data []byte) {
        atomic.AddInt64(&total, 1)
        atomic.AddInt64(&totalSize, int64(len(data)))
        // log.Printf("onData: [%p, %v], data size:%d", c, c.RemoteAddr().String(), len(data))
        c.Write(data)
    })
    engine.OnClose(func(c *nbio.Conn, err error) {
        log.Printf("onClose: [%p, %v], %v", c, c.RemoteAddr().String(), err)
    })

    err := engine.Start()
    if err != nil {
        fmt.Printf("nbio.Start failed: %v\n", err)
        return
    }
    defer engine.Stop()

    addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:8080")
    if err != nil {
        panic(err)
    }
    ln, err := net.ListenUDP("udp", addr)
    if err != nil {
        panic(err)
    }
    nbcLn, err := nbio.NBConn(ln)
    if err != nil {
        panic(err)
    }
    // 调大udp接收缓冲区size
    nbcLn.SetReadBuffer(1024 * 1024 * 10)
    // 添加udp listener Conn到Engine
    engine.AddConn(nbcLn)

    go func() {
        for {
            time.Sleep(3 * time.Second)
            fmt.Println("============response num ", atomic.LoadInt64(&total), atomic.LoadInt64(&totalSize))
        }
    }()
    <-make(chan int)
}
package main

import (
    "fmt"
    "log"
    "net"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/lesismal/nbio"
)

func main() {
    var (
        addr    = "127.0.0.1:8080"
        request = []byte(strings.Repeat("1", 250))
    )
    connNum := 10
    reqPerConn := 1000
    totalReqNum := int64(connNum * reqPerConn)
    var finishNum int64
    var totalSize int64
    wg := sync.WaitGroup{}
    wg.Add(1)
    g := nbio.NewEngine(nbio.Config{})
    g.OnClose(func(c *nbio.Conn, err error) {
        fmt.Println("OnClose", err)
    })
    g.OnData(func(c *nbio.Conn, response []byte) {
        atomic.AddInt64(&finishNum, 1)
        atomic.AddInt64(&totalSize, int64(len(response)))
        if atomic.LoadInt64(&finishNum) == totalReqNum {
            wg.Done()
        }
    })
    // go func() {
    //  for {
    //      time.Sleep(3 * time.Second)
    //      fmt.Println("============response num ", atomic.LoadInt64(&finishNum), atomic.LoadInt64(&totalSize))
    //  }
    // }()
    err := g.Start()
    if err != nil {
        log.Panic(err)
    }
    defer g.Stop()
    start := time.Now()
    for i := 0; i < connNum; i++ {
        go func(clientId int) {
            clientStart := time.Now()
            c, err := net.Dial("udp", addr)
            if err != nil {
                log.Panic(err)
            }
            nbc, err := g.AddConn(c)
            if err != nil {
                log.Panic(err)
            }
            for j := 0; j < reqPerConn; j++ {
                nbc.Write(request)
            }
            fmt.Println("client", c.LocalAddr().String(), clientId, "request send finish", reqPerConn, "cost", time.Since(clientStart))
        }(i)
    }

    wg.Wait()
    fmt.Println("total cost==========", time.Since(start))
    fmt.Println("============response num ", atomic.LoadInt64(&finishNum), atomic.LoadInt64(&totalSize))
}
lesismal commented 3 months ago

我会考虑给 udp listener 加个回调, 例如 Engine.OnUDPListen(c *nbio.Conn), 方便用户初始化通过配置启动的udp listener

Dganzh commented 3 months ago
  1. 一直都知道udp会丢包,没想到丢包率会这么高;
  2. 试了你的方式,增加接收方缓冲区有效果,但是有些环境仍然丢包很严重,缓冲区调再大都没用;这时候只能降低发包频率了:
    for j := 0; j < reqPerConn; j++ {
    nbc.Write(request)
    time.Sleep(time.Duration(rand.Int31n(500)) * time.Microsecond)
    }
lesismal commented 3 months ago

如果是公网, 即使调整频率和调大缓冲区, 中间设备也可能丢包. 如我上面1中说的, 你应该自己在应用层做传输保证或者确认业务上允许丢包. 既然自己也知道, 就不应该纠结这个丢包问题了. 方案是依赖可靠传输, 但udp本身不给这个保证, 那就不要对udp心存幻想了

Dganzh commented 3 months ago

我的场景是这样的,都是在内网,A集群和B集群通过某种协议互相通讯,该协议传输层是udp,现在要在A和B之间插入一层代理P,加入的这层P肯定是希望尽可能处理快、消耗低,上层应用已考虑丢包情况,但是这层代理仍然希望尽可能少丢包。那么要如何控制发包频率比较合适

lesismal commented 3 months ago

我的场景是这样的,都是在内网,A集群和B集群通过某种协议互相通讯,该协议传输层是udp,现在要在A和B之间插入一层代理P,加入的这层P肯定是希望尽可能处理快、消耗低,上层应用已考虑丢包情况,但是这层代理仍然希望尽可能少丢包。那么要如何控制发包频率比较合适

如果中间这层代理只做转发, 没什么应用层消耗, 只在OnData 里直接回写的话只是用到每个poller eventloop的固定buffer也不会有太大的额外内存分配. udp listener这个也只是对应一个fd, 把它设置更大点都可以的. 但这个单个udp listener fd只能利用单个协程最多单线程单cpu核心, 所以未必效率高, 可以考虑利用多核, 比如引入逻辑协程池去异步处理(需要内存OnData里做内存拷贝), 或者多开几个udp listener 然后做负载分流(能够一定程度上负载均衡但不能保证每个fd完全均衡, 取决于业务层实际量) udp尽量MTU以内, 所以2^N size对齐的buffer pool足够了, 而且只是转发没什么其他消耗, buffer pool复用率高, 内存这块应该问题不大的

另外, 这种udp的其实直接用标准库也差不多效率, 一组协程池去并发读udp listener这个conn然后转发就行了, 标准库udp conn自带锁所以多个并发读也不用加锁, 这个更自然均衡并且固定的逻辑协程池每个协程内固定的buffer复用更简单

虽然我希望nbio被更多人使用, 但我不喜欢盲目鼓吹和鼓励. nbio的主要是处理海量连接避免每个连接一个协程导致高内存占用和海量对象的gc压力, 只是转发数据这种标准库方案就可以避免这些问题. 所以, 我建议可以试试直接用标准库看看效果对比下, 然后可能没必要做我上面说的那些额外逻辑协程池和buffer pool 相关的了

Dganzh commented 3 months ago

之前细节描述不够,是只能固定监听一个IP端口,所以只能有一个listener。另外udp包需要解析的,通信过程第一步是,A中的一个节点带着aid1发起CreateSesion,随机发到一个B节点,然后响应bid1,使得aid1和bid1绑定起来,后续发的包中都会带上bid1,从而实现稳定转发到CreateSession的相应B节点。 代理这一层要做的就是,只开放一个IP端口,作为统一入口,接收所有流量,解析包,然后根据映射关系转发到目标节点。因为流量都汇总到这里了,所以量很大。 这场景下nbio适用吗

lesismal commented 3 months ago

这场景下nbio适用吗

我建议先用标准库试试, 标准库一组协程池读udp listener然后做对应的逻辑就可以了, 逻辑也简单, 均衡性也好. 非海量连接必须一个连接一个协程的场景, nbio同样适用, 但相对于标准库nbio没有优势, 我只想建议使用正确的方案, 比如下面的标准库方式:

package main

import (
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "runtime"
)

func main() {
    addrstr := "127.0.0.1:8888"
    addr, err := net.ResolveUDPAddr("udp", addrstr)
    if err != nil {
        fmt.Println("ResolveUDPAddr error:", err)
        os.Exit(1)
    }
    ln, err := net.ListenUDP("udp", addr)
    if err != nil {
        fmt.Println("listen error:", err)
        os.Exit(1)
    }
    defer ln.Close()
    ln.SetReadBuffer(1024 * 1024 * 1024)

    // 启动一组协程池去读udp listener,固定处理io和业务,自动均衡
    logicNum := runtime.NumCPU() * 64 // 根据你们实际压测来定这个数量
    for i := 0; i < logicNum; i++ {
        go func() {
            // 相比于nbio,每个协程这个固定buffer复用最佳,栈友好,也不需要额外的pool Get/Put逻辑的额外消耗
            buf := make([]byte, 1024)
            for {
                if packLen, remoteAddr, err := ln.ReadFromUDP(buf); err == nil {
                    // 你的业务逻辑
                    log.Printf("onData: [%p, %v], %v", remoteAddr, remoteAddr.String(), string(buf[:packLen]))
                    ln.WriteToUDP(buf[:packLen], remoteAddr)
                } else {
                    // err handle
                }
            }
        }()
    }

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    <-interrupt
}
lesismal commented 3 months ago

如果你们转发往下游需要udp client并且需要读取下游主动发来的数据, 那如果用标准库就需要每个udp client一个goroutine了, 这样不划算, 这个可以考虑用nbio. 当然, 取决于你门的连接数数量, 如果不是特别大量连接, 比如1w-10w这个范围的脸结束, 标准库也是足够强的, 可以以实际压测为准再决定要不要用nbio

Dganzh commented 3 months ago

感谢答疑,我试试两种方式。

lesismal commented 3 months ago

github-actions[bot] commented 2 months ago

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] commented 1 month ago

This issue was closed because it has been inactive for 14 days since being marked as stale.