lesismal / nbio

Pure Go 1000k+ connections solution, support tls/http1.x/websocket and basically compatible with net/http, with high-performance and low memory cost, non-blocking, event-driven, easy-to-use.
MIT License
2.06k stars 151 forks source link

向大佬求助,如何用这个库写出生产者和消费者的代码结构? #385

Closed ericjing83 closed 3 months ago

ericjing83 commented 5 months ago

您好大佬,

请教下,我想让这个库负责读消息的协程发送任务给工作协程(消费者),就像下面的代码,生产者readPump发送任务给hub,但是,我刚接触nbio,不知道用这个库如何实现下面的代码,向大佬求助: package main

import ( "bytes" "flag" "github.com/fasthttp/websocket" "github.com/valyala/fasthttp" "log" "time" )

const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 512

)

var ( newline = []byte{'\n'} space = []byte{' '} )

var upgrader = websocket.FastHTTPUpgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } var addr = flag.String("addr", ":8080", "http service address") var hub = Hub{ broadcast: make(chan []byte), register: make(chan Client), unregister: make(chan Client), clients: make(map[*Client]bool), }

// Client is a middleman between the websocket connection and the hub. type Client struct {

// The websocket connection.
conn *websocket.Conn

// Buffered channel of outbound messages.
send chan []byte

}

// readPump pumps messages from the websocket connection to the hub. // // The application runs readPump in a per-connection goroutine. The application // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *Client) readPump() { defer func() { hub.unregister <- c c.conn.Close() }() c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("error: %v", err) } break } message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) hub.broadcast <- message } }

// writePump pumps messages from the hub to the websocket connection. // // A goroutine running writePump is started for each connection. The // application ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // The hub closed the channel. c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return }

        w, err := c.conn.NextWriter(websocket.TextMessage)
        if err != nil {
            return
        }
        w.Write(message)

        // Add queued chat messages to the current websocket message.
        n := len(c.send)
        for i := 0; i < n; i++ {
            w.Write(newline)
            w.Write(<-c.send)
        }

        if err := w.Close(); err != nil {
            return
        }
    case <-ticker.C:
        c.conn.SetWriteDeadline(time.Now().Add(writeWait))
        if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
            return
        }
    }
}

}

// serveWs handles websocket requests from the peer. func serveWs(ctx fasthttp.RequestCtx) { err := upgrader.Upgrade(ctx, func(conn websocket.Conn) { client := &Client{conn: conn, send: make(chan []byte, 256)} hub.register <- client

    go client.writePump()
    client.readPump()
})

if err != nil {
    log.Println(err)
}

}

// Hub maintains the set of active clients and broadcasts messages to the clients. type Hub struct { // Registered clients. clients map[*Client]bool

// Inbound messages from the clients.
broadcast chan []byte

// Register requests from the clients.
register chan *Client

// Unregister requests from clients.
unregister chan *Client

}

func (h *Hub) run() { for { select { case client := <-h.register: h.clients[client] = true case client := <-h.unregister: if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) } case message := <-h.broadcast: for client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client) } } } } }

func serveHome(ctx *fasthttp.RequestCtx) { log.Println(string(ctx.Path()))

if !ctx.IsGet() {
    ctx.Error("Method not allowed", fasthttp.StatusMethodNotAllowed)
    return
}
fasthttp.ServeFile(ctx, "../home.html")

}

func main() { flag.Parse() go hub.run()

requestHandler := func(ctx *fasthttp.RequestCtx) {
    switch string(ctx.Path()) {
    case "/":
        serveHome(ctx)
    case "/ws":
        serveWs(ctx)
    default:
        ctx.Error("Unsupported path", fasthttp.StatusNotFound)
    }
}

server := fasthttp.Server{
    Name:    "ChatExample",
    Handler: requestHandler,
}

log.Fatal(server.ListenAndServe(*addr))

}

lesismal commented 5 months ago

我没有你完整的资源、所以没调试,大概是下面这样子,细节需要自己看下:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "time"

    "github.com/lesismal/nbio/nbhttp"
    "github.com/lesismal/nbio/nbhttp/websocket"
)

var (
    addr = flag.String("addr", ":8080", "http service address")

    hub = Hub{
        broadcast:  make(chan []byte),
        register:   make(chan *websocket.Conn),
        unregister: make(chan *websocket.Conn),
        clients:    make(map[*websocket.Conn]bool),
    }

    upgrader = newUpgrader()
)

func main() {
    flag.Parse()

    go hub.run()

    mux := &http.ServeMux{}
    mux.HandleFunc("/", serveHome)
    mux.HandleFunc("/ws", serveWs)
    engine := nbhttp.NewEngine(nbhttp.Config{
        Network: "tcp",
        Addrs:   []string{*addr},
        // MaxLoad:                 1000000,
        ReleaseWebsocketPayload: true,
        Handler:                 mux,

        // websocket 单帧最大载荷
        MaxWebsocketFramePayloadSize: 512,
    })

    err := engine.Start()
    if err != nil {
        fmt.Printf("nbio.Start failed: %v\n", err)
        return
    }

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    <-interrupt

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    defer cancel()
    engine.Shutdown(ctx)
}

func newUpgrader() *websocket.Upgrader {
    u := websocket.NewUpgrader()

    // 服务端主动发心跳检测浪费资源
    // 让客户端发心跳、服务端只要读超时关闭就可以了,这样不白白浪费服务端定时器、更好
    u.KeepaliveTime = 60 * time.Second
    u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, message []byte) {
        hub.broadcast <- message
    })
    u.OnClose(func(c *websocket.Conn, err error) {
        hub.unregister <- c
    })
    return u
}

func serveHome(w http.ResponseWriter, r *http.Request) {
    log.Println(r.URL.Path)

    if r.Method != http.MethodGet {
        w.WriteHeader(http.StatusMethodNotAllowed)
        fmt.Fprintf(w, "Method not allowed")
        return
    }

    http.ServeFile(w, r, "../home.html")
}

func serveWs(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }
    fmt.Println("Upgraded:", conn.RemoteAddr().String())
    hub.register <- conn
}

// Hub maintains the set of active clients and broadcasts messages to the clients.
type Hub struct {
    // Registered clients.
    clients map[*websocket.Conn]bool

    // Inbound messages from the clients.
    broadcast chan []byte

    // Register requests from the clients.
    register chan *websocket.Conn

    // Unregister requests from clients.
    unregister chan *websocket.Conn
}

func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
            }
        case message := <-h.broadcast:
            for conn := range h.clients {
                conn.WriteMessage(websocket.TextMessage, message)
            }
        }
    }
}
ericjing83 commented 5 months ago

非常感谢您的指导。我想请教下, 1.我的客户端是网页端,通过javascript发送websocket消息,如果我在应用层不写代码进行拆包的话(按照websocket网页端底层自带的拆包逻辑维护数据包的大小),上面的代码设置单个数据包大小MaxWebsocketFramePayloadSize=512,有没有可能导致golang服务端拒绝接受网页端发来的websocket消息? 2.MaxWebsocketFramePayloadSize这个参数设置为512是因为考虑到大并发的性能吗?

lesismal commented 5 months ago

1.我的客户端是网页端,通过javascript发送websocket消息,如果我在应用层不写代码进行拆包的话(按照websocket网页端底层自带的拆包逻辑维护数据包的大小),上面的代码设置单个数据包大小MaxWebsocketFramePayloadSize=512,有没有可能导致golang服务端拒绝接受网页端发来的websocket消息?

websocket协议本身就处理了所谓“粘包”,每个message给到你都是一个完整的message,gorilla/fasthttp那些直接用 ReadMessage、用NextReader读到EOF,都是一个完整message nbio的OnMessage传递给你的是一个完整Message 所以你自己额外加上 newline、space 这些去拆包首先是多余的完全没必要的,另外因为没什么好办法保证对方发过来的数据不包含你自己设定的字符组合,如果对方内容里有这个、你才包就出错了,所以你自己额外加的这种靠特定字符拆包的逻辑反倒可能导致bug、或者需要做更多的交互限制确保正常的客户端不要发这种,但如果有别人伪造client呢你就没法保证、然后把别人伪造的消息广布出去了也不是好事情。

2.MaxWebsocketFramePayloadSize这个参数设置为512是因为考虑到大并发的性能吗?

我是看你的例子中设置了:c.conn.SetReadLimit(maxMessageSize),maxMessageSize是512,所以我也写上了。 具体设置多少需要根据你自己项目的需要来定,通常来讲512太小了,可以给框架设置4k、8k这种,应用层的长度限制逻辑例如你们前端输入就限制了512,那可以在你们OnMessage的地方加上一道判断也可以、不用把框架限制这么低

ericjing83 commented 5 months ago

非常感谢您的解答,学习了。我也觉得512太小了,可能会导致网络协议层更频繁地拆包或者合并frame,会损失性能。 再请教您, 1.如果我想把nbio固定为none block mode,按照您上面的代码,默认的模式就是IOModNonBlocking是吧? 2.这个模式下,默认有多少个多路复用的goroutine(我知道这种模式下一个goroutine会处理多个用户连接)?我可以修改这里的多路复用的goroutine数量吗? 3.none block mode这个模式我如果想修改,代码里我改哪个参数啊?

lesismal commented 5 months ago

1.如果我想把nbio固定为none block mode,按照您上面的代码,默认的模式就是IOModNonBlocking是吧?

对,默认是 IOModNonBlocking: https://github.com/lesismal/nbio/blob/master/nbhttp/engine.go#L39

2.这个模式下,默认有多少个多路复用的goroutine(我知道这种模式下一个goroutine会处理多个用户连接)?我可以修改这里的多路复用的goroutine数量吗?

多路复用的goroutine数量主要是逻辑协程池,默认的逻辑协程池size是 runtime.NumCPU() * 1024https://github.com/lesismal/nbio/blob/master/nbhttp/engine.go#L920

我知道这种模式下一个goroutine会处理多个用户连接:所有连接都共用这组逻辑协程池,能处理多少主要取决于你的数据交互频率、payload size、硬件本身的能力。如果业务本身的计算量不大,默认配置通常足够用了。

这里有IOModNonBlocking模式下的压测,ubuntu vm设置给nbio server4核cpu,代码里让gc尽量限制在2g内存,一百万连接数,1k payload,qps还能保持10w以上: https://github.com/lesismal/go-websocket-benchmark

如果业务比较特殊,可以根据具体情况定制逻辑协程池、内存分配这些

3.none block mode这个模式我如果想修改,代码里我改哪个参数啊?

几个常用配置项:

engine = nbhttp.NewEngine(nbhttp.Config{
    // 设置逻辑协程池size
    MessageHandlerPoolSize: 10000,

    // 设置IOMod,不设置默认为 IOModNonBlocking
    IOMod: nbhttp.IOModMixed,
    // 如果设置为IOModMixed混合模式,需要设置这个阻塞模式最大连接数:
    // 当前阻塞连接数少于这个值时,新来的连接也是用阻塞模式、每个连接一个固定的读协程
    // 当前阻塞连接数达到这个值时,心来的连接就会用nbio自己poller处理,用公共的逻辑协程池
    MaxBlockingOnline: 100000,
})
ericjing83 commented 5 months ago

大佬,请教下您, newUpgrader函数里面,触发器u.OnMessage里面传入的匿名函数是不是在nbio的每个逻辑协程池里会并发运行的?按我的理解,这个匿名函数是在逻辑协程池里运行的,不知道我的理解对不对。

lesismal commented 5 months ago

第一,OnMessage只是设置回调,在哪里调用OnMessage设置回调只是设置行为,在什么地方执行要看在什么地方回调,所以跟是不是匿名函数没关系,即使不是匿名函数、而是普通函数也是同样道理,所以不要想太多跑偏了基础知识。 第二,nbio是在并发协程池里执行OnMessage的回调。

ericjing83 commented 5 months ago

大佬您好, 假设我想达到100万并发的效果,nbio有10000个逻辑协程,然后,我想让这10000个nbio的goroutine通过轮询的方式(负载均衡)给我的多个worker发送消息(我测试下来,golang默认的channel,如果有多个worker同时获取任务,性能会降低,CPU占用率明显提高。如果每个一个worker协程分别设置一个channel,这个时候CPU占用率明显降低。) 所以,我想知道每个nbio逻辑协程的ID,然后通过ID % NumberOfWorker 来进行负载均衡(这样的负载均衡最简单,同时也没有锁和原子操作,不会产生额外的CPU消耗)。我如果想知道nbio的每个逻辑协程的id(从0开始的整数),我可以通过一些途径获取吗?或者,我如果自己想增加这个属性,可以从哪里加啊?

lesismal commented 5 months ago

nbio默认的逻辑协程池并不是为每个Conn常驻一个协程、没有通过id获取的功能,有解析道协议请求数据才会有协程去负责调用OnMessage这些,所以你上面的说法对于nbio默认实现来讲、没有相关性

nbio自带的逻辑协程池实现是这个: https://github.com/lesismal/nbio/blob/master/taskpool/taskpool.go

如果想用自己定制的逻辑协程池,可以自行定制:

engine := nbhttp.NewEngine(nbhttp.Config{
    ServerExecutor: YourExecutor,
})
ericjing83 commented 5 months ago

好的。感谢大佬指点。 实在麻烦的话,我就想办法用每个end user的userid去进行负载均衡,我想办法让所有的userid都单调递增,然后userid % NumberOfWorker来实现负载均衡。百万用户的情况下,userid大致上可以让请求的负载在多个worker之间均衡。 自己定义逻辑协程池的话,我怕哪里弄得不对影响到nbio稳定,我尽可能保持nbio的官方默认配置。

lesismal commented 5 months ago
  1. nbio现在默认的逻辑协程策略,是哪个连接有数据就处理哪个,逻辑协程池自己是动态均衡的;
  2. 对于每个conn的消息处理,如果你们需要做均衡,那应用层可以再做一层这种worker;广播遍历数量多、单个map也不划算,也可以按id之类的hash到不同worker上面,广播的时候每个worker只需要遍历自己这一份,并发广播也跟快一点
lesismal commented 5 months ago

你们业务是哪种类型?互联网还是物联网?或者其他?

ericjing83 commented 5 months ago

大佬, 我们的业务是互联网,企业办公的场景,对数据实时性要求没有物联网那么高。

lesismal commented 5 months ago

我估计逻辑协程池不太需要额外定制,如果连接数很大并且对广播功能有性能需求可以考虑优化广播的效率

ericjing83 commented 5 months ago

又想到一个问题,再请教下大佬, 如果发送给同一个conn的两条websocket消息,时间间隔很短(比如小于10毫秒),nbio库会不会把这两条消息合并为一条消息发送,然后,在这两条消息之间增加一个分隔符? 如果可以这样,可能会降低大并发时候的消息发送次数,我对nbio的底层不了解,我不知道这样可不可以提升大并发时候nbio库的性能。在实际业务场景中,有些消息是群发的,所以一个人可能会在很短的时间里收到多条消息。

ericjing83 commented 5 months ago

大佬,请教下您,

基于这个issue的示例代码,如果按照生产者消费者的方式去响应请求: func (h *Hub) run() { for { select { case client := <-h.register: h.clients[client] = true case client := <-h.unregister: if _, ok := h.clients[client]; ok { delete(h.clients, client) } case message := <-h.broadcast: for conn := range h.clients { conn.WriteMessage(websocket.TextMessage, message) } } } }

我想请教下,h.broadcast路由里的处理逻辑,按我的理解,内部的逻辑是被我自己的worker处理的: for conn := range h.clients { conn.WriteMessage(websocket.TextMessage, message) } 但是,每次循环里的conn.WriteMessage(websocket.TextMessage, message)这句话应该是被nbio的逻辑协程池处理的。 请大佬指正,我的理解对吗?

guonaihong commented 5 months ago

开启内核的延迟发送就行。应用层拼接性能没内核做得好。

---原始邮件--- 发件人: @.> 发送时间: 2024年1月25日(周四) 上午9:37 收件人: @.>; 抄送: @.***>; 主题: Re: [lesismal/nbio] 向大佬求助,如何用这个库写出生产者和消费者的代码结构? (Issue #385)

又想到一个问题,再请教下大佬, 如果发送给同一个conn的两条websocket消息,时间间隔很短(比如小于10毫秒),nbio库会不会把这两条消息合并为一条消息发送,然后,在这两条消息之间增加一个分隔符? 如果可以这样,可能会降低大并发时候的消息发送次数,我对nbio的底层不了解,我不知道这样可不可以提升大并发时候nbio库的性能。在实际业务场景中,有些消息是群发的,所以一个人可能会在很短的时间里收到多条消息。

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you are subscribed to this thread.Message ID: @.***>

ericjing83 commented 5 months ago

请教下您,如何开启内核延迟发送?我怕命令敲错了

lesismal commented 5 months ago

conn.WriteMessage(websocket.TextMessage, message)

这一句本身只是调用nbio websocket conn的WriteMessage方法,你在什么协程里调用就是在什么协程里执行。

这些跟框架无关的基础知识需要自己多学习思考下

ericjing83 commented 5 months ago

Eric @.***

  好的,感谢大佬指点,

我查阅了下nbio websocket conn的WriteMessage方法,这个方法内部是异步执行的。 基于WriteMessage方法底层是异步的,业务逻辑的工作协程在调用WriteMessage方法后就会立刻执行后面的代码,海量消息并发推送时,当前goroutine不会被golang运行时挂起,这样不会产生性能损耗。 fasthttp在推送消息的时候会阻塞当前的工作协程,并发量高的时候,会导致业务逻辑的工作协程频繁被挂起或者恢复,会损失性能。所以我刚才问这个。我现在知道了,nbio比fasthttp更优化。

------------------ 原始邮件 ------------------ 发件人: "lesismal/nbio" @.>; 发送时间: 2024年1月25日(星期四) 上午10:09 @.>; @.**@.>; 主题: Re: [lesismal/nbio] 向大佬求助,如何用这个库写出生产者和消费者的代码结构? (Issue #385)

conn.WriteMessage(websocket.TextMessage, message)

这一句本身只是调用nbio websocket conn的WriteMessage方法,你在什么协程里调用就是在什么协程里执行。

这些跟框架无关的基础知识需要自己多学习思考下

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

lesismal commented 5 months ago

我查阅了下nbio websocket conn的WriteMessage方法,这个方法内部是异步执行的。 基于WriteMessage方法底层是异步的,业务逻辑的工作协程在调用WriteMessage方法后就会立刻执行后面的代码,海量消息并发推送时,当前goroutine不会被golang运行时挂起,这样不会产生性能损耗。

基于可能写阻塞的conn的默认是异步写的: https://github.com/lesismal/nbio/blob/master/nbhttp/websocket/upgrader.go#L31 可以设置 upgrader.BlockingModAsyncWrite = true/false 来开关基于可能写阻塞的conn的异步写

如果是没有设置IOMod、默认用IOModNonBlocking,这个BlockingModAsyncWrite就不生效,因为nbio.Conn本身就是非阻塞的、不会去使用额外的协程处理写

所以相当于是所有conn默认配置都是异步写。 这样做是为了避免有广播的场景时、在广播的for-loop里单个连接阻塞导致其他连接都等待。其他基于标准库conn的框架为了避免类似问题也基本都是一个异步写、多数是一个固定的写协程+chan例如melody,gws和nbio的基于可能写阻塞的连接是非固定协程处理异步写。

fasthttp在推送消息的时候会阻塞当前的工作协程,并发量高的时候,会导致业务逻辑的工作协程频繁被挂起或者恢复,会损失性能。所以我刚才问这个。我现在知道了,nbio比fasthttp更优化。

fasthttp也是gorilla一脉的,如果有广播场景怕单个连接阻塞导致其他连接等待,也是需要自己做类似的异步写封装。 只是优化方式不一样,各有各的好处,如果连接数量不是非常大、协程多点没关系,gorilla系也是足够好用的。

lesismal commented 5 months ago

又想到一个问题,再请教下大佬, 如果发送给同一个conn的两条websocket消息,时间间隔很短(比如小于10毫秒),nbio库会不会把这两条消息合并为一条消息发送,然后,在这两条消息之间增加一个分隔符? 如果可以这样,可能会降低大并发时候的消息发送次数,我对nbio的底层不了解,我不知道这样可不可以提升大并发时候nbio库的性能。在实际业务场景中,有些消息是群发的,所以一个人可能会在很短的时间里收到多条消息。

我在rpc框架里有做批次发送的优化,因为对于业务量较大的rpc服务,确实可能并发写比较多、合批带来较好的效果: https://github.com/lesismal/arpc/blob/master/client.go#L873 arpc的性能相关: https://colobu.com/2022/07/31/2022-rpc-frameworks-benchmarks/

但是对于websocket协议来讲,通常没这个必要:

  1. 多数是公网to c业务,单个连接的交互频率不会特别高、只有很小比例触发合批。合批代码比普通发送代码额外复杂一点、多一点消耗,因为比例小,所以整体算下来可能并不划算
  2. 如果是内网业务,例如rpc,通常有自己的协议头、“粘包”解析逻辑,没必要用websocket做transport再多一层“粘包”处理浪费性能
  3. 即使需要使用websocket并且优化批次写,应用层也可以自己额外加逻辑合并websocket message
  4. 开启TCP延迟发送SetNodelay(false) 也可以合批、效果也好,但是交互频率低的话可能响应及时性变差。而且1中说了,基于websocket多数是公网业务,FPS、MOBA、MMORPG之类的游戏需要单个连接高频,但这类游戏对响应性要求高、所以很多事用UDP类协议例如KCP甚至自己简单封装的UDP来获得更高性能,还有一些是VPN用websocket做隧道可能需要,但这也是可以用3的方式

可能还有其他的暂时没想那么全,大概上,websocket协议本身合批优化是不太需要的。我之前有想过在异步发送队列去做和arpc里合批类似的事情,也是出于以上几点原因所以没做

ericjing83 commented 5 months ago

请教大佬,

上面的示例代码里,这个函数: func newUpgrader() *websocket.Upgrader { u := websocket.NewUpgrader()

// 服务端主动发心跳检测浪费资源
// 让客户端发心跳、服务端只要读超时关闭就可以了,这样不白白浪费服务端定时器、更好
u.KeepaliveTime = 60 * time.Second
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, message []byte) {
    hub.broadcast <- message
})
u.OnClose(func(c *websocket.Conn, err error) {
    hub.unregister <- c
})
return u

} 1.u.OnMessage这行,传入的匿名函数里的message切片,这个切片指向的heap空间,是不是在nbio的后台是存放在memorypool里的? 2.如果遇到海量并发的场景,由于任务在hub.run的channel里排队,等到排队排到了channel里的某个任务的时候,已经过了一段时间(消费者协程开始执行任务的时间点和接收某个新任务的时间点,也就是u.OnMessage触发的时间点,两者有时间差,并发量大的时候,时间差可能会比较大)。hub.run的channel里收到的任务是一个切片,虽然和之前的u.OnMessage里的message切片是两个切片,但是它们指向的内存空间(首尾地址)是相同的。当hub.run在执行某个任务的时候,实际上这个任务切片对应的数据已经在nbio的内存池里存放了一段时间了。这样的情况下,并发是安全的是伐?我的理解可能有偏差,不知道我的理解对不对。大佬可以指正。

lesismal commented 5 months ago

你说的是对的,抱歉这块我疏忽了。

可以设置下 ReleaseWebsocketPayload: false, 然后就不会自动放回到pool了。 等你处理完了、再用 engin.BodyAllocator.Free一下。 但通常的做法是收到后先对你们自己的协议进行解析,例如OnMessage里v, err := json.Unmarshal,然后把v传递给其他协程,这样就仍然可以让nbio自己自动释放、而且免得大buffer堆积

ericjing83 commented 5 months ago

大佬, 如果我这样写,可以释放内存吗? u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, message []byte) { hub.broadcast <- message message=nil }) 然后,我在hub.run里面,解析完channel的某个任务后,用这行代码: task := nil 把task切片设置为空 这样可以让nbio自动释放内存吗?

lesismal commented 5 months ago

可以先了解下sunc.Pool,它主要是提高复用率,pool.Put后才能被其他需要pool.Get的地方复用。只设置nil不Put回去无法提高复用率,nil后取决于gc。go runtime为了满足内存效率不一定立即释放堆对象内存,这样能在需要时runtime立刻有可用内存不需要每次都跟系统申请避免低效。 如果你的业务量不是特别大设置nil或者不使用sync.Pool也没有内存压力,那就无所谓了。一些特殊场景甚至还需要更多定制

ericjing83 commented 5 months ago

感谢大佬的解答

我大致看了下mempool的实现,感觉nbio的mempool分配和回收内存的效率比golang默认的gc效率要更高,特别是在大并发海量请求的时候。heap里的内存会在预分配空间里反复使用,相比海量的切片(每秒100万个切片)被golang的垃圾回收器默认的机制延时回收,性能更好。而且golang默认的gc频繁分配回收内存会产生碎片,这会进一步降低性能。 我决定在hub.run里面,用大佬推荐的engin.BodyAllocator.Free进行回收。我可能要在生产环境下把mempool预分配得更大一点。

ericjing83 commented 5 months ago

再请教下大佬, 按照上面的生产者消费者代码,想释放mempool的内存的时候,engin.BodyAllocator.Free这个方法,我是在u.OnMessage的匿名函数里用?还是在hub.run里用?

lesismal commented 5 months ago

在你最终使用完、不再需要它的地方用。比如你hub.run里又传递给了其他地方、那就在其他用完的地方调用

ericjing83 commented 5 months ago

大佬请教下, 如果服务器接收海量并发一段时间(短时间的使用高峰,比如说30秒左右),导致生产者非常快,但是消费者协程需要处理业务逻辑会比较慢,生产者在等消费者。消费者协程的channel里保存的任务是u.OnMessage里面的message的浅拷贝,channel预分配得足够大,空间不会不够。 现在得问题是, 1.这种情况下,短时间里(比如说30秒里)会有海量的还没来得及回收的message在nbio的内存池里,这种情况下nbio的内存池空间够吗,它会自动扩容吗? 2.nbio的内存池如果实在不能扩容,这种情况下我想从nbio的环境里得到一个状态机,这个状态机可以通知u.OnMessage里面的业务逻辑代码,空间不够了,然后我再写代码想办法把这些切片保存到别的地方。

lesismal commented 5 months ago

默认的内存池没有对内存使用上限做限制,只是提高复用率,如果怕业务OOM,你们自己定制频率限制之类的

lesismal commented 5 months ago

量大的场景,大概考虑几点:

  1. 提高程序自己的消费者性能。如果涉及广播,多几个hub,每个连接按照userid之类的hash到不同的hub上去处理,这样提高消费者并发度
  2. 提高下游消费者性能。例如下游的数据库、redis、其他rpc服务等,如果成本允许,扩容、增加连接数参数等
  3. 如果1中硬件规格有限、2中下游基础设施硬件规格有限,无法提高消费速度并且需要限流,可以考虑增加消息队列削峰处理
  4. 还应该对消息body size等进行合理的限制配置,nbhttp.Config{ ReadLimit: aLittleBiggerThanYourMaxMessageSize }
  5. 实际的内存用量、生产消费速度,不要猜,多跑点模拟真实情况的测试,以实测的数据作为参考,如果需要、配合pprof看怎么优化
ericjing83 commented 5 months ago

好的,感谢大佬指点。 我再多测试下。底层的优化方案定了,业务逻辑的代码比较容易写。如果先写业务逻辑代码,底层的方案到时候和原来的不同,可能会推翻所有的业务逻辑代码。

github-actions[bot] commented 4 months ago

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] commented 3 months ago

This issue was closed because it has been inactive for 14 days since being marked as stale.