cloudwego / netpoll

A high-performance non-blocking I/O networking framework focusing on RPC scenarios.
Apache License 2.0
4.07k stars 470 forks source link

采用多路复用模式,出现了数据乱码的情况 #282

Closed zzqCh closed 1 year ago

zzqCh commented 1 year ago

Describe the bug 采用多路复用模式,出现了数据乱码的情况:

To Reproduce 请求链路: client---->mosn----->netpoll,其中mosn方显示发出的数据是对的,但是接收到的是乱码,mosn也开启了多路复用。 netpoll的示例代码:


func (s *muxServer) handler(ctx context.Context, conn netpoll.Connection) (err error) {
    mc := ctx.Value(ctxkey).(*muxConn)
    reader := conn.Reader()
    var p1 []byte
    var p netpoll.Reader
    p1, err = reader.Peek(6)
    if err != nil {
        return err
    }
    length := int(binary.BigEndian.Uint32(p1[2:6])) + 6
    p, err = reader.Slice(length)
    if err != nil {
        return err
    }
    // handler must use another goroutine
    go func() {
        defer func() {
            if err2 := recover(); err2 != nil {
                err = fmt.Errorf("Recover from panic!,data is %v\n", err)
            }
        }()
        req := &runner.Message{}
        err = codec.Decode(p, req)
        if err != nil {
            panic(fmt.Errorf("netpoll decode failed: %s", err.Error()))
        }
        // handler
        resp := runner.ProcessRequest(reporter, req)
        // encode
        writer := netpoll.NewLinkBuffer()
        err = codec.Encode(writer, resp)
        if err != nil {
            panic(fmt.Errorf("netpoll encode failed: %s", err.Error()))
        }
        mc.Put(func() (buf netpoll.Writer, isNil bool) {
            return writer, false
        })
    }()
    return nil
}

func ProcessRequest(report *perf.Recorder, req *Message) (resp *Message) {
    decoder := req.DecoderData
    if decoder.DataLen == 0 || decoder.SerialNo == codec.HB {
        return &Message{DecoderData: &codec.CustomDecoder{Version: decoder.Version, SerialNo: decoder.SerialNo}}
    }
    //marshal, err := json.Marshal(req)
    logInfo("reqInfo serialNo:%s,data is:%#v", blueString("["+req.DecoderData.SerialNo+"]"), *decoder)
    // copy a map
    fruitRankCopy := make(map[string]string)
    for k, v := range decoder.Params {
        fruitRankCopy[k] = v
    }
    fruitRankCopy["Error1123"] = "999"
    resp = &Message{
        DecoderData: &codec.CustomDecoder{
            Version: decoder.Version, Params: fruitRankCopy},
    }
    //marshal1, err1 := json.Marshal(resp)
    logInfo("respInfo serialNo:%s,data is:%+v", blueString("["+resp.DecoderData.SerialNo+"]"), *resp.DecoderData)
    if len(decoder.Params) > 0 {
        sleepMs, ok := decoder.Params["sleep"]
        if ok {
            sl, err := strconv.ParseInt(sleepMs, 10, 0)
            if err == nil && sl > 0 {
                time.Sleep(time.Millisecond * time.Duration(sl))
            }
        }
    }
    //marshal1, err1 := json.Marshal(resp)
    logInfo("respInfo---->222 serialNo:%s,data is:%+v", blueString("["+resp.DecoderData.SerialNo+"]"), *resp.DecoderData)
    return resp
}

Expected behavior 两个loginfo打印的内容一致。

Screenshots

image

测试过程中发现,如果sleep 3s或者更长,那么两个loginfo打印的内容就不一样了,而且会参杂其他client的请求内容,但是如果不sleep,那么就不会出现问题。

joway commented 1 year ago

@zzqCh 你完整的测试代码能够方便建立一个repo出来吗?比如你这里的 muxConn 是啥

zzqCh commented 1 year ago

@zzqCh 你完整的测试代码能够方便建立一个repo出来吗?比如你这里的 muxConn 是啥

代码是copy的benchmark的库,可以参考这块https://github.com/cloudwego/netpoll-benchmark/blob/main/netpoll/mux_server.go#L112

joway commented 1 year ago

@zzqCh 所以你除了 process 函数,其他地方都没改过吗?尤其是 encode decode 函数

joway commented 1 year ago

你这个问题极大的可能性来自:decode 函数中,拿到 netpoll 的一个内存对象,比如 conn.Next(4) 后,然后直接调用了 release 函数。这个时候,netpoll 会认为你已经不使用 next 函数返回的内存了。就会重用这段内存。

正确的时序应该是:

  1. conn.Next/... 读取数据
  2. process 业务逻辑处理
  3. 确定业务逻辑运行完毕,不持有上面的对象后,调用 release 标记释放这段内存
joway commented 1 year ago

如果你不能确保这个时序,还有一个解法是不用使用nocopy API,如 Next。

https://github.com/cloudwego/netpoll/blob/e69450be63da0d3cc6e14f3f62b3e16757676e08/nocopy.go#L35 这里注释有详细描述

zzqCh commented 1 year ago

如果你不能确保这个时序,还有一个解法是不用使用nocopy API,如 Next。

https://github.com/cloudwego/netpoll/blob/e69450be63da0d3cc6e14f3f62b3e16757676e08/nocopy.go#L35

这里注释有详细描述

我看了遍代码,确实如您所说,是因为slice转string的时候用的指针替换,然后提前release了。 那不使用nocopy API,我们应该使用哪个api的呢?有示例么?

joway commented 1 year ago

@zzqCh 你直接使用:https://github.com/cloudwego/netpoll/blob/e69450be63da0d3cc6e14f3f62b3e16757676e08/nocopy.go#L73

这个方法是可以帮你 copy 好的。

此外,另外一个方法是,你只在最终确定读取的数据不再被使用后,调用release,这样也能继续使用现有的方法

zzqCh commented 1 year ago

@zzqCh 你直接使用:

https://github.com/cloudwego/netpoll/blob/e69450be63da0d3cc6e14f3f62b3e16757676e08/nocopy.go#L73

这个方法是可以帮你 copy 好的。

此外,另外一个方法是,你只在最终确定读取的数据不再被使用后,调用release,这样也能继续使用现有的方法

好的,谢谢您,我这边按照您说的改下代码。