panjf2000 / gnet

🚀 gnet is a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go.
https://gnet.host
Apache License 2.0
9.7k stars 1.04k forks source link

消息丢失 #404

Closed zhouchang2017 closed 2 years ago

zhouchang2017 commented 2 years ago

用gnet实现websocket服务端,在客户端循环往服务端发送消息的时候。服务端收到的消息与客户端发送不一致

测试场景;客户端与服务端建立连接后;客户端循环往服务端发送10次消息;服务端基本上接受不到10次消息

服务端代码

package main

import (
    "flag"
    "fmt"
    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsutil"
    "github.com/panjf2000/gnet/v2"
    "log"
    "sync/atomic"
)

type wsServer struct {
    gnet.BuiltinEventEngine

    addr      string
    multicore bool
    eng       gnet.Engine
    connected int64
}

type wsCodec struct {
    ws bool
}

func (wss *wsServer) OnBoot(eng gnet.Engine) gnet.Action {
    wss.eng = eng
    log.Printf("ws server with multi-core=%t is listening on %s\n", wss.multicore, wss.addr)
    return gnet.None
}

func (wss *wsServer) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
    c.SetContext(new(wsCodec))
    atomic.AddInt64(&wss.connected, 1)
    return nil, gnet.None
}

func (wss *wsServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
    if err != nil && c != nil {
        log.Printf("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err)
    }
    atomic.AddInt64(&wss.connected, -1)
    log.Printf("conn[%v] disconnected", c.RemoteAddr().String())
    return gnet.None
}

func (wss *wsServer) OnTraffic(c gnet.Conn) gnet.Action {
    if !c.Context().(*wsCodec).ws {
        _, err := ws.Upgrade(c)
        log.Printf("conn[%v] upgrade websocket protocol\n", c.RemoteAddr().String())
        if err != nil {
            log.Printf("conn[%v] [err=%v]\n", c.RemoteAddr().String(), err.Error())
            return gnet.Close
        }
        c.Context().(*wsCodec).ws = true
    } else {

        msg, op, err := wsutil.ReadClientData(c)
        log.Printf("[C->S] RECV msg: %s\n", string(msg))
        if err != nil {
            if _, ok := err.(wsutil.ClosedError); !ok {
                log.Printf("conn[%v] [err=%v]\n", c.RemoteAddr().String(), err.Error())
            }
            return gnet.Close
        }
        //log.Printf("conn[%v] receive [op=%v] [msg=%v]\n", c.RemoteAddr().String(), op, string(msg))
        // This is the echo server
        err = wsutil.WriteServerMessage(c, op, msg)
        if err != nil {
            log.Printf("conn[%v] [err=%v]\n", c.RemoteAddr().String(), err.Error())
            return gnet.Close
        }
        log.Printf("[S->C] SEND msg: %s\n", string(msg))
    }

    return gnet.None
}

func main() {
    var port int
    var multicore bool

    // Example command: go run main.go --port 8080 --multicore=true
    flag.IntVar(&port, "port", 9080, "server port")
    flag.BoolVar(&multicore, "multicore", true, "multicore")
    flag.Parse()

    wss := &wsServer{addr: fmt.Sprintf("tcp://127.0.0.1:%d", port), multicore: multicore}

    // Start serving!
    log.Println("server exits:", gnet.Run(wss, wss.addr,
        gnet.WithMulticore(multicore),
        gnet.WithReusePort(true)))
}

客户端代码

package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "github.com/gorilla/websocket"
    "log"
    "net/url"
    "os"
    "os/signal"
    "sync/atomic"
)

var addr = flag.String("addr", "localhost:9080", "http service address")

func main() {
    flag.Parse()

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
    log.Printf("connecting to %s\n", u.String())

    c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
    if err != nil {
        panic(fmt.Sprintf("dial: %s", err))
    }
    defer c.Close()

    go func() {
        for {
            _, message, err := c.ReadMessage()
            if err != nil {
                panic(err)
            }

            log.Printf("[S->C] RECV msg: %s\n", string(message))
        }
    }()
    count := 10
    messages := make([][]byte, 0, count)
    for i := 0; i < count; i++ {
        messages = append(messages, makeMessage())
    }

    for _, msg := range messages {
        err := c.WriteMessage(websocket.TextMessage, msg)
        if err != nil {
            log.Printf("write err: %s\n", err)
            return
        }
        log.Printf("[C->S] SEND msg: %s\n", string(msg))
    }
    quit := make(chan os.Signal)
    signal.Notify(quit, os.Interrupt)
    <-quit
    log.Println("Shutdown Client ...")
    err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
    if err != nil {
        log.Printf("write close err: %s\n", err)
        return
    }
    log.Println("Client exiting")
}

var n int64

func makeMessage() []byte {
    addInt64 := atomic.AddInt64(&n, 1)
    m := map[string]interface{}{
        "reqId": addInt64,
    }
    marshal, _ := json.Marshal(m)
    return marshal
}

运行结果: 服务端控制台输出:

➜  wsserver go run main.go
2022/09/22 22:29:29 ws server with multi-core=true is listening on tcp://127.0.0.1:9080
2022/09/22 22:29:33 conn[127.0.0.1:55935] upgrade websocket protocol
2022/09/22 22:29:33 [C->S] RECV msg: {"reqId":1}
2022/09/22 22:29:33 [S->C] SEND msg: {"reqId":1}
2022/09/22 22:29:33 [C->S] RECV msg: {"reqId":2}
2022/09/22 22:29:33 [S->C] SEND msg: {"reqId":2}
2022/09/22 22:29:33 [C->S] RECV msg: {"reqId":3}
2022/09/22 22:29:33 [S->C] SEND msg: {"reqId":3}
2022/09/22 22:29:33 [C->S] RECV msg: {"reqId":4}
2022/09/22 22:29:33 [S->C] SEND msg: {"reqId":4}
2022/09/22 22:29:38 error occurred on connection=127.0.0.1:55935, connection reset by peer
2022/09/22 22:29:38 conn[127.0.0.1:55935] disconnected

客户端控制台输出:

➜  wsclient go run main.go
2022/09/22 22:29:33 connecting to ws://localhost:9080/
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":1}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":2}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":3}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":4}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":5}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":6}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":7}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":8}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":9}
2022/09/22 22:29:33 [C->S] SEND msg: {"reqId":10}
2022/09/22 22:29:33 [S->C] RECV msg: {"reqId":1}
2022/09/22 22:29:33 [S->C] RECV msg: {"reqId":2}
2022/09/22 22:29:33 [S->C] RECV msg: {"reqId":3}
2022/09/22 22:29:33 [S->C] RECV msg: {"reqId":4}
^C2022/09/22 22:29:38 Shutdown Client ...
2022/09/22 22:29:38 Client exiting
panic: read tcp 127.0.0.1:55935->127.0.0.1:9080: use of closed network connection

System Info (please fill out the following information):

panjf2000 commented 2 years ago

OnTraffic 里要用 for 循环一直处理直到没有数据或者遇到残包,否则的话你要先一次性把所有数据读到你自己的 buffer 里去,后面自己去慢慢处理。

soukengo commented 2 years ago

OnTraffic 里要用 for 循环一直处理直到没有数据或者遇到残包,否则的话你要先一次性把所有数据读到你自己的 buffer 里去,后面自己去慢慢处理。

我遇到了相同的问题,改为在OnTraffic中for循环读取又会导致阻塞,和下面这个问题一样https://github.com/panjf2000/gnet/issues/384 然后我在读取之前判断了是否c.InboundBuffered() == 0, 为0则不读取,问题得到解决,不知道这是不是正确的打开方式

panjf2000 commented 2 years ago

OnTraffic 里要用 for 循环一直处理直到没有数据或者遇到残包,否则的话你要先一次性把所有数据读到你自己的 buffer 里去,后面自己去慢慢处理。

我遇到了相同的问题,改为在OnTraffic中for循环读取又会导致阻塞,和下面这个问题一样#384 然后我在读取之前判断了是否c.InboundBuffered() == 0, 为0则不读取,问题得到解决,不知道这是不是正确的打开方式

你是怎么读数据的?Read/Peek/Next 这几个方法都只是读本地 buffer,不可能会阻塞。 @soukengo

soukengo commented 2 years ago

OnTraffic 里要用 for 循环一直处理直到没有数据或者遇到残包,否则的话你要先一次性把所有数据读到你自己的 buffer 里去,后面自己去慢慢处理。

我遇到了相同的问题,改为在OnTraffic中for循环读取又会导致阻塞,和下面这个问题一样#384 然后我在读取之前判断了是否c.InboundBuffered() == 0, 为0则不读取,问题得到解决,不知道这是不是正确的打开方式

你是怎么读数据的?Read/Peek/Next 这几个方法都只是读本地 buffer,不可能会阻塞。 @soukengo

我是采用的demo示例里的读取方式:wsutil.ReadClientData(c),客户端for循环连续发10个包,服务端读取到1、2个就会阻塞

@panjf2000 潘少 是我使用的方式不对吗?

panjf2000 commented 2 years ago

websocket 那个 demo 可能有点问题,之前是别人贡献的,我也没仔细测过。

soukengo commented 2 years ago

我实际测试websocket server在调用wsutil.ReadClientData(c) 之前,判断一下c.InboundBuffered() == 0,就不会导致阻塞。 而tcp server 直接Read/Peek不会阻塞

zhouchang2017 commented 2 years ago

目前问题解决了;下面是server端代码:

package server

import (
    "bytes"
    "context"
    "fmt"
    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsutil"
    "github.com/google/uuid"
    "github.com/panjf2000/gnet/v2"
    "github.com/panjf2000/gnet/v2/pkg/logging"
    "io"
    "os"
    "strings"
    "ws-echo-srv/pkg/log"
)

type gNetServer struct {
    port int
    addr string
    eng  gnet.Engine
    gnet.BuiltinEventEngine
}

func NewGNetServer(port int) *gNetServer {
    return &gNetServer{
        port: port,
        addr: fmt.Sprintf("tcp://:%d", port),
    }
}

func (g *gNetServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
    g.eng = eng
    return gnet.None
}

func (g *gNetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
    c.SetContext(&codec{
        _codec: &_codec{
            id:  uuid.New().String(),
            buf: nil,
        }})
    return nil, gnet.None
}

func (g *gNetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
    if err != nil && strings.Contains(err.Error(), "connection reset by peer") {
        // 对端关闭了连接
        return
    }
    log.Logger.Debugf("[%s] on close, err: %v", c.RemoteAddr(), err)
    return
}

type rw struct {
    io.Writer
    io.Reader
}

func (g *gNetServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
    ctx := context.Background()
    ctx, _ = log.SetRequestId(ctx, log.FromCtx(ctx), uuid.New().String())
    switch c.Context().(type) {
    case *codec:
        cod := c.Context().(*codec)
        if cod.upgraded {
            return cod.handle(ctx, c)
        }
        return cod.upgrade(ctx, c)
    default:
        log.Logger.Errorf("unknown type codec, close")
        return gnet.Close
    }
}

func (g *gNetServer) Serve() error {
    //log.Logger.SetLevel("error")
    return gnet.Run(g, g.addr,
        gnet.WithMulticore(true),
        gnet.WithReusePort(true),
        gnet.WithLogger(log.Logger),
        gnet.WithLogLevel(logging.DebugLevel),
    )
}

func (g *gNetServer) Shutdown(ctx context.Context) error {
    return gnet.Stop(ctx, g.addr)
}

type _codec struct {
    id  string
    buf []byte
}

func (c *_codec) read(ctx context.Context, conn gnet.Conn, cb func(ctx context.Context, r *bytes.Buffer) error) (action gnet.Action) {
    logger := log.FromCtx(ctx).WithField("ip", conn.RemoteAddr())
    ctx = log.WithCtx(ctx, logger)
    defer func() {
        if action == gnet.Close {
            logger.Debugf("response gnet.Close")
            c.buf = c.buf[:0]
        }
    }()
    for conn.InboundBuffered() > 0 {
        buf, err := conn.Next(-1)
        if err != nil {
            return gnet.Close
        }
        _buf := make([]byte, 0, len(c.buf)+len(buf))
        _buf = append(_buf, c.buf...)
        _buf = append(_buf, buf...)
        c.buf = _buf

        // 创建一个reader
        reader := bytes.NewBuffer(_buf)

        logger.Debugf("[Next] next buf(%d) | w.buf(%d) | reader(%d)", len(buf), len(c.buf), reader.Len())
        if err := cb(ctx, reader); err != nil {
            logger.Errorf("callback func err: %s", err.Error())
            return gnet.Close
        }
    }
    return gnet.None
}

type codec struct {
    *_codec
    upgraded bool
}

func (c *codec) upgrade(ctx context.Context, conn gnet.Conn) (action gnet.Action) {
    return c.read(ctx, conn, func(_ctx context.Context, reader *bytes.Buffer) error {
        _rw := &rw{
            Writer: conn,
            Reader: reader,
        }
        buf := reader.Bytes()
        _, err := ws.Upgrade(_rw)
        if err != nil {
            if err != io.EOF {
                return err
            }
            // io.EOF 消息被截断造成的;等待下一次从socket中读取消息
            return nil
        }
        if err = wsutil.WriteServerText(conn, []byte(fmt.Sprintf("hello world %s", c.id))); err != nil {
            return err
        }
        fmt.Fprintf(os.Stdout, "Request:\n%s\n\n", string(buf))

        //log.FromCtx(_ctx).Debugf("%s", string(buf))

        c.buf = c.buf[:0]
        c.upgraded = true
        return nil
    })
}

func (c *codec) handle(ctx context.Context, conn gnet.Conn) (action gnet.Action) {
    return c.read(ctx, conn, func(_ctx context.Context, reader *bytes.Buffer) error {
        _rw := &rw{
            Writer: conn,
            Reader: reader,
        }

        for {
            data, code, err := wsutil.ReadClientData(_rw)
            if err != nil {
                if err == io.ErrUnexpectedEOF {
                    // 消息截断
                    return nil
                }
                if _, ok := err.(wsutil.ClosedError); ok {
                    return err
                }
                return err
            }
            log.FromCtx(ctx).Debugf("[C->S] [%s] [%d] RECV msg: %s", conn.RemoteAddr(), code, string(data))
            if err = wsutil.WriteServerMessage(conn, code, data); err != nil {
                log.FromCtx(_ctx).Errorf("[C->S] write message err: %s", err.Error())
                return err
            }
            // 处理完成;剩余没有处理的数据存起来
            c.buf = reader.Bytes()
        }
    })
}