lesismal / nbio

Pure Go 1000k+ connections solution, support tls/http1.x/websocket and basically compatible with net/http, with high-performance and low memory cost, non-blocking, event-driven, easy-to-use.
MIT License
2.11k stars 151 forks source link

only 2,000 req/s with redis client example but using tls server example modification, i can get 25,000 req/s. what's wrong? #412

Closed kolinfluence closed 3 months ago

kolinfluence commented 3 months ago

@lesismal getting 2,000 req/s using this example, anyways to speed it up significantly? i've tested simple sending using tls example can get up to 25,000 req/s but this is only 2,000 req/s

pls check what's wrong and if it shld be used this way. thx

with reference to: https://github.com/lesismal/nbio-examples/issues/16

package main

import (
        "sync"
        "fmt"
        "github.com/lesismal/nbio"
        "github.com/lesismal/nbio/extension/tls"
        "log"
        "strings"
        "sync/atomic"
        "time"
)

type RedisClient struct {
        g          *nbio.Gopher
        addr       string
        tlsConfig  *tls.Config
        responseCh chan []byte
        errorCh    chan error
        pool       *ConnPool
}

var (
        qps int64 // Global counter for queries per second
        tlsConfig = &tls.Config{
                InsecureSkipVerify: true,
        }
)

// A simple structure for a connection pool.
type ConnPool struct {
        pool    chan *tls.Conn
        addr    string
        config  *tls.Config
        maxConn int
        mu      sync.Mutex
}

func NewRedisClient(addr string, tlsConfig *tls.Config, pool *ConnPool) *RedisClient {
        client := &RedisClient{
                g:          nbio.NewGopher(nbio.Config{}),
                addr:       addr,
                tlsConfig:  tlsConfig,
                responseCh: make(chan []byte, 1),
                errorCh:    make(chan error, 1),
                pool:       pool,
        }
        client.setupHandlers()
        return client
}

// NewConnPool creates a new connection pool.
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
        return &ConnPool{
                pool:    make(chan *tls.Conn, maxConn),
                addr:    addr,
                config:  config,
                maxConn: maxConn,
        }
}

// Get acquires a connection from the pool.
func (p *ConnPool) Get() (*tls.Conn, error) {
        select {
        case conn := <-p.pool:
                return conn, nil
        default:
                // Pool is empty, create a new connection
                return p.DialNew()
        }
}

func (p *ConnPool) DialNew() (*tls.Conn, error) {
        conn, err := tls.Dial("tcp", p.addr, p.config)
        if err != nil {
                return nil, err
        }
        return conn, nil
}

// Put returns a connection to the pool.
func (p *ConnPool) Put(conn *tls.Conn) {
        select {
        case p.pool <- conn:
                // Connection returned to the pool
log.Printf("pool returned")
        default:
log.Printf("clossssss")
                // Pool is full, close the connection
                conn.Close()
        }
}

// CloseAll drains the pool and closes all connections.
func (p *ConnPool) CloseAll() {
        for {
                select {
                case conn := <-p.pool:
                        conn.Close()
                default:
                        return
                }
        }
}

func (client *RedisClient) setupHandlers() {
        isClient := true

        client.g.OnOpen(tls.WrapOpen(client.tlsConfig, isClient, func(c *nbio.Conn, tlsConn *tls.Conn) {

                //log.Printf("connection open")
                // Initialize connection setup if necessary
        }))
        client.g.OnClose(tls.WrapClose(func(c *nbio.Conn, tlsConn *tls.Conn, err error) {
                //log.Printf("connection closed")
                client.errorCh <- err
                client.pool.Put(tlsConn)

        }))
        client.g.OnData(tls.WrapData(func(c *nbio.Conn, tlsConn *tls.Conn, data []byte) {
                //log.Printf("data = %s", data)
                // Handle incoming data, Redis protocol
                client.responseCh <- data
        }))
}

func (client *RedisClient) Start() error {
        return client.g.Start()
}

func (client *RedisClient) Stop() {
        client.g.Stop()
        close(client.responseCh)
        close(client.errorCh)
}

func (client *RedisClient) Do(cmd string, args ...string) (interface{}, error) {
        fullCmd := fmt.Sprintf("*%d\r\n$%d\r\n%s\r\n", len(args)+1, len(cmd), cmd)
        for _, arg := range args {
                fullCmd += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
        }

        atomic.AddInt64(&qps, 1)

        // Use the connection pool to get a TLS connection
        tlsConn, err := client.pool.Get()
        if err != nil {
                return nil, fmt.Errorf("failed to get connection: %v", err)
        }

        //defer client.pool.Put(tlsConn)

        // Convert *tls.Conn to *tls.Conn for use with NBIO
        nbConn, err := nbio.NBConn(tlsConn.Conn())
        if err != nil {
                return nil, fmt.Errorf("failed to convert connection: %v", err)
        }

        // step 3: set tls.Conn and nbio.Conn to each other, and add nbio.Conn to the gopher
        isNonblock := true
        nbConn.SetSession(tlsConn)
        tlsConn.ResetConn(nbConn, isNonblock)
        client.g.AddConn(nbConn)

        // step 4: write data here or in the OnOpen handler or anywhere
        _, err = tlsConn.Write([]byte(fullCmd))
        if err != nil {
                return nil, err
        }

        /*
        //c, err := client.g.Dial(client.addr)
        c, err := tls.Dial("tcp", client.addr, client.tlsConfig)
        if err != nil {
                return nil, err
        }
        defer c.Close()
        */

        select {
        case response := <-client.responseCh:
                return parseResponse(response)
        case err := <-client.errorCh:
                return nil, err
        }
}

func parseResponse(data []byte) (interface{}, error) {
        if len(data) == 0 {
                return nil, fmt.Errorf("no response")
        }

        switch data[0] {
        case '-':
                return string(data[1:]), fmt.Errorf("error response: %s", data[1:])
        case '+', ':':
                return string(data[1:]), nil
        case '$':
                parts := strings.SplitN(string(data[1:]), "\r\n", 2)
                if parts[0] == "-1" {
                        return nil, nil // Null bulk response
                }
                return parts[1], nil
        case '*':
                lines := strings.Split(string(data[1:]), "\r\n")
                var count int
                _, err := fmt.Sscanf(lines[0], "%d", &count)
                if err != nil || count <= 0 {
                        return nil, err
                }
                results := make([]interface{}, count)
                for i := 0; i < count; i++ {
                        results[i] = lines[i+1] // Simplification, might need refinement
                }
                return results, nil
        default:
                return nil, fmt.Errorf("unknown response type: %c", data[0])
        }
}

func main() {
        // TLS configuration for secure Redis connection, adjust as necessary.
        addr := "127.0.0.1:6380"
        pool := NewConnPool(addr, tlsConfig, 10) // Pool with max 10 connections
        client := NewRedisClient(addr, tlsConfig, pool)

        err := client.Start()
        if err != nil {
                log.Fatalf("Failed to start client: %v", err)
        }
        defer client.Stop()

        log.Printf("here1")

        // Use the client
        resp, err := client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)

        startTime := time.Now() // Capture start time
        for i:=0; i< 10000;i++ {
                resp, err = client.Do("SET", "key", "value")
                if err != nil {
                        log.Fatalf("Failed to execute command: %v", err)
                }
        }

        elapsed := time.Since(startTime) // Calculate elapsed time
        fmt.Printf("Elapsed time: %s\n", elapsed)

        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "a", "b")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)

        log.Printf("here2")
        resp2, err := client.Do("GET", "key")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("GET Response: %v\n", resp2)

        log.Printf("here3")
        // Keep the main goroutine alive for a short time to ensure the response is processed
        time.Sleep(1 * time.Second)
}
kolinfluence commented 3 months ago

if i tried to change the defer Put but it crashed with:

./redismain 
2024/03/25 04:50:13.638 [INF] NBIO Engine[NB] start with [3 eventloop, MaxOpenFiles: 1048576]
2024/03/25 04:50:13 here1
2024/03/25 04:50:13 pool returned
SET Response: OK

2024/03/25 04:50:13.639 [ERR] [13] add read event failed: file exists
2024/03/25 04:50:13 pool returned
2024/03/25 04:50:13 Failed to execute command: use of closed network connection
package main

import (
        "sync"
        "fmt"
        "github.com/lesismal/nbio"
        "github.com/lesismal/nbio/extension/tls"
        "log"
        "strings"
        "sync/atomic"
        "time"
)

type RedisClient struct {
        g          *nbio.Gopher
        addr       string
        tlsConfig  *tls.Config
        responseCh chan []byte
        errorCh    chan error
        pool       *ConnPool
}

var (
        qps int64 // Global counter for queries per second
        tlsConfig = &tls.Config{
                InsecureSkipVerify: true,
        }
)

// A simple structure for a connection pool.
type ConnPool struct {
        pool    chan *tls.Conn
        addr    string
        config  *tls.Config
        maxConn int
        mu      sync.Mutex
}

func NewRedisClient(addr string, tlsConfig *tls.Config, pool *ConnPool) *RedisClient {
        client := &RedisClient{
                g:          nbio.NewGopher(nbio.Config{}),
                addr:       addr,
                tlsConfig:  tlsConfig,
                responseCh: make(chan []byte, 1),
                errorCh:    make(chan error, 1),
                pool:       pool,
        }
        client.setupHandlers()
        return client
}

// NewConnPool creates a new connection pool.
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
        return &ConnPool{
                pool:    make(chan *tls.Conn, maxConn),
                addr:    addr,
                config:  config,
                maxConn: maxConn,
        }
}

// Get acquires a connection from the pool.
func (p *ConnPool) Get() (*tls.Conn, error) {
        select {
        case conn := <-p.pool:
                return conn, nil
        default:
                // Pool is empty, create a new connection
                return p.DialNew()
        }
}

func (p *ConnPool) DialNew() (*tls.Conn, error) {
        conn, err := tls.Dial("tcp", p.addr, p.config)
        if err != nil {
                return nil, err
        }
        return conn, nil
}

// Put returns a connection to the pool.
func (p *ConnPool) Put(conn *tls.Conn) {
        select {
        case p.pool <- conn:
                // Connection returned to the pool
log.Printf("pool returned")
        default:
log.Printf("clossssss")
                // Pool is full, close the connection
                conn.Close()
        }
}

// CloseAll drains the pool and closes all connections.
func (p *ConnPool) CloseAll() {
        for {
                select {
                case conn := <-p.pool:
                        conn.Close()
                default:
                        return
                }
        }
}

func (client *RedisClient) setupHandlers() {
        isClient := true

        client.g.OnOpen(tls.WrapOpen(client.tlsConfig, isClient, func(c *nbio.Conn, tlsConn *tls.Conn) {

                //log.Printf("connection open")
                // Initialize connection setup if necessary
        }))
        client.g.OnClose(tls.WrapClose(func(c *nbio.Conn, tlsConn *tls.Conn, err error) {
                //log.Printf("connection closed")
                client.errorCh <- err
                //client.pool.Put(tlsConn)

        }))
        client.g.OnData(tls.WrapData(func(c *nbio.Conn, tlsConn *tls.Conn, data []byte) {
                //log.Printf("data = %s", data)
                // Handle incoming data, Redis protocol
                client.responseCh <- data
        }))
}

func (client *RedisClient) Start() error {
        return client.g.Start()
}

func (client *RedisClient) Stop() {
        client.g.Stop()
        close(client.responseCh)
        close(client.errorCh)
}

func (client *RedisClient) Do(cmd string, args ...string) (interface{}, error) {
        fullCmd := fmt.Sprintf("*%d\r\n$%d\r\n%s\r\n", len(args)+1, len(cmd), cmd)
        for _, arg := range args {
                fullCmd += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
        }

        atomic.AddInt64(&qps, 1)

        // Use the connection pool to get a TLS connection
        tlsConn, err := client.pool.Get()
        if err != nil {
                return nil, fmt.Errorf("failed to get connection: %v", err)
        }

        defer client.pool.Put(tlsConn)

        // Convert *tls.Conn to *tls.Conn for use with NBIO
        nbConn, err := nbio.NBConn(tlsConn.Conn())
        if err != nil {
                return nil, fmt.Errorf("failed to convert connection: %v", err)
        }

        // step 3: set tls.Conn and nbio.Conn to each other, and add nbio.Conn to the gopher
        isNonblock := true
        nbConn.SetSession(tlsConn)
        tlsConn.ResetConn(nbConn, isNonblock)
        client.g.AddConn(nbConn)

        // step 4: write data here or in the OnOpen handler or anywhere
        _, err = tlsConn.Write([]byte(fullCmd))
        if err != nil {
                return nil, err
        }

        /*
        //c, err := client.g.Dial(client.addr)
        c, err := tls.Dial("tcp", client.addr, client.tlsConfig)
        if err != nil {
                return nil, err
        }
        defer c.Close()
        */

        select {
        case response := <-client.responseCh:
                return parseResponse(response)
        case err := <-client.errorCh:
                return nil, err
        }
}

func parseResponse(data []byte) (interface{}, error) {
        if len(data) == 0 {
                return nil, fmt.Errorf("no response")
        }

        switch data[0] {
        case '-':
                return string(data[1:]), fmt.Errorf("error response: %s", data[1:])
        case '+', ':':
                return string(data[1:]), nil
        case '$':
                parts := strings.SplitN(string(data[1:]), "\r\n", 2)
                if parts[0] == "-1" {
                        return nil, nil // Null bulk response
                }
                return parts[1], nil
        case '*':
                lines := strings.Split(string(data[1:]), "\r\n")
                var count int
                _, err := fmt.Sscanf(lines[0], "%d", &count)
                if err != nil || count <= 0 {
                        return nil, err
                }
                results := make([]interface{}, count)
                for i := 0; i < count; i++ {
                        results[i] = lines[i+1] // Simplification, might need refinement
                }
                return results, nil
        default:
                return nil, fmt.Errorf("unknown response type: %c", data[0])
        }
}

func main() {
        // TLS configuration for secure Redis connection, adjust as necessary.
        addr := "127.0.0.1:6380"
        pool := NewConnPool(addr, tlsConfig, 10) // Pool with max 10 connections
        client := NewRedisClient(addr, tlsConfig, pool)

        err := client.Start()
        if err != nil {
                log.Fatalf("Failed to start client: %v", err)
        }
        defer client.Stop()

        log.Printf("here1")

        // Use the client
        resp, err := client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)

        startTime := time.Now() // Capture start time
        for i:=0; i< 10000;i++ {
                resp, err = client.Do("SET", "key", "value")
                if err != nil {
                        log.Fatalf("Failed to execute command: %v", err)
                }
        }

        elapsed := time.Since(startTime) // Calculate elapsed time
        fmt.Printf("Elapsed time: %s\n", elapsed)

        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "a", "b")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)

        log.Printf("here2")
        resp2, err := client.Do("GET", "key")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("GET Response: %v\n", resp2)

        log.Printf("here3")
        // Keep the main goroutine alive for a short time to ensure the response is processed
        time.Sleep(1 * time.Second)
}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
lesismal commented 3 months ago

I wouldn't provide so much user function customizing help, I'd close this and some other issues.