redis / rueidis

A fast Golang Redis client that supports Client Side Caching, Auto Pipelining, Generics OM, RedisJSON, RedisBloom, RediSearch, etc.
Apache License 2.0
2.47k stars 158 forks source link

feat: add DisableAutoPipelining to serve requests from the conn pool #646

Closed rueian closed 1 month ago

rueian commented 1 month ago

Auto pipelining can cause high p99 latencies (ref) and we are still trying to find a solution. This PR allows users to always use the pooled connections instead of pipelining commands into the same connections, except for client-side caching and pubsub requests.

A benchmark on two GCP n2-highcpu-2 machines, one running redis-server and the other running the following program, shows slightly better latencies and throughput than go-redis at concurrency levels of 2, 4, 8, 16, and 32.

image
package main

import (
    "context"
    "fmt"
    "math/rand"
    "net/http"
    _ "net/http/pprof"
    "os"
    "strconv"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/redis/go-redis/v9"
    "github.com/redis/rueidis"
)

// prepData returns the decimal strings for 0 through n-1 in a randomly
// shuffled order.
func prepData(n int) []string {
    out := make([]string, n)
    for idx := 0; idx < len(out); idx++ {
        out[idx] = strconv.Itoa(idx)
    }
    swap := func(a, b int) {
        out[a], out[b] = out[b], out[a]
    }
    rand.Shuffle(len(out), swap)
    return out
}

// keyCount is the number of keys written during setup and the size of the key
// list each reader goroutine iterates over.
const keyCount = 1_000_000

// main runs a GET-latency micro-benchmark against a Redis server using either
// go-redis or rueidis (the latter with DisableAutoPipelining enabled).
//
// Command-line arguments:
//
//	os.Args[1]  client selector: "goredis" picks go-redis, anything else picks rueidis
//	os.Args[2]  number of concurrent reader goroutines
//	os.Args[3]  address of the Redis server
//
// Per-request latencies are recorded in microseconds in the Prometheus
// histogram "micro_read_latency", served on :2112/metrics. The process exits
// after two minutes of reading.
func main() {
    // Fail with a usage message instead of an index-out-of-range panic when
    // arguments are missing.
    if len(os.Args) < 4 {
        fmt.Fprintln(os.Stderr, "usage: <program> <goredis|rueidis> <concurrency> <redis-addr>")
        os.Exit(2)
    }

    useGoRedis := os.Args[1] == "goredis"
    concurrency, err := strconv.Atoi(os.Args[2])
    if err != nil {
        panic(err)
    }

    // Explicit seeding so each run shuffles differently; Go 1.20+ seeds the
    // global generator automatically, older versions do not.
    rand.Seed(time.Now().UnixNano())

    // Histogram bucket upper bounds, in microseconds (matches the unit
    // passed to Observe below).
    bucket := []float64{50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 750, 1000, 1500, 2000, 3000, 4000}

    rl := promauto.NewHistogram(prometheus.HistogramOpts{
        Name: "micro_read_latency", Buckets: bucket,
        ConstLabels: map[string]string{"client": os.Args[1]},
    })

    // Serve metrics (and pprof, via the blank import) in the background.
    go func() {
        http.Handle("/metrics", promhttp.Handler())
        // Report a failed listener (e.g. port already in use) rather than
        // silently running a benchmark whose metrics cannot be scraped.
        if err := http.ListenAndServe(":2112", nil); err != nil {
            fmt.Fprintln(os.Stderr, "metrics server:", err)
        }
    }()

    rc, err := rueidis.NewClient(rueidis.ClientOption{
        InitAddress:           []string{os.Args[3]},
        DisableAutoPipelining: true,
    })
    if err != nil {
        panic(err)
    }

    gc := redis.NewUniversalClient(&redis.UniversalOptions{
        Addrs: []string{os.Args[3]},
    })

    ctx := context.Background()

    goredisRead := func(key string) error {
        return gc.Get(ctx, key).Err()
    }
    rueidisRead := func(key string) error {
        return rc.Do(ctx, rc.B().Get().Key(key).Build()).Error()
    }

    // rfn is the read function of the client under test.
    var rfn func(key string) error

    if useGoRedis {
        rfn = goredisRead
    } else {
        rfn = rueidisRead
    }

    // read1Fn issues one GET per key, recording each call's latency in
    // microseconds. The observation is made before the error check, so a
    // failing request is still counted before the panic.
    read1Fn := func(keys []string) {
        for _, k := range keys {
            ts := time.Now()
            err := rfn(k)
            rl.Observe(float64(time.Since(ts).Microseconds()))
            if err != nil {
                panic(err)
            }
        }
    }

    // Setup: populate the server through rueidis regardless of which client
    // is being measured. Keys and values are independent shuffles.
    {
        keys := prepData(keyCount)
        data := prepData(keyCount)
        commands := make(rueidis.Commands, len(keys))
        for i := range commands {
            commands[i] = rc.B().Set().Key(keys[i]).Value(data[i]).Build()
        }
        ts := time.Now()
        for _, resp := range rc.DoMulti(ctx, commands...) {
            if err := resp.Error(); err != nil {
                panic(err)
            }
        }
        fmt.Println("ready", time.Since(ts))
    }

    // Close the client that is NOT under test before measuring.
    if useGoRedis {
        rc.Close()
    } else {
        gc.Close()
    }

    // Each reader loops over its own shuffled key list until the process
    // exits; the goroutines are never stopped explicitly.
    for i := 0; i < concurrency; i++ {
        go func() {
            keys := prepData(keyCount)
            for {
                read1Fn(keys)
            }
        }()
    }
    time.Sleep(time.Minute * 2)
}