njones / socketio

A Modern SocketIO library for go
MIT License
60 stars 9 forks source link

socket-io: Allocated memory isn't released under load test #75

Open tavancini-tc opened 1 year ago

tavancini-tc commented 1 year ago

Hi Nika! So, I'm running some load tests here and noticed the server is not releasing allocated memory.

Here is what I found:

  1. After running the load test, I got around 5Gib of allocated memory that is not released even after GC execution. Screen Shot 2023-04-20 at 16 23 58

  2. Digging into pprof, I got: Screen Shot 2023-04-20 at 16 25 01

Screen Shot 2023-04-20 at 16 26 21

To reproduce the test:

  1. Install k6;
  2. Start the server go run main.go (code below);
  3. Run k6 load test script k6 run loadTest.js (code below);
  4. Run pprof:
    $ go tool pprof http://localhost:8003/debug/pprof/heap

main.go

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "net/http/pprof"
    "os"
    "os/signal"
    "runtime"
    "strings"
    "time"

    sio "github.com/njones/socketio"
    "github.com/njones/socketio/callback"
    eio "github.com/njones/socketio/engineio"
    ser "github.com/njones/socketio/serialize"
)

const (
    httpServerPort = ":8003"
)

func main() {
    var ruTimerQuitChan chan struct{}

    sioServer := sio.NewServerV2(
        sio.WithPath("/websocket/api/socket.io/"),
        eio.WithCors(eio.CORSorigin{"*"}),
        eio.WithSessionShave(1*time.Millisecond),
        eio.WithPingInterval(5*time.Second),
        eio.WithPingTimeout(1*time.Minute),
        eio.WithMaxPayload(1000000),
    )

    // /
    sioServer.OnConnect(func(socket *sio.SocketV2) error {
        log.Printf("/ connected: %s", socket.ID().String())
        log.Printf("/ token: %s", socket.Request().URL.Query().Get("token"))
        return nil
    })

    // CHANNEL
    channel := sioServer.Of("/channel")
    channel.OnConnect(func(socket *sio.SocketV2) error {
        log.Printf("/channel connected: %s", socket.ID().String())
        log.Printf("/channel token: %s", socket.Request().URL.Query().Get("token"))
        return nil
    })

    // CHAT
    chat := sioServer.Of("/chat")
    chat.OnConnect(func(socket *sio.SocketV2) error {
        log.Printf("/chat connected: %s", socket.ID().String())
        log.Printf("/chat token: %s", socket.Request().URL.Query().Get("token"))
        socket.On("join", callback.Wrap{
            Parameters: []ser.Serializable{ser.StrParam},
            Func: func() interface{} {
                return func(room string) error {
                    log.Print("/chat join event")
                    return nil
                }
            },
        })

        socket.On("leave", callback.Wrap{
            Parameters: []ser.Serializable{ser.StrParam},
            Func: func() interface{} {
                return func(room string) error {
                    log.Print("/chat leave event")
                    return nil
                }
            },
        })

        return nil
    })

    // Debug
    ruTicker := time.NewTicker(time.Second * 10)
    ruTimerQuitChan = make(chan struct{})
    go func() {
        for {
            select {
            case <-ruTicker.C:
                printMemStats()
            case <-ruTimerQuitChan:
                ruTicker.Stop()
                return
            }
        }
    }()

    defaultMux := http.NewServeMux()

    // Socket.io setup
    defaultMux.Handle("/", sioServer)

    // Pprof setup
    defaultMux.HandleFunc("/debug/pprof/", pprof.Index)

    sioHTTPServer := &http.Server{
        Addr:              httpServerPort,
        Handler:           defaultMux,
        ReadHeaderTimeout: 2 * time.Second,
    }

    // Start server
    log.Printf("Server running at %s.", httpServerPort)
    go func() {
        if err := sioHTTPServer.ListenAndServe(); err != nil {
            log.Fatal(err)
        }
    }()

    stop := make(chan os.Signal, 1)
    signal.Notify(stop, os.Interrupt)
    <-stop
    close(ruTimerQuitChan)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := sioHTTPServer.Shutdown(ctx); err != nil {
        log.Fatal(err)
    }
}

func printMemStats() {
    var stats strings.Builder
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    stats.WriteString(fmt.Sprintf("Alloc=%vMiB; TotalAlloc=%v MiB; Sys=%vMiB; GoRoutines=%d; NumGC=%v",
        bToMb(m.Alloc),
        bToMb(m.TotalAlloc),
        bToMb(m.Sys),
        runtime.NumGoroutine(),
        m.NumGC))
    log.Print(stats.String())
}

func bToMb(b uint64) uint64 {
    return b / 1024 / 1024
}

loadTest.js

import { check, group, sleep } from 'k6';
import ws from 'k6/ws';

export const options = {
  thresholds: {
    http_req_failed: ['rate<0.50'],
    http_req_duration: ['p(90)<5000'],
  },
  stages: [
    { duration: '2m', target: 100 },
    { duration: '4m', target: 100 },
    { duration: '2m', target: 0 },
  ],
};

const nameSpaces = ['', 'channel', 'chat'];
const token = '01234567890123456789012345';
const url = 'ws://localhost:8003/websocket/api/socket.io/?token=' + token + '&EIO=3&transport=websocket';

export default function () {
  nameSpaces.forEach((nameSpace) => {
    group("Test '" + nameSpace + "'", function () {
      let response = ws.connect(url, {}, function (socket) {
        socket.on('open', function open() {
          if (nameSpace.length > 0) {
            socket.send('40/' + nameSpace);
          }

          if (nameSpace === 'chat') {
            sleep(1);
            socket.send('42/chat,["join","room-id"]');
            sleep(1);
            socket.send('42/chat,["leave","room-id"]');
          }

          sleep(1);
          socket.close();
        });
      });

      check(response, {
        'status is 101': (r) => r && r.status === 101,
      });
    });
  });
}
njones commented 1 year ago

Interesting... I'll check this out, and fix it.

Thanks again for the awesome detailed report!

tavancini-tc commented 1 year ago

Hi Nika, how are you? Do you have any updates? Tks