lesismal / nbio

Pure Go 1000k+ connections solution, supporting tls/http1.x/websocket and basically compatible with net/http, with high performance and low memory cost; non-blocking, event-driven, easy to use.
MIT License

How to read messages from *websocket.Connection? #152

Closed: kant777 closed this issue 2 years ago

kant777 commented 2 years ago

Hi,

I am looking at this example https://github.com/lesismal/nbio-examples/blob/master/websocket_1m/server/server.go and I have a few questions:

  1. I see that in the 1m websocket server example you are using mux, but in the readme I see this:
g := nbio.NewGopher(nbio.Config{
    Network: "tcp",
    Addrs:   []string{"localhost:8888"},
})

which one should I use for a large number of websocket connections, like say 1m?

  2. Why doesn't the connection object have OnMessage? Why does only the upgrader have it? What is the way to read messages without allocating more buffers in the application? Especially since my goal is to scale to 1m websockets.

  3. I want to send a stream of notifications over 1m socket connections, so what is the best way to approach that using this library? This is the only library I found that can use epoll as well as reuse buffers for a low memory footprint. Please suggest.

lesismal commented 2 years ago

which one to use for large number of websocket connections like say 1m?

lesismal/nbio handles tcp, which is layer 4; lesismal/nbio/nbhttp and lesismal/nbio/nbhttp/websocket handle http and websocket, which are layer 7. Follow the websocket examples.

why does connection object doesn't have onMessage? why only upgrader has?

To guard concurrency order. The upgrader writes the http upgrade response back to the client, and the client may send a websocket message soon after it receives that response. If we provided a wsconn.OnMessage that is set outside upgrader.Upgrade, the conn might receive a websocket message before conn.OnMessage has been set, in some edge cases. So the conn doesn't have OnMessage; only the upgrader has it.

What is the way to read messages without allocating more buffers in the application? especially my goal is to scale for 1m webscokets?

Unlike the std solution, where you need one goroutine per conn for reading (which is why std costs so much memory and leads to long STW pauses when there are too many conns), nbio is an async framework that saves goroutines, and therefore memory, and mitigates the STW problem.

So you don't need to read; the framework passes the message buffer to your OnMessage handler. To reuse and save buffers, nbio uses the same buffer pool for poller read/write, tls read/write, http read/write and websocket read/write. If you are sure you will not use the buffer outside the OnMessage handler, you should set ReleaseWebsocketPayload: true; nbio will then put the message buffer back into the pool and reuse it.
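For instance, here is a minimal sketch of both cases, based on the nbhttp.Config usage and Upgrader handlers that appear later in this thread (the `process` helper is hypothetical):

```go
// Config fragment: with ReleaseWebsocketPayload set, nbio returns the
// message buffer to its pool after the OnMessage handler returns.
svr := nbhttp.NewServer(nbhttp.Config{
	Network:                 "tcp",
	Addrs:                   []string{"localhost:9090"},
	ReleaseWebsocketPayload: true,
})

// If you must keep the payload beyond the handler, copy it first:
u.OnMessage(func(c *websocket.Conn, mt websocket.MessageType, data []byte) {
	kept := append([]byte(nil), data...) // private copy; `data` goes back to the pool
	process(kept)                        // hypothetical downstream handler
})
```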

I want to send a stream of notifications over 1m socket connections so what is the best way to approach that using this library? since this is the only library I found that can use epoll as well as reuse buffers for low memory footprint. please suggest

I don't know which details you want, but generally you just need to use nbio's websocket server. You can keep the ws conns in a map guarded by a mutex, or use goroutine+chan, then broadcast your notifications to all the conns when you need to.
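A stdlib-only sketch of that suggestion, with a hypothetical Conn interface standing in for nbio's websocket connection (this is not nbio's API, just the bookkeeping pattern):

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a hypothetical stand-in for a websocket connection.
type Conn interface {
	WriteMessage(data []byte) error
}

// Registry keeps live connections and broadcasts to all of them.
type Registry struct {
	mu    sync.Mutex
	conns map[Conn]struct{}
}

func NewRegistry() *Registry { return &Registry{conns: map[Conn]struct{}{}} }

func (r *Registry) Add(c Conn)    { r.mu.Lock(); r.conns[c] = struct{}{}; r.mu.Unlock() }
func (r *Registry) Remove(c Conn) { r.mu.Lock(); delete(r.conns, c); r.mu.Unlock() }

// Broadcast loops over all registered conns under the lock.
func (r *Registry) Broadcast(data []byte) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for c := range r.conns {
		_ = c.WriteMessage(data) // nbio writes are non-blocking
	}
}

// fakeConn records messages, for demonstration only.
type fakeConn struct{ got [][]byte }

func (f *fakeConn) WriteMessage(data []byte) error { f.got = append(f.got, data); return nil }

func main() {
	r := NewRegistry()
	a, b := &fakeConn{}, &fakeConn{}
	r.Add(a)
	r.Add(b)
	r.Broadcast([]byte("hello"))
	fmt.Println(len(a.got), len(b.got))
}
```

In the real server you would Add in the OnOpen handler and Remove in OnClose, as the full example later in this thread does.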

kant777 commented 2 years ago

Perfect! Thanks a lot for that response. I think I understand the read side of things now. Can you briefly elaborate on buffer allocation on the write side? For example, imagine I have one million websocket clients connected simultaneously to a single nbio-powered websocket server, and I need to send a stream of notifications (json messages) to them forever (until the client disconnects). Should I use conn.Write, conn.WriteMessage, conn.WriteFrame, or something else for my use case?

BTW, how many buffers are allocated in the buffer pool?

lesismal commented 2 years ago

Can you please, briefly elaborate on buffer allocation on write side of things?

To reuse buffers as much as possible, nbio uses the same buffer pool for both read and write, across the different layers from tcp to tls and http/websocket.

When you call wsconn.WriteMessage/WriteFrame, your buffer is packed into a websocket frame and then written directly to the socket fd. Most of the time the socket fd is healthy and there's no need to cache the buffer. When the socket fd's kernel write buffer is full, the write fails, and the nbio.Conn caches the buffer (using the same pool), adds a write event to epoll, waits until the fd is writable, and then flushes the buffer.

Users don't need to manage buffer allocation for the nbio framework; it handles allocation during these steps itself. But you should consider the buffers you pass to nbio: you can reuse your application's buffers if you need and want to. For more details of nbio's buffer allocation implementation, please read the fucking code :smile:

BTW, how many buffers are allocated in the buffer pool?

For the std solution, the memory cost is usually related to the number of conns, because we need at least 1 goroutine to handle each conn, which costs a lot in goroutine stacks, heap allocations, and gc. For nbio, the memory cost is mostly related to the qps of your service, because we don't need at least 1 goroutine per conn; we can use a goroutine pool of limited size, which you control yourself, and that can drastically reduce the number of goroutines in a service with a high online count.

For example, for a service with 1m online conns and 50k qps, the read/write buffers needed are about:

        online    qps    buffer num
std     1m        50k    1m
nbio    1m        50k    50k

That's an estimated comparison, without accounting for every step of the different solutions. For the real hardware cost and performance, you need to run a test, or run the service in a production env, and measure the result.

BTW, if the online count is not so large and your boss doesn't care about hardware prices, nbio will not be more efficient than std.

kant777 commented 2 years ago

Thanks a lot for that description. I just tried the code below, as per the example https://github.com/lesismal/nbio-examples/blob/master/websocket_1m/server/server.go

    mux := &http.ServeMux{}
    mux.HandleFunc("/", onWebsocket)

    svr := nbhttp.NewServer(nbhttp.Config{
        Network:                 "tcp",
        Addrs:                   []string{"localhost:9090"},
        MaxLoad:                 1000000,
        ReleaseWebsocketPayload: true,
        Handler:                 mux,
    })
    err := svr.Start()
    if err != nil {
        log.Fatal().Msgf("nbio.Start failed: %v", err)
        return
    }
    defer svr.Stop()

I get the following error, and there is nothing else running on my port 9090:

2022/02/10 02:58:52.222 [ERR] Accept failed: accept tcp 127.0.0.1:9090: use of closed network connection, exit...

It's not just about the hardware cost for me, although that is important too. Latency is extremely important for my app; every millisecond counts. I initially tried to spawn 10K connections using the gorilla websocket library, and pprof showed me that memory consumption was about ~8GB, and I wondered how I could ever reach a million this way. It looked too inefficient for the scale of the problem I am dealing with, and I want to deliver notifications with under a millisecond of response time.

lesismal commented 2 years ago

svr.Start() is not a blocking function. If your main goroutine exits, svr.Stop() will be called by the defer. So check whether your goroutine exits first, or provide me the full code.

lesismal commented 2 years ago

If you have 1m conns on 1 server, you can use an array of buckets to keep the conns by hash, then use several goroutines to broadcast to the clients in each bucket.

lesismal commented 2 years ago

A simple echo benchmark for 50k online conns on low-end hardware:

env

ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio_examples/websocket_1m$ go version
go version go1.17 linux/amd64
ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio_examples/websocket_1m$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04 LTS
Release:    20.04
Codename:   focal
ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio_examples/websocket_1m$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
      2  AMD Ryzen 7 5800H with Radeon Graphics
ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio_examples/websocket_1m$ free -h
              total        used        free      shared  buff/cache   available
Mem:          3.8Gi       738Mi       2.5Gi       0.0Ki       597Mi       2.9Gi
Swap:         4.0Gi       274Mi       3.7Gi

server

ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio_examples/websocket_1m$ go run ./server
2022/02/10 16:05:05.643 [INF] Gopher[NB] start
2022/02/10 16:05:05.644 [INF] Serve  NonTLS On: [localhost:28001]
2022/02/10 16:05:05.644 [INF] Serve  NonTLS On: [localhost:28002]
......
running for 13 seconds, NumGoroutine: 88, qps: 0, total: 0
running for 14 seconds, NumGoroutine: 56, qps: 0, total: 0
running for 15 seconds, NumGoroutine: 56, qps: 143, total: 143
running for 16 seconds, NumGoroutine: 467, qps: 41023, total: 41166
running for 17 seconds, NumGoroutine: 217, qps: 51673, total: 92839
running for 18 seconds, NumGoroutine: 343, qps: 45627, total: 138466
running for 19 seconds, NumGoroutine: 557, qps: 57785, total: 196251
running for 20 seconds, NumGoroutine: 96, qps: 56712, total: 252963
running for 21 seconds, NumGoroutine: 557, qps: 56250, total: 309213
running for 22 seconds, NumGoroutine: 87, qps: 56258, total: 365471
......

client

ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio_examples/websocket_1m$ go run ./client/ -c=50000 -g=500
running for 1 seconds, online: 11742, NumGoroutine: 659, success: 0, totalSuccess: 0, failed: 0, totalFailed: 0
running for 2 seconds, online: 23638, NumGoroutine: 937, success: 0, totalSuccess: 0, failed: 0, totalFailed: 0
running for 3 seconds, online: 36450, NumGoroutine: 715, success: 0, totalSuccess: 0, failed: 0, totalFailed: 0
running for 4 seconds, online: 48398, NumGoroutine: 675, success: 131, totalSuccess: 131, failed: 0, totalFailed: 0
running for 5 seconds, online: 50000, NumGoroutine: 501, success: 45287, totalSuccess: 45418, failed: 0, totalFailed: 0
running for 6 seconds, online: 50000, NumGoroutine: 501, success: 52099, totalSuccess: 97517, failed: 0, totalFailed: 0
running for 7 seconds, online: 50000, NumGoroutine: 501, success: 59833, totalSuccess: 157350, failed: 0, totalFailed: 0
running for 8 seconds, online: 50000, NumGoroutine: 501, success: 57104, totalSuccess: 214454, failed: 0, totalFailed: 0
running for 9 seconds, online: 50000, NumGoroutine: 501, success: 54893, totalSuccess: 269347, failed: 0, totalFailed: 0
running for 10 seconds, online: 50000, NumGoroutine: 501, success: 49928, totalSuccess: 319275, failed: 0, totalFailed: 0
running for 11 seconds, online: 50000, NumGoroutine: 501, success: 58379, totalSuccess: 377654, failed: 0, totalFailed: 0
......

server cost

cpu: under 100% (load < 1); mem: under 200M

ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio$ ps -aux  | grep ./server
ubuntu     44787 88.5  4.4 2114248 176748 pts/0  Rl+  16:06   1:15 /tmp/go-build2751260735/b001/exe/server
ubuntu@ubuntu:~/dev/gopath/src/github.com/lesismal/nbio$ top
top - 16:08:53 up  1:40,  3 users,  load average: 5.43, 2.77, 1.52
Tasks: 249 total,   5 running, 243 sleeping,   0 stopped,   1 zombie
%Cpu(s): 18.8 us, 40.6 sy,  0.0 ni, 12.5 id,  0.0 wa,  0.0 hi, 28.1 si,  0.0 st
MiB Mem :   3901.0 total,    872.6 free,   2311.7 used,    716.7 buff/cache
MiB Swap:   4096.0 total,   3823.7 free,    272.2 used.   1308.4 avail Mem 

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                   
  44787 ubuntu    20   0 2114248 176748   5628 R  70.6   4.4   1:45.72 server                                                                                                                                    
  44838 ubuntu    20   0 2641512   1.1g   5420 R  47.1  27.7   1:21.30 client   
kant777 commented 2 years ago

@lesismal Thanks a ton. I realized you have a ticker to block the main goroutine (it would have been great if there were an additional function to start the server in a blocking way, but all good for now).

I have some more basic questions. When are the OnOpen and OnClose methods invoked? I had OnMessage and OnOpen on the upgrader and I don't see OnOpen getting called at all; I'm not sure if I am doing something wrong. For production scenarios, do you recommend handling OnOpen and OnClose, or should I just leave that to the library?

Regarding writing to one million sockets, I have the following structure:

type Hub struct {
    // Registered clients.
    clients map[*Client]bool

    // messages that need to be serialized to json and broadcast to clients
    broadcast chan map[string]interface{}

    // Register requests from the clients.
    register chan *Client

    // Unregister requests from clients.
    unregister chan *Client

}
type Client struct {
    hub *Hub // the above structure
    conn *websocket.Conn 
    upgrader *websocket.Upgrader
    send chan []byte // outgoing messages
    Req FeedRequest 
}

and I maintain a map[*Client]bool for connected clients. So if I have one million concurrent connections I will have one million keys in the map. I iterate through a channel of messages, and for each message I iterate through all million clients, spawn a goroutine, serialize, and send to the channel. So the pseudo code would look like this:

for {
  select {
  case message := <-h.broadcast:
        for k, v := range clients {
               go func() {
                  jsonInBytes := serializeToJson(v)
                  client.send <- jsonInBytes
               }()
         }
  }
}
Once the messages are sent to the client.send channel, I do websocket.Conn.WriteMessage. I understand that creating a goroutine per client for one million clients would require at least 4KB * 1 million = 4GB, so I was planning to create a pool of goroutines and use them from the pool. However, given what you said, I would like to know more about your buckets-by-hash approach: what hash am I storing, and how do I bucket the connections?

Please suggest

lesismal commented 2 years ago

Maybe you can do it like this. I just spent about a quarter of an hour writing this and haven't run a test; try it and customise the code yourself:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "runtime"
    "time"

    "github.com/lesismal/nbio"
    "github.com/lesismal/nbio/nbhttp"
    "github.com/lesismal/nbio/nbhttp/websocket"
)

var (
    clientMgr = NewClientMgr(runtime.NumCPU())

    keepaliveTime = time.Second * 30
)

type ClientMgr struct {
    hubs []*Hub
}

func (mgr *ClientMgr) Start() {
    for _, h := range mgr.hubs {
        h.Start()
    }
}

func (mgr *ClientMgr) Stop() {
    for _, h := range mgr.hubs {
        h.Stop()
    }
}

func (mgr *ClientMgr) Register(c *websocket.Conn) {
    mgr.Bucket(c).Register(c)
}

func (mgr *ClientMgr) Unregister(c *websocket.Conn, err error) {
    mgr.Bucket(c).Unregister(c, err)
}

func (mgr *ClientMgr) Broadcast(data []byte) {
    for _, h := range mgr.hubs {
        h.Broadcast(data)
    }
}

func (mgr *ClientMgr) Bucket(c *websocket.Conn) *Hub {
    nbc, _ := c.Conn.(*nbio.Conn)
    idx := uint64(nbc.Hash()) % uint64((len(mgr.hubs)))
    return mgr.hubs[idx]
}

func NewClientMgr(hubSize int) *ClientMgr {
    mgr := &ClientMgr{}

    for i := 0; i < hubSize; i++ {
        mgr.hubs = append(mgr.hubs, NewHub())
    }

    return mgr
}

type Hub struct {
    clients      map[*websocket.Conn]struct{}
    chOperations chan func()
}

func (h *Hub) Start() {
    for i := 0; i < runtime.NumCPU(); i++ {
        go func() {
            for op := range h.chOperations {
                func() {
                    defer func() {
                        err := recover()
                        if err != nil {
                            // some log
                            log.Printf("%v", err)
                        }
                    }()
                    op()
                }()
            }
        }()
    }
}

func (h *Hub) Stop() {
    close(h.chOperations)
}

func (h *Hub) Register(c *websocket.Conn) {
    h.chOperations <- func() {
        h.clients[c] = struct{}{}
    }
}

func (h *Hub) Unregister(c *websocket.Conn, err error) {
    h.chOperations <- func() {
        delete(h.clients, c)
        // some log
    }
}

func (h *Hub) Broadcast(data []byte) {
    h.chOperations <- func() {
        for c := range h.clients {
            c.WriteMessage(websocket.TextMessage, data)
        }
    }
}

func NewHub() *Hub {
    return &Hub{
        clients:      map[*websocket.Conn]struct{}{},
        chOperations: make(chan func(), runtime.NumCPU()),
    }
}

func newUpgrader() *websocket.Upgrader {
    u := websocket.NewUpgrader()

    u.OnOpen(func(c *websocket.Conn) {
        clientMgr.Register(c)
    })

    u.OnClose(func(c *websocket.Conn, err error) {
        clientMgr.Unregister(c, err)
    })

    u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
        // logic if you need
        // ...

        // update keepalive deadline
        c.SetReadDeadline(time.Now().Add(keepaliveTime))
    })

    return u
}

func onWebsocket(w http.ResponseWriter, r *http.Request) {
    // if err := authenticate(...); err != nil {
    //  some log
    //  return
    // }

    upgrader := newUpgrader()
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        // some log
        return
    }

    // usually we should set a keepalive deadline, and update the deadline in OnMessage handler
    wsConn := conn.(*websocket.Conn)
    wsConn.SetReadDeadline(time.Now().Add(keepaliveTime))
}

func main() {
    clientMgr.Start()
    defer clientMgr.Stop()

    mux := &http.ServeMux{}
    mux.HandleFunc("/ws", onWebsocket)

    svr := nbhttp.NewServer(nbhttp.Config{
        Network: "tcp",
        Addrs:   []string{"localhost:9090"},
        Handler: mux,
    })

    err := svr.Start()
    if err != nil {
        fmt.Printf("nbio.Start failed: %v\n", err)
        return
    }

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    <-interrupt
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    svr.Shutdown(ctx)
}

Here you can do some optimization for your code:

// struct{}{} is better than bool here
clients map[*Client]bool

for {
    select {
    case message := <-h.broadcast:
          for k, v := range clients {
                 // do not use a goroutine to send each message to each client
                 go func() {

                    // serialize once outside the loop and send the buffer to all the clients, rather than serializing N times
                    jsonInBytes := serializeToJson(v)

                    // nbio's websocket.Conn is non-blocking: just call wsconn.WriteMessage, no need to use goroutines for its reading/writing
                    // don't do it the way std solutions do, else we can't solve the 1000k problem in go
                    client.send <- jsonInBytes
                 }()
           }
    }
}

Although nbio's websocket.Conn.WriteMessage is non-blocking, it still takes a while to loop over 1m conns; 1m is a big number. So I use buckets of chan+goroutine in my code, so that we can broadcast concurrently with several goroutines and reduce the total loop time for each message.

lesismal commented 2 years ago

I posted some messages but deleted them because I needed to edit them; now the code should be right.

kant777 commented 2 years ago

@lesismal Thanks again, This is really interesting, and if I understand this correctly and please correct me if I am wrong,

You are having a channel of function pointers, where each function would serialize to json (in my case) and write to the socket. The number of such channels = the number of CPUs. Each message is wrapped in a function, and the function is sent to all channels (one per CPU). This is clever: instead of sending the raw message to all 4 channels, you wrapped it in a function so that you can just pass the address rather than replicating the message across channels. Is that correct? Finally, you send the same function pointer to all channels to achieve parallelism! The missing link for me in your code is when and how the broadcast gets called. In my case I have a channel of messages enqueued from an external message queue, and these are the messages I need to broadcast to 1M file descriptors.

The other question I have: for chan func(), would you use a buffered or unbuffered channel? I usually use a buffered channel of size 1024, but please let me know your thoughts.

lesismal commented 2 years ago

@lesismal Thanks again, This is really interesting, and if I understand this correctly and please correct me if I am wrong,

  1. you are having a channel of function pointers where each function would serialize to json(in my case) and write to the socket.
  2. The number of channel such channels = the number of CPU
  3. each message is wrapped under a function and the function is sent to all channels (the number of CPU). This is clever, so instead of sending the raw message to all 4 channel you wrapped it under a function so that you can just pass the address than replicating the message across channel, is that correct? Finally, you want to send the same function pointer to all channels to achieve parallelism!

The missing link for me from your code is when and how the broadcast gets called? in my case I have a channel of messages that are enqueued from an external message queue and these are the messages that I need to broadcast to 1M file descriptors.

The other question I have here is for chan func() would you do buffered or unbuffered? I usually do buffered of size 1024 but please let me know your thoughts here?

You should adjust the hub bucket size, the cap of the chan, and the other sizes yourself. My code is just an example; you decide the values for your production env.

lesismal commented 2 years ago

The missing link for me from your code is when and how the broadcast gets called? in my case I have a channel of messages that are enqueued from an external message queue and these are the messages that I need to broadcast to 1M file descriptors.

Call clientMgr.Broadcast anywhere you need to.

lesismal commented 2 years ago

The other question I have here is for chan func() would you do buffered or unbuffered? I usually do buffered of size 1024 but please let me know your thoughts here?

1024 is fine.

lesismal commented 2 years ago

each message is wrapped under a function and the function is sent to all channels (the number of CPU). This is clever, so instead of sending the raw message to all 4 channel you wrapped it under a function so that you can just pass the address than replicating the message across channel, is that correct? Finally, you want to send the same function pointer to all channels to achieve parallelism!

A select over multiple chan cases is a little slower than one with a single case, so I use chan func() to reduce the number of chan cases, but it doesn't matter much for performance here. The serialization, however, makes a big difference: doing it once versus once per conn.
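The chan func() pattern can be shown in isolation (stdlib-only sketch): every operation, whether register, unregister or broadcast, becomes a closure sent down one channel, and a worker executes them in order. With a single worker, all map access happens on one goroutine, so no lock is needed:

```go
package main

import "fmt"

func main() {
	clients := map[int]struct{}{}
	ops := make(chan func(), 1024) // one channel for every kind of operation
	done := make(chan struct{})

	// single worker: all access to `clients` happens here, serialized
	go func() {
		for op := range ops {
			op()
		}
		close(done)
	}()

	// "register", "unregister" and "broadcast" are just closures
	ops <- func() { clients[1] = struct{}{} }
	ops <- func() { clients[2] = struct{}{} }
	ops <- func() { delete(clients, 1) }
	ops <- func() { fmt.Println("online:", len(clients)) }

	close(ops)
	<-done
}
```

This replaces a select over three channels (register/unregister/broadcast) with one channel of closures, which is the design used in the longer example above.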

lesismal commented 2 years ago

you are having a channel of function pointers where each function would serialize to json(in my case) and write to the socket. The number of channel such channels = the number of CPU

That's incorrect: for Broadcast, we do not serialize the json in the func; we serialize first, then Broadcast the buffer to the conns. Either way, avoid the waste of unneeded serialization.

The number of CPUs is just an example; you decide the size.

kant777 commented 2 years ago

select case for multi chan cases is a little slower than just one case, so I use chan func() to reduce the num of chan cases, but it doesn't matter too much for performance here. But the serialization matters much different between once and num of conns.

@lesismal I thought the whole point of using chan func() was so we don't replicate the same message across all the channels, and instead replicate only the function pointers across channels, no? I am trying to understand whether, with this approach, we are replicating each message to all channels or not. Thanks

lesismal commented 2 years ago

broadcast chan map[string]interface{}

A map or a *struct doesn't cost much in replication. I think you have some misunderstanding about pointers/references/values, like in cpp. You should google it or refer to some books for more knowledge rather than asking me here, because I'm not a native English speaker and I'm not able to explain it well enough to make it clear.

kant777 commented 2 years ago

@lesismal No worries, your english is great! I simplified my question:

message := "some_1KB_message"

case 1

c1 := make(chan string, 1)
c2 := make(chan string ,1)

c1 <- message
c2 <- message

fmt.Println(unsafe.Sizeof(<-c1))
fmt.Println(unsafe.Sizeof(<-c2))

vs

case 2

c3 := make(chan func() string,  1)
c4 := make(chan func() string,  1)

c3 <- func() string {return message}
c4 <- func() string {return message}

fmt.Println(unsafe.Sizeof(<-c3))
fmt.Println(unsafe.Sizeof(<-c4))

I think you went with case 2 to minimize the memory copied, while having a channel per CPU to broadcast messages.

kant777 commented 2 years ago

Is onOpen guaranteed to be called before onMessage? I tried this but I don't see onOpen getting called before onMessage. Why?

lesismal commented 2 years ago

I think you went with case2 to minimize the memory size while have channel per CPU to broadcast messages

Passing a string does not cost much more than passing a func(). string and []byte are small structs that point at a buffer, and the buffer isn't copied when the string or []byte is passed through a chan.

The reason I use a chan func() is just to use only one chan instead of your 3 chans, but 3 chans are also fine; it doesn't matter much for performance in this case.

lesismal commented 2 years ago

is onOpen guaranteed to get called before onMessage ?

yes, guaranteed.

I tried this but I don't see onOpen getting called before onMessage..why?

Please provide the full code that reproduces your problem, and I'll check it.

kant777 commented 2 years ago

onOpen works, thanks. I will try your approach for the 1000K problem and report back.

kant777 commented 2 years ago

@lesismal Looks like there is a race issue here

func (h *Hub) Register(c *websocket.Conn) {
    h.chOperations <- func() {
        h.clients[c] = struct{}{}
    }
}

The error is this

fatal error: concurrent map writes

I can add a lock but I think that would affect the performance

lesismal commented 2 years ago

You are using the old code from before I edited it. See the latest code; there is a ClientMgr now.

lesismal commented 2 years ago

You should also think more about the code yourself, instead of asking every question including easy details (such as "when and how does the broadcast get called?"); otherwise it will be difficult to improve your own skills :joy:.

kant777 commented 2 years ago

@lesismal I am using the latest code. If you look at your latest code, it has Register functions in both ClientMgr and Hub. I think you are quickly drawing conclusions about someone's skills when you don't even understand why such questions are being asked. Regardless, that commentary is not appreciated.

lesismal commented 2 years ago

if you have 1m conns for 1 server, you can use a buckets array to keep the conns by hash, then use several goroutines to broadcast to the clients in each bucket.

see this, for your 1m conns.

My example code is less than 200 lines in total, and if you really spend enough time reading and thinking about it, you should not misunderstand it.

type ClientMgr struct {
    hubs []*Hub
}

@lesismal Looks like there is a race issue here

func (h *Hub) Register(c *websocket.Conn) {
  h.chOperations <- func() {
      h.clients[c] = struct{}{}
  }
}

The error is this

fatal error: concurrent map writes

I can add a lock but I think that would affect the performance

And I think you didn't try a little further on your own before asking this.

I used to meet many young programmers who would quickly turn to other people for help before trying to find and solve the problem themselves, also like this: "is onOpen guaranteed to get called before onMessage? I tried this but I don't see onOpen getting called before onMessage..why?"

Asking other people questions quickly, without serious thought, wastes both your own growth and other people's time.

I don't want to blame you, because I was like this for a while when I was young, when I started programming. I often relied on others' help and made very slow progress, until I got rid of this habit and started thinking a problem through myself first whenever I encountered one.

lesismal commented 2 years ago

But regardless, that commentary is not appreciated.

Sorry about that, but "faithful words offend the ear yet benefit the conduct"; it depends on how you take it.

kant777 commented 2 years ago

@lesismal you can't throw a bunch of code at people and expect them to read it and figure it out. I already have a lot of code to read and write, and I expect certain libraries to do certain things. If your library cannot do that, all good, no big deal. If you think people need to read every line of code their application depends on, they will never deliver anything!!!

I will not accept any criticism from you. If you don't want to answer any questions, no matter how naive you think they are, you can simply choose to ignore them, and I will refrain from asking any further. You are not the only one out there; as a matter of fact, you are not the one at all. I can't read your "fucking mind" to worry about whether you think something is naive or not. You are clearly making certain judgments which are totally false. You are not a master, so don't act like one.

your code has a bunch of race issues, and your examples don't reflect anything useful, nor are they properly documented. I will never introduce or recommend your library to anyone, as I am going with the other library at this point, which has all of the same optimizations and in some cases even better ones than your "fucking code" (as you put it). It's unfortunate that I landed on yours; I should never have looked into your library in the first place.

Your words are not faithful; they do nothing other than discourage someone from using the library and write a response like the one above!! So put your biased culture in the garbage.

lesismal commented 2 years ago

Good luck boy!

But btw, to be honest, eh...
Till now, no other framework has solved golang's 1000k problem in a single process, neither eranyanay/1m-go-websockets nor gobwas/ws. You can search for me in the issue lists of gobwas/ws or gobwas/ws-examples. I also opened issues for eranyanay/1m-go-websockets, but it seems the author disabled the project's issues.

To save your time, you can refer directly to gobwas/ws-examples issue 18, where I have provided the reasons for the problems of these two libs, along with code that reproduces the problems.

There are also some other poller frameworks, such as evio, gev, and gnet, but they don't support tls/http/websocket (gev supports simple websocket only, without tls).

So maybe my repo is the only one, till now, as you said in your first post: "since this is the only library I found that can use epoll as well as reuse buffers for low memory footprint".

I write code seriously; I hope you will be serious too.

Good luck again!

BruceDone commented 2 years ago

@lesismal you can't throw a bunch of code and expect people to read and figure it out. I have a lot of code to read and write already and I expect certain libraries to do certain things. And if your library cannot do that, all good. no big deal. If you think people need to read every line of code that their application depends on they will never deliver anything!!!

I will not accept any criticism from you, if you don't want to answer any questions no matter how naive you think they are, you can simply choose to ignore and I will refrain from asking any further. you are not the only one out there. As a matter of fact, you are not the one at all. I can't read your "fucking mind" to worry about if you think it is naive or not for you. you are clearly making certain judgments which are totally false. you are not a master and don't act like one.

your code has bunch of race issues and your example don't reflect anything useful not they are properly documented. I will never introduce or recommend your library to anyone as I am going with the other library at this point where there are all and in some cases even better optimizations than your "fucking code" (as you say it). it's too unfortunate that I landed on yours, I should never looked into your library in the first place

Your words are not faithful, it does nothing other than discourage someone from using it and write a response like above!! so put your bias culture into garbage.

Please don't treat your rudeness as a reasonable demand. The problems you ran into were caused by your own lack of fundamentals. You should shore up your basic knowledge, instead of holding the unrealistic fantasy that the after-sales support for a keyboard will teach you the principles of computer organization. shame on you @kant777

tehsphinx commented 10 months ago

@lesismal Thank you for this very helpful issue! Just wanted to let you know that there are people out there who appreciate you helping out, going far beyond what you'd need to do. Firstly, taking the time to write this library, thereby providing the community a tool to get started quickly with servers that can handle 1mil+ connections. Secondly, going above and beyond in trying to help answer questions on how to use the library. Thirdly (since I came here after reading your issue on gobwas/ws), your effort to help and improve other libraries as well, by pointing out the issues they have, making the overall ecosystem better.

Thanks again!


Edit: To be a bit more specific about what this helped me with: I was wondering how to broadcast messages to selected clients, and whether it would be correct to just capture incoming connections in the OnOpen handler or if that would create problems. The answer in your sample code was very clear.

lesismal commented 10 months ago

@lesismal Thank you for this very helpful issue! Just wanted to let you know that there are people out there that appreciate you helping out, going far beyond what you'd need to do. Firstly taking the time to write this library and thereby providing the community a tool to get started quickly with servers that can handle 1mil+ connections. Secondly going beyond and above on trying to help answer questions on how to use the library. Thirdly (since I came here reading your issue on gobwas/ws) your effort to help and improve also other libraries by pointing out the issues they have, making the overall ecosystem better.

Thanks again!

Edit: To be a bit more specific on what this helped me with: I was wondering on how to broadcast messages to selected clients. If it would be correct to just capture incoming connections in the OnOpen handler or if that would create problems. The answer in your sample code was very clear.

@tehsphinx Thanks Bro! I appreciate your feedback! Hope you have a good time with nbio!