kataras / neffos

A modern, fast and scalable websocket framework with elegant API written in Go
http://bit.ly/neffos-wiki
MIT License
571 stars 47 forks source link

[BUG] Broadcast deadlocks under concurrency #59

Open AlanTianx opened 2 years ago

AlanTianx commented 2 years ago

Describe the bug: in a real application, I found that the Broadcast() method can cause a deadlock.

To Reproduce Steps to reproduce the behavior:

  1. When an nsConn disconnects, broadcasting a message from a websocket.OnNamespaceDisconnect or websocket.OnRoomLeft event handler causes Server.Broadcast() to block internally.
  2. code
    
    package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"time"

	gorilla "github.com/gorilla/websocket"
	"github.com/kataras/iris/v12"
	"github.com/kataras/iris/v12/websocket"
	"github.com/kataras/neffos"
)

func main() { namespace, room, event := "test", "ttt", "ch" chatServer := websocket.New( websocket.GorillaUpgrader(gorilla.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, // todo 允许所有的CORS 跨域请求,正式环境可以关闭 CheckOrigin: func(r http.Request) bool { return true }, }), websocket.WithTimeout{ WriteTimeout: time.Second 60, ReadTimeout: time.Second 60, Namespaces: websocket.Namespaces{ namespace: websocket.Events{ websocket.OnNamespaceConnected: func(nsConn websocket.NSConn, msg websocket.Message) error { fmt.Println("OnNamespaceConnected", nsConn.Conn.ID()) nsConn.JoinRoom(nil, room) return nil }, websocket.OnNamespaceDisconnect: func(nsConn *websocket.NSConn, msg websocket.Message) error { fmt.Println("OnNamespaceDisconnect", nsConn.Conn.ID()) // Todo The broadcast here will cause blocking in server.start() //nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{ // Body: []byte("我离开了room" + nsConn.Conn.ID()), // Namespace: namespace, // Room: room, // To: "", // Event: event, //})

                    // Todo Add a certain delay and everything is normal
                    time.AfterFunc(time.Millisecond*50, func() {
                        nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
                            Body:      []byte("我离开了room" + nsConn.Conn.ID()),
                            Namespace: namespace,
                            Room:      room,
                            To:        "",
                            Event:     event,
                        })
                    })

                    return nil
                },
                websocket.OnRoomJoined: func(nsConn *websocket.NSConn, msg websocket.Message) error {
                    fmt.Println("OnRoomJoined", nsConn.Conn.ID())
                    nsConn.Emit(event, []byte("我是单独消息"))
                    nsConn.Conn.Server().Broadcast(nil, neffos.Message{
                        Body:      []byte("我加入了room" + nsConn.Conn.ID()),
                        Namespace: namespace,
                        Room:      room,
                        To:        "",
                        Event:     event,
                    })

                    return nil
                },
                websocket.OnRoomLeft: func(nsConn *websocket.NSConn, msg websocket.Message) error {
                    fmt.Println("OnRoomLeft", nsConn.Conn.ID(), nsConn.Conn.IsClosed())
                    // Todo The broadcast here will cause blocking in `server.start()`
                    //nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
                    //  Body:      []byte("我离开了room" + nsConn.Conn.ID()),
                    //  Namespace: namespace,
                    //  Room:      room,
                    //  To:        "",
                    //  Event:     event,
                    //})

                    // Todo Add a certain delay and everything is normal
                    time.AfterFunc(time.Millisecond*50, func() {
                        nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
                            Body:      []byte("我离开了room" + nsConn.Conn.ID()),
                            Namespace: namespace,
                            Room:      room,
                            To:        "",
                            Event:     event,
                        })
                    })
                    return nil
                },
            },
        },
    },
)

app := iris.New()
app.Get("/e", websocket.Handler(chatServer, func(ctx iris.Context) string {
    return ctx.URLParam("id")
}))

// Todo If you don't want to lose messages, please turn it on
chatServer.SyncBroadcaster = true

app.Listen("0.0.0.0:8090")

}

AlanTianx commented 2 years ago
package main

import (
    "context"
    "fmt"
    "github.com/kataras/neffos"
    "github.com/kataras/neffos/gorilla"
    "strconv"
    "testing"
    "time"
)

// handler is the client-side connection handler passed to neffos.Dial by
// connect. It registers a single "test" namespace whose event callbacks print
// lifecycle information. On namespace connection it also emits an empty "ch"
// event, and the body of any received "ch" event is printed.
var handler = neffos.WithTimeout{
	Namespaces: neffos.Namespaces{
		"test": neffos.Events{
			// Print "connected successfully" with the connection ID, then emit an empty "ch" event.
			neffos.OnNamespaceConnected: func(nsConn *neffos.NSConn, msg neffos.Message) error {
				fmt.Println("成功链接:", nsConn.Conn.ID())
				nsConn.Emit("ch", []byte(""))
				return nil
			},
			// Print "disconnected" with the connection ID.
			neffos.OnNamespaceDisconnect: func(nsConn *neffos.NSConn, msg neffos.Message) error {
				fmt.Println("断开链接:", nsConn.Conn.ID())
				return nil
			},
			neffos.OnRoomJoined: func(nsConn *neffos.NSConn, msg neffos.Message) error {
				fmt.Println("OnRoomJoined:---", msg)
				return nil
			},
			neffos.OnRoomLeft: func(nsConn *neffos.NSConn, msg neffos.Message) error {
				fmt.Println("OnRoomLeft:---", msg)
				return nil
			},
			// Print the body of every "ch" message received in this namespace.
			"ch": func(nsConn *neffos.NSConn, msg neffos.Message) error {
				fmt.Println("ch--------", string(msg.Body))
				return nil
			},
		},
	},
}

// TestWs opens a single client connection (id "a") to a server that must
// already be listening on ws://127.0.0.1:8090/e. It then blocks forever so the
// connection can be observed manually.
//
// It is a manual reproduction harness, not an assertion-based test. Because it
// never returns and needs an external server, it is skipped under -short so it
// does not hang `go test -short ./...`.
func TestWs(t *testing.T) {
	if testing.Short() {
		t.Skip("manual harness: needs a live server on 127.0.0.1:8090 and never returns")
	}

	// Use gorilla's dialer directly.
	dialer := gorilla.DefaultDialer

	fmt.Println("-- Running...")

	go connect(dialer, "ws://127.0.0.1:8090/e?id=a")
	select {}
}

// TestA opens 30 concurrent client connections (ids "0" through "29") to a
// server that must already be listening on ws://127.0.0.1:8090/e. It then
// blocks forever, which is the load that triggered the Broadcast deadlock
// reported in the issue.
//
// It is a manual reproduction harness, not an assertion-based test. Because it
// never returns and needs an external server, it is skipped under -short so it
// does not hang `go test -short ./...`.
func TestA(t *testing.T) {
	if testing.Short() {
		t.Skip("manual harness: needs a live server on 127.0.0.1:8090 and never returns")
	}

	// clients is the number of concurrent connections opened.
	const clients = 30

	dialer := gorilla.DefaultDialer

	fmt.Println("-- Running...")

	for i := 0; i < clients; i++ {
		go connect(dialer, "ws://127.0.0.1:8090/e?id="+strconv.Itoa(i))
	}
	select {}
}

// connect dials url with dialer and the package-level handler, then connects
// the resulting client to the "test" namespace.
//
// The dial step has a 10-second deadline and the namespace connect a
// 25-second deadline. Failures of either step are printed and the function
// returns. An empty ID after either step is treated as a broken invariant and
// panics. The client is not closed by this function.
func connect(dialer neffos.Dialer, url string) {
	dialCtx, cancelDial := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelDial()

	fmt.Println(url)

	client, err := neffos.Dial(dialCtx, dialer, url, handler)
	if err != nil {
		fmt.Printf("connection failure: %v\n", err)
		return
	}
	if client.ID == "" {
		panic("CLIENT'S ID IS EMPTY.\n DIAL NOW SHOULD BLOCK UNTIL ID IS FILLED(ACK) AND UNTIL SERVER'S CONFIRMATION")
	}

	nsCtx, cancelNS := context.WithTimeout(context.Background(), 25*time.Second)
	defer cancelNS()

	nsConn, err := client.Connect(nsCtx, "test")
	if err != nil {
		// Message reads "connect to namespace err:".
		fmt.Println("链接到namespace err:", err)
		return
	}
	if nsConn.Conn.ID() == "" {
		panic("CLIENT'S CONNECTION ID IS EMPTY.\nCONNECT SHOULD BLOCK UNTIL ID IS FILLED(ACK) AND UNTIL SERVER'S CONFIRMATION TO NAMESPACE CONNECTION")
	}
}
AlanTianx commented 2 years ago

After further investigation, I found the cause. When a message is broadcast to a Conn (call it A) that has just disconnected, the internal check *Conn.canWrite() waits on a lock. That wait leaves the broadcast blocked indefinitely.

image