mostynb / go-grpc-compression

go gRPC encoding wrappers for some useful compression algorithms that are not available in google.golang.org/grpc
Apache License 2.0
21 stars 10 forks source link

zstd performance compare #25

Open zdyj3170101136 opened 6 months ago

zdyj3170101136 commented 6 months ago

I noticed that you replaced zstd.Write with zstd.EncodeAll in https://github.com/mostynb/go-grpc-compression/commit/5f2489304fdefc331e2803e6574af3b8c8f4f00a, but without a performance comparison.

I implemented different types of zstd encoding and benchmarked them.

1, zstd-stream.

package main

import (
    "bytes"
    "io"
    "os"
    "runtime"
    "sync"

    "github.com/klauspost/compress/zstd"
    _ "github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
    "google.golang.org/grpc/encoding"
)

// Name is the name registered for the zstd streaming compressor.
const Name = "zstd-stream"

// init builds the package's compressor, installs the constructors used by its
// two pools, and registers the compressor with the gRPC encoding registry.
func init() {
    comp := new(compressor)

    // newReader creates a pooled decoder wrapper. The construction error is
    // discarded, and a finalizer closes the decoder once it is collected.
    newReader := func() interface{} {
        dec, _ := zstd.NewReader(nil)
        runtime.SetFinalizer(dec, (*zstd.Decoder).Close)
        return &reader{Reader: dec, pool: &comp.poolDecompressor}
    }

    // newWriter mirrors newReader for the encoder side.
    newWriter := func() interface{} {
        enc, _ := zstd.NewWriter(nil)
        runtime.SetFinalizer(enc, (*zstd.Encoder).Close)
        return &writer{Writer: enc, pool: &comp.poolCompressor}
    }

    comp.poolDecompressor.New = newReader
    comp.poolCompressor.New = newWriter

    encoding.RegisterCompressor(comp)
}

// writer pairs a zstd encoder with the pool it is handed back to when Close
// is called.
type writer struct {
    // Writer is the underlying zstd encoder; Compress resets it onto the
    // destination stream before handing the writer out.
    Writer *zstd.Encoder
    // pool is the sync.Pool this writer is put back into by Close.
    pool   *sync.Pool
}

// Compress takes a writer from the compressor pool, redirects its encoder to
// w, and returns it. The returned error is always nil.
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
    pooled := c.poolCompressor.Get().(*writer)
    pooled.Writer.Reset(w)

    var out io.WriteCloser = pooled
    return out, nil
}

// Close closes the underlying encoder and reports its error. The writer is
// put back into its pool when Close returns.
func (z *writer) Close() (err error) {
    defer func() {
        z.pool.Put(z)
    }()
    err = z.Writer.Close()
    return err
}

// Write forwards data to the underlying zstd encoder and returns its result
// unchanged.
func (z *writer) Write(data []byte) (int, error) {
    written, err := z.Writer.Write(data)
    return written, err
}

// reader pairs a zstd decoder with the pool it is handed back to once the
// stream has been read to io.EOF.
type reader struct {
    // Reader is the underlying zstd decoder; Decompress resets it onto the
    // source stream before handing the reader out.
    Reader *zstd.Decoder
    // pool is the sync.Pool this reader is put back into by Read on io.EOF.
    pool   *sync.Pool
}

// Decompress takes a reader from the decompressor pool and resets its decoder
// onto r. If the reset fails, the reader goes straight back into the pool and
// the error is returned with a nil io.Reader.
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
    pooled := c.poolDecompressor.Get().(*reader)

    err := pooled.Reader.Reset(r)
    if err == nil {
        return pooled, nil
    }

    c.poolDecompressor.Put(pooled)
    return nil, err
}

// Read reads decompressed bytes into p from the underlying decoder. When the
// decoder reports io.EOF the reader is put back into its pool; any other
// result is passed through untouched.
//
// NOTE(review): every call that observes io.EOF performs a Put, so callers
// should stop reading after the first io.EOF.
func (z *reader) Read(p []byte) (n int, err error) {
    n, err = z.Reader.Read(p)
    if err != io.EOF {
        return n, err
    }
    z.pool.Put(z)
    return n, err
}

// Name reports the identifier under which this compressor is registered,
// which is the package-level Name constant.
func (c *compressor) Name() string {
    registered := Name
    return registered
}

// compressor is the gRPC compressor implementation. It keeps one pool of
// reusable writers (poolCompressor) and one pool of reusable readers
// (poolDecompressor).
type compressor struct {
    poolCompressor, poolDecompressor sync.Pool
}

2, zstd: uses the current package.

benchmark compress code


// main is the compression benchmark: it loads the JSON fixture once and
// compresses it 10000 times through a registered gRPC compressor. Run it
// under `time` to obtain the figures.
func main() {
    data, err := os.ReadFile("testdata/test.json")
    if err != nil {
        panic(err)
    }

    // The name must match a registered compressor: "zstd-stream" is the one
    // registered by this file, "zstd" the one registered by the imported
    // go-grpc-compression package. Swap the name to measure the other one.
    const name = "zstd-stream"

    compress := func() []byte {
        var buf bytes.Buffer
        c := encoding.GetCompressor(name)
        if c == nil {
            // GetCompressor returns nil for an unknown name; fail with a
            // clear message instead of a nil dereference.
            panic("compressor " + name + " is not registered")
        }
        w, err := c.Compress(&buf)
        if err != nil {
            panic(err)
        }
        if _, err := w.Write(data); err != nil {
            panic(err)
        }
        // Close flushes the final frame; a failure here would mean the
        // measured output is incomplete.
        if err := w.Close(); err != nil {
            panic(err)
        }
        return buf.Bytes()
    }

    for i := 0; i < 10000; i++ {
        compress()
    }
}

benchmark compress

The program was run under the Linux `time` command.

zstd-stream:

real    0m12.891s
user    0m14.182s
sys     0m0.111s

zstd:

real    0m17.284s
user    0m17.298s
sys     0m0.082s

benchmark decompress code


// main is the decompression benchmark: it compresses the JSON fixture once
// with the "zstd" compressor, then decompresses that payload 100000 times
// with the "zstd-stream" compressor. Run it under `time` to obtain the
// figures.
func main() {
    data, err := os.ReadFile("testdata/test.json")
    if err != nil {
        panic(err)
    }

    // compress produces the payload that is decompressed in the loop. Every
    // step is checked so a broken payload cannot silently skew the timing.
    compress := func() []byte {
        var buf bytes.Buffer
        c := encoding.GetCompressor("zstd")
        w, err := c.Compress(&buf)
        if err != nil {
            panic(err)
        }
        if _, err := w.Write(data); err != nil {
            panic(err)
        }
        if err := w.Close(); err != nil {
            panic(err)
        }
        return buf.Bytes()
    }

    compressed := compress()

    decompress := func() {
        c := encoding.GetCompressor("zstd-stream")
        r, err := c.Decompress(bytes.NewReader(compressed))
        if err != nil {
            panic(err)
        }
        _, err = io.ReadAll(r)
        if err != nil {
            panic(err)
        }
    }
    for i := 0; i < 100000; i++ {
        decompress()
    }
}

benchmark decompress

The program was run under the Linux `time` command.

zstd-stream:

real    0m34.640s
user    0m40.313s
sys     0m1.434s

zstd:

real    0m35.556s
user    0m37.684s
sys     0m0.803s

test file

test file is from: https://github.com/klauspost/compress/blob/master/gzip/testdata/test.json

zdyj3170101136 commented 6 months ago

By the way, I also tested https://github.com/dataDog/zstd:

benchmark code


// main benchmarks the DataDog zstd one-shot API: the JSON fixture is
// compressed once and the result is decompressed 100000 times. Any error
// aborts the run with a panic.
func main() {
    input, readErr := os.ReadFile("testdata/test.json")
    if readErr != nil {
        panic(readErr)
    }

    compressed, compErr := ddzstd.Compress(nil, input)
    if compErr != nil {
        panic(compErr)
    }

    for iter := 0; iter < 100000; iter++ {
        if _, decErr := ddzstd.Decompress(nil, compressed); decErr != nil {
            panic(decErr)
        }
    }
}

benchmark

compress:

real    0m16.081s
user    0m16.398s
sys     0m0.077s

decompress:

real    0m15.462s
user    0m17.649s
sys     0m0.592s

benchmark streaming


// main benchmarks the DataDog zstd streaming API: the JSON fixture is
// compressed once through a streaming writer and the result is decompressed
// 10000 times through a streaming reader. Every I/O error aborts the run so
// that a failing iteration is never counted as a valid measurement.
func main() {
    data, err := os.ReadFile("testdata/test.json")
    if err != nil {
        panic(err)
    }

    compress := func() []byte {
        var b bytes.Buffer
        w := ddzstd.NewWriter(&b)
        if _, err := w.Write(data); err != nil {
            panic(err)
        }
        // Close finishes the stream; its error must not be dropped.
        if err := w.Close(); err != nil {
            panic(err)
        }
        return b.Bytes()
    }

    compressed := compress()

    decompress := func() {
        r := ddzstd.NewReader(bytes.NewReader(compressed))
        if _, err := io.ReadAll(r); err != nil {
            panic(err)
        }
        if err := r.Close(); err != nil {
            panic(err)
        }
    }
    for i := 0; i < 10000; i++ {
        decompress()
    }
}

benchmark

compress:

real    0m15.020s
user    0m15.481s
sys     0m0.110s

decompress:

real    0m29.116s
user    0m42.343s
sys     0m8.431s
mostynb commented 6 months ago

Thanks for the interesting data.

I see that your benchmark is sequential- I wonder what the results would be like for concurrent compression/decompression?

zdyj3170101136 commented 4 months ago

Thanks for the interesting data.

I see that your benchmark is sequential- I wonder what the results would be like for concurrent compression/decompression?

What do you mean?

By default, zstd already uses runtime.GOMAXPROCS(0) as its concurrency for compression and decompression.

zdyj3170101136 commented 4 months ago

concurrent compress:

    var wg sync.WaitGroup
    for j := 0; j < runtime.GOMAXPROCS(0); j++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 10000; i++ {
                compress()
            }
        }()
    }
    wg.Wait()

Benchmark on a 32-core machine. zstd stream compress:

real    0m23.301s
user    11m44.090s
sys     0m2.399s

zstd encodeAll compress:

real    0m27.575s
user    12m36.868s
sys     0m5.687s
zdyj3170101136 commented 4 months ago

I recommend replacing zstd EncodeAll with zstd stream compression.

@mostynb