xtaci / smux

A Stream Multiplexing Library for golang with least memory usage(TDMA)
MIT License
1.27k stars 189 forks source link

OpenStream似乎有并行性能问题 #97

Open WSitong opened 5 months ago

WSitong commented 5 months ago

我在测试我代码发现OpenStream并行性能存在问题,甚至没有串行好。不知道是不是运行平台或设备的原因,我的电脑运行下面代码可以复现这个问题(并行用时12.8s,串行用时1.2s)。下面代码里会运行一个回显服务器,Test_Serial和Test_Parallel作为客户端,会先创建一个smux.session,并开1000个协程创建stream,然后发送和接受数据。唯一的区别就是Test_Serial在OpenStream时使用了锁:

package openstream_test

import (
    "github.com/xtaci/kcp-go/v5"
    "github.com/xtaci/smux"
    "log"
    "net"
    "sync"
    "testing"
    "time"
)

var listener net.Listener

func Test_Serial(t *testing.T) {
    // 测试串行性能
    if listener == nil {
        go runServer()
        time.Sleep(time.Second)
    }
    conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
    if err != nil {
        t.Error(err)
    }
    sess, err := smux.Client(conn, smux.DefaultConfig())
    if err != nil {
        t.Error(err)
    }
    // 使用同一个smux.session,串行创建1000个stream发送数据
    data := []byte("this is s test message")
    wg := sync.WaitGroup{}
    mux := sync.Mutex{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mux.Lock()
            stream, err := sess.OpenStream()
            mux.Unlock()
            if err != nil {
                return
            }
            stream.Write(data)
            buffer := make([]byte, 1024)
            stream.Read(buffer)
        }()
    }
    wg.Wait()
}

func Test_Parallel(t *testing.T) {
    // 测试并行性能
    if listener == nil {
        go runServer()
        time.Sleep(time.Second)
    }
    conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
    if err != nil {
        t.Error(err)
    }
    sess, err := smux.Client(conn, smux.DefaultConfig())
    if err != nil {
        t.Error(err)
    }
    // 使用同一个smux.session,并行创建1000个stream发送数据
    data := []byte("this is s test message")
    wg := sync.WaitGroup{}
    //mux := sync.Mutex{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            //mux.Lock()
            stream, err := sess.OpenStream()
            //mux.Unlock()
            if err != nil {
                return
            }
            stream.Write(data)
            buffer := make([]byte, 1024)
            stream.Read(buffer)
        }()
    }
    wg.Wait()
}

func runServer() {
    // 回显服务器
    lis, err := kcp.ListenWithOptions(":8000", nil, 0, 0)
    if err != nil {
        log.Fatal(err)
    }
    listener = lis
    for {
        conn, err := lis.Accept()
        if err != nil {
            log.Fatal(err)
        }
        go func() {
            defer conn.Close()
            sess, err := smux.Server(conn, smux.DefaultConfig())
            if err != nil {
                log.Fatal(err)
            }
            for {
                stream, err := sess.AcceptStream()
                if err != nil {
                    log.Fatal(err)
                }
                go func() {
                    buffer := make([]byte, 1024)
                    n, err := stream.Read(buffer)
                    if err != nil {
                        log.Fatal(err)
                    }
                    stream.Write(buffer[:n])
                    time.Sleep(time.Minute)
                }()
            }
        }()
    }
}