IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

zstd compression wastes too much time on memory allocation due to lack of destination buffer #2964

Open HenryCaiHaiying opened 1 month ago

HenryCaiHaiying commented 1 month ago
Description

zstd compression spends too much time in malloc because Sarama does not pass a destination buffer to the zstd encoder.

Versions

Sarama v1.41.0

| Sarama | Kafka | Go |
|--------|-------|----|
| 1.41.0 | 3.8.0 | 1.21.4 |
Configuration
Producer.Compression = CompressionZSTD
Logs
logs: malloc

```
goroutine 12165 [GC assist wait]:
runtime.gopark(0xc04dfab240?, 0x12d9c8b?, 0x0?, 0x0?, 0x74b8a57f60a8?)
	/usr/local/go/src/runtime/proc.go:398 +0xce fp=0xc04dfab1c8 sp=0xc04dfab1a8 pc=0x13028ae
runtime.goparkunlock(...)
	/usr/local/go/src/runtime/proc.go:404
runtime.gcParkAssist()
	/usr/local/go/src/runtime/mgcmark.go:652 +0xe7 fp=0xc04dfab200 sp=0xc04dfab1c8 pc=0x12e47e7
runtime.gcAssistAlloc(0xc00f48a820)
	/usr/local/go/src/runtime/mgcmark.go:509 +0x22a fp=0xc04dfab260 sp=0xc04dfab200 pc=0x12e424a
runtime.deductAssistCredit(0xc057881ea0?)
	/usr/local/go/src/runtime/malloc.go:1291 +0x54 fp=0xc04dfab288 sp=0xc04dfab260 pc=0x12d27f4
runtime.mallocgc(0x37e, 0x3544380, 0x1)
	/usr/local/go/src/runtime/malloc.go:1006 +0xc9 fp=0xc04dfab2f0 sp=0xc04dfab288 pc=0x12d2029
runtime.makeslice(0x389d880?, 0xc014798b60?, 0x74b80c61ae10?)
	/usr/local/go/src/runtime/slice.go:103 +0x49 fp=0xc04dfab318 sp=0xc04dfab2f0 pc=0x13180e9
github.com/klauspost/compress/zstd.(*Encoder).EncodeAll(0xc0594a28c0, {0xc02221e380, 0x37e, 0x37e}, {0x0, 0x0, 0x0})
	/go/pkg/mod/github.com/klauspost/compress@v1.17.4/zstd/encoder.go:516 +0x313 fp=0xc04dfab498 sp=0xc04dfab318 pc=0x1fd3b93
github.com/IBM/sarama.zstdCompress({0x0?}, {0x0, 0x0, 0x0}, {0xc02221e380, 0x37e, 0x37e})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/zstd.go:71 +0x86 fp=0xc04dfab510 sp=0xc04dfab498 pc=0x20495c6
github.com/IBM/sarama.compress(0x20?, 0xc047086b88?, {0xc02221e380, 0x37e, 0x37e})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/compress.go:190 +0xdae fp=0xc04dfaba38 sp=0xc04dfab510 pc=0x200bb4e
github.com/IBM/sarama.(*RecordBatch).encodeRecords(0xc009c36140, {0x3ea55e8?, 0xc008c79480?})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/record_batch.go:206 +0x9a fp=0xc04dfaba80 sp=0xc04dfaba38 pc=0x203d7da
github.com/IBM/sarama.(*RecordBatch).encode(0xc009c36140, {0x3ea55e8, 0xc008c79480})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/record_batch.go:88 +0x231 fp=0xc04dfabae0 sp=0xc04dfaba80 pc=0x203cd31
github.com/IBM/sarama.(*Records).encode(0xc008c79480?, {0x3ea55e8?, 0xc008c79480?})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/records.go:62 +0x98 fp=0xc04dfabb28 sp=0xc04dfabae0 pc=0x203dab8
github.com/IBM/sarama.(*ProduceRequest).encode(0xc008c79440, {0x3ea55e8, 0xc008c79480})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/produce_request.go:108 +0x5e7 fp=0xc04dfabcd8 sp=0xc04dfabb28 pc=0x2035367
github.com/IBM/sarama.(*request).encode(0xc0161f6450, {0x3ea55e8, 0xc008c79480})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/request.go:43 +0x13c fp=0xc04dfabd00 sp=0xc04dfabcd8 pc=0x203e4dc
github.com/IBM/sarama.encode({0x3e67160, 0xc0161f6450}, {0x3e91630?, 0xc007a2f3b0})
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/encoder_decoder.go:29 +0x91 fp=0xc04dfabd70 sp=0xc04dfabd00 pc=0x201af31
github.com/IBM/sarama.(*Broker).sendInternal(0xc008100800, {0x3e8dc30, 0xc008c79440}, 0xc02e47a880)
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/broker.go:1001 +0x16b fp=0xc04dfabe50 sp=0xc04dfabd70 pc=0x1ffc94b
github.com/IBM/sarama.(*Broker).sendWithPromise(0xc008100800, {0x3e8dc30, 0xc008c79440}, 0x0?)
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/broker.go:991 +0x9e fp=0xc04dfabe88 sp=0xc04dfabe50 pc=0x1ffc75e
github.com/IBM/sarama.(*Broker).AsyncProduce(0xc008100800, 0xc008c79440, 0xc008c79460?)
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/broker.go:470 +0x1cc fp=0xc04dfabf10 sp=0xc04dfabe88 pc=0x1ffa88c
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/async_producer.go:828 +0x145 fp=0xc04dfabfa8 sp=0xc04dfabf10 pc=0x1ff2cc5
github.com/IBM/sarama.withRecover(0xc0105c9770?)
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/utils.go:43 +0x33 fp=0xc04dfabfc8 sp=0xc04dfabfa8 pc=0x2048a13
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func4()
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/async_producer.go:793 +0x25 fp=0xc04dfabfe0 sp=0xc04dfabfc8 pc=0x1ff2b45
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1650 +0x1 fp=0xc04dfabfe8 sp=0xc04dfabfe0 pc=0x1336601
created by github.com/IBM/sarama.(*asyncProducer).newBrokerProducer in goroutine 16568
	/go/pkg/mod/github.com/!i!b!m/sarama@v1.41.0/async_producer.go:793 +0x2e5
```

Additional Context

We are trying to switch our compression algorithm from snappy to zstd. We are seeing roughly 25% better compression (fewer bytes after compression), which is good, but we are also seeing higher CPU usage, which is bad: we ended up adding 60% more pods to handle the same traffic load.

We are using Sarama v1.41.0 and klauspost/compress v1.17.4.

We captured a few stack traces and found about 75 goroutines stuck waiting in malloc (`GC assist wait`). I haven't found a single zstd goroutine doing any real work; they are all waiting on malloc.

I think the malloc is from this line:

https://github.com/klauspost/compress/blob/v1.17.4/zstd/encoder.go#L516

```go
// If less than 1MB, allocate a buffer up front.
if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !e.o.lowMem {
	dst = make([]byte, 0, len(src))
}
```

A related issue has also been filed against the zstd compression library: https://github.com/klauspost/compress/issues/987

After discussion with the klauspost/compress community, the recommendation is to pass a pre-allocated `dst` buffer instead of `nil` (the second argument in the code below):

https://github.com/IBM/sarama/blob/main/compress.go#L190

```go
case CompressionZSTD:
	return zstdCompress(ZstdEncoderParams{level}, nil, data)
```
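
One way to replace that `nil` with a reusable buffer is a `sync.Pool` of byte slices. This is a minimal sketch, not Sarama code: `encodeAller` is a hypothetical interface matching the real `(*zstd.Encoder).EncodeAll(src, dst []byte) []byte` signature, `copyEncoder` is a hypothetical stand-in so the example runs without the klauspost module, and the 4 KiB starting capacity is arbitrary. Because `EncodeAll` appends to `dst`, the result aliases the pooled buffer, so the caller must be finished with it before the buffer goes back to the pool.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// encodeAller matches (*zstd.Encoder).EncodeAll from
// github.com/klauspost/compress/zstd, so a *zstd.Encoder satisfies it.
type encodeAller interface {
	EncodeAll(src, dst []byte) []byte
}

// copyEncoder is a hypothetical stand-in that appends src unchanged.
type copyEncoder struct{}

func (copyEncoder) EncodeAll(src, dst []byte) []byte { return append(dst, src...) }

// dstPool hands out reusable destination buffers (4 KiB is an arbitrary start).
var dstPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 4096)
		return &b
	},
}

// compressPooled encodes src into a pooled buffer and hands the result to
// consume. The result aliases the pooled buffer, so consume must not retain
// it after returning.
func compressPooled(enc encodeAller, src []byte, consume func([]byte) error) error {
	bp := dstPool.Get().(*[]byte)
	out := enc.EncodeAll(src, (*bp)[:0])
	err := consume(out)
	*bp = out[:0] // keep any growth from EncodeAll for the next caller
	dstPool.Put(bp)
	return err
}

func main() {
	var wire bytes.Buffer
	err := compressPooled(copyEncoder{}, []byte("record batch"), func(b []byte) error {
		_, err := wire.Write(b) // bytes.Buffer copies b, so reusing the buffer is safe
		return err
	})
	fmt.Println(wire.String(), err)
}
```

The callback keeps the buffer's lifetime explicit. In Sarama the compressed slice is consumed while the record batch is encoded (the `encodeRecords` frame in the trace), so a pooled buffer could presumably only be released after that point; this is an assumption about where the lifetime ends, not something verified here. Note also that keeping grown buffers means the pool can retain large slices after an unusually big batch.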

HenryCaiHaiying commented 1 month ago

If the Sarama community agrees this is the right way to make zstd compression performant, I can submit a PR that adds the following config option to the zstd compression path:

Producer.Compression.ZStandard.DestinationBufferSize = 4096

In zstd.go, each ZstdEncoder could hold a pre-allocated `dstBuffer` slice of `DestinationBufferSize` bytes when this option is set. When `zstdCompress()` is invoked with a `nil` `dst` argument and `len(src)` is less than `DestinationBufferSize`, we would pass this pre-allocated `dstBuffer` slice to the underlying zstd library.
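
A rough sketch of the proposed behaviour, under stated assumptions: `bufferedEncoder` is a hypothetical wrapper (not Sarama's actual zstd.go types) that owns one `dstBuffer` sized by the proposed `DestinationBufferSize`, `encodeAller` is a hypothetical interface matching `(*zstd.Encoder).EncodeAll(src, dst []byte) []byte`, and `identityEncoder` is a stand-in that copies instead of compressing. The important consequence of the design is that the returned slice aliases `dstBuffer`, so one wrapper must not be shared by concurrent callers and each result must be consumed before the next call.

```go
package main

import "fmt"

// encodeAller matches (*zstd.Encoder).EncodeAll from
// github.com/klauspost/compress/zstd.
type encodeAller interface {
	EncodeAll(src, dst []byte) []byte
}

// identityEncoder is a hypothetical stand-in that appends src unchanged.
type identityEncoder struct{}

func (identityEncoder) EncodeAll(src, dst []byte) []byte { return append(dst, src...) }

// bufferedEncoder is a hypothetical wrapper illustrating the proposed
// Producer.Compression.ZStandard.DestinationBufferSize option.
type bufferedEncoder struct {
	enc       encodeAller
	dstBuffer []byte // allocated once; nil when the option is unset
}

func newBufferedEncoder(enc encodeAller, destinationBufferSize int) *bufferedEncoder {
	e := &bufferedEncoder{enc: enc}
	if destinationBufferSize > 0 {
		e.dstBuffer = make([]byte, 0, destinationBufferSize)
	}
	return e
}

// compress mirrors the proposed zstdCompress rule: use dstBuffer only when
// dst is nil and src is smaller than the buffer. The result then aliases
// dstBuffer and is valid only until the next call on this encoder.
func (e *bufferedEncoder) compress(dst, src []byte) []byte {
	if dst == nil && len(src) < cap(e.dstBuffer) {
		dst = e.dstBuffer[:0]
	}
	return e.enc.EncodeAll(src, dst)
}

func main() {
	e := newBufferedEncoder(identityEncoder{}, 4096)
	out := e.compress(nil, []byte("small batch"))
	// true: the output shares storage with e.dstBuffer
	fmt.Println(string(out), &out[0] == &e.dstBuffer[:1][0])
}
```

With a real zstd encoder, incompressible input can come out slightly larger than `src` because of frame and block headers, so a batch just under the threshold may still cause `EncodeAll` to grow (and therefore reallocate) the destination.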