twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License
1.68k stars 162 forks source link

lz4 decompression causes a lot of allocations due to ioutil.ReadAll #623

Open kalbhor opened 8 months ago

kalbhor commented 8 months ago

While using a consumer group where messages were lz4 compressed I noticed most of the allocations were due to ioutil.ReadAll. Should we consider using io.Copy (fixed buffer length) instead of ioutil.ReadAll (dynamically resized array)? I can provide benchmark results if needed

twmb commented 8 months ago

What does code look like that avoids ReadAll? io.Copy requires a destination to read into, and I don't think we can know the size of what the blob will decompress into ahead of time.

kalbhor commented 8 months ago

Since we don't know the final buffer size, we can't pre-allocate. But it looks like bytes.Buffer has a more optimal grow buffer method for most cases. Since ReadAll depends on append's grow strategy, it is generally discouraged for reading large data in-memory. There are discussions around this : https://github.com/golang/go/issues/50774

I used such an io.Copy example for benchmarks:

func decompressCopy(d []byte) ([]byte, error) {
    dc := dcPool.Get().(*lz4.Reader)
    defer dcPool.Put(dc)
    dc.Reset(bytes.NewReader(d))

    out := new(bytes.Buffer)
    if _, err := io.Copy(out, dc); err != nil {
        return nil, err
    }

    return out.Bytes(), nil
}
twmb commented 7 months ago

Sure, makes sense 👍

kalbhor commented 7 months ago

I found that gzip decompress also uses ioutil.ReadAll and could in most cases benefit from a bytes.Buffer. If you like I can open a pull request with these changes

twmb commented 7 months ago

Sure, also good. +1 to PR

twmb commented 7 months ago

Let me know if you can get a PR open in the next day or two -- I have two other PRs ready that can go into a patch release and this can fit as well.

kalbhor commented 7 months ago

Replaced ReadAll with bytes buffer for gzip and lz4 decompression

kalbhor commented 2 weeks ago

Decompression is still a large contributor to overall memory usage. We were able to notice memory jumping to 4GB processing 100k messages per sec, where each message was <5KB.

After revisiting the code, I noticed we could maybe re-use the buffer from a sync pool instead of creating a new one each time. For eg:

func decompressCopy(d []byte) ([]byte, error) {
        dc := dcPool.Get().(*lz4.Reader)
    defer dcPool.Put(dc)
    dc.Reset(bytes.NewReader(d))

        // assuming `bufferPool` is a bytes sync.Pool 
        out := bufferPool.Get().(*bytes.Buffer)
        defer func() {
            out.Reset()
            bufferPool.Put(out)
        }()

        // since the byte buffer has been re-used, it should not cause many grow allocations
        if _, err := io.Copy(out, unlz4); err != nil {
            return nil, err
        }

        // create a new copy finally without growing the array
        result := out.Bytes()
        return append([]byte(nil), result...), nil
}

This would mean the bytes.Buffer will only re-adjust its length if a message/batch is larger than existing length. Also, the new larger buffer will be available for re-use next. This should drastically reduce the number of allocations. Let me know your thoughts and if you agree, I can send a PR for this.

kalbhor commented 2 weeks ago

@twmb We wrote few benchmarks (for lz4, but should be similar for other compressors) here : https://github.com/joeirimpan/franz-go-lz4-compression-bench and noticed drastic reduction in allocations.

image

Using:

  1. byte buff sync pool
  2. single byte buffer

Need to confirm if decompress is called concurrently and use the suitable approach.

dwagin commented 2 weeks ago

Please try my lib https://gitlab.com/dwagin/go-bytebuffer09:23, 9 июля 2024 г., Lakshay Kalbhor @.***>: We wrote few benchmarks (for lz4, but should be similar for other compressors) here : https://github.com/joeirimpan/franz-go-lz4-compression-bench and noticed drastic reduction in allocations. image.png (view on web) Using:

byte buff sync pool single byte buffer

Need to confirm if decompress is called concurrently and use the suitable approach.

—Reply to this email directly, view it on GitHub, or unsubscribe.You are receiving this because you are subscribed to this thread.Message ID: @.***> -- /Dmitry

twmb commented 2 weeks ago

I'm open to a PR. You'll need to use the third option because decompressors are used concurrently -- for example, the client can be processing responses from multiple brokers at once.

twmb commented 2 weeks ago

@kalbhor actually, there's a different pool already internally that is used for producing that can be reused for consuming, sliceWriters.

BenchmarkDecompress/without-buf-8                 51      22620020 ns/op    268436230 B/op        23 allocs/op
BenchmarkDecompress/with-buf-8                    88      13419860 ns/op       95388 B/op          1 allocs/op
BenchmarkDecompress/with-buf-extra-copy-8         73      16215515 ns/op    74416809 B/op          2 allocs/op
BenchmarkDecompress/with-pool-8                   66      16394682 ns/op    78496323 B/op          3 allocs/op
BenchmarkDecompress/with-slice-writers-8          63      18033684 ns/op    74334741 B/op          3 allocs/op

This is the patch to franz-go:

diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go
index c5f2723..99903e7 100644
--- a/pkg/kgo/compression.go
+++ b/pkg/kgo/compression.go
@@ -268,11 +268,13 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
                if err := ungz.Reset(bytes.NewReader(src)); err != nil {
                        return nil, err
                }
-               out := new(bytes.Buffer)
-               if _, err := io.Copy(out, ungz); err != nil {
+               dst := sliceWriters.Get().(*sliceWriter)
+               dst.inner = dst.inner[:0]
+               defer sliceWriters.Put(dst)
+               if _, err := io.Copy(dst, ungz); err != nil {
                        return nil, err
                }
-               return out.Bytes(), nil
+               return append([]byte(nil), dst.inner...), nil
        case codecSnappy:
                if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) {
                        return xerialDecode(src)
@@ -282,11 +284,13 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
                unlz4 := d.unlz4Pool.Get().(*lz4.Reader)
                defer d.unlz4Pool.Put(unlz4)
                unlz4.Reset(bytes.NewReader(src))
-               out := new(bytes.Buffer)
-               if _, err := io.Copy(out, unlz4); err != nil {
+               dst := sliceWriters.Get().(*sliceWriter)
+               dst.inner = dst.inner[:0]
+               defer sliceWriters.Put(dst)
+               if _, err := io.Copy(dst, unlz4); err != nil {
                        return nil, err
                }
-               return out.Bytes(), nil
+               return append([]byte(nil), dst.inner...), nil
        case codecZstd:
                unzstd := d.unzstdPool.Get().(*zstdDecoder)
                defer d.unzstdPool.Put(unzstd)

(similar patch to your repo)

twmb commented 2 weeks ago

FWIW It looks like with-pool still has fewer allocs; there may be some optimization in bytes.Buffer that I can't get at. If so, it may be worth removing sliceWriters entirely in favor of bytes.Buffer.

joeirimpan commented 2 weeks ago

Hi @twmb , You are right. I have updated the repo with bytes.Buffer, sliceWriter, github.com/valyala/bytebufferpool implementations. Seems like bytes.Buffer, bytebufferpool are performing better than sliceWriter.

kalbhor commented 2 weeks ago

Added a PR replacing sliceWriter with bytes.Buffer and use it in decompression

kalbhor commented 2 weeks ago

@twmb I noticed the integration tests failed initially : https://github.com/twmb/franz-go/actions/runs/9869580292 This was due to a bug (fixed now) in zstd and snappy decompression. It wasn't caught in the usual TestCompressDecompress. I think the compression/decompression test cases should check for buffer incorrect re-use as well. Need ideas on how to implement though, would like your opinions

image