twmb / franz-go

franz-go contains a feature-complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+: producing, consuming, transacting, administering, etc.
BSD 3-Clause "New" or "Revised" License

kgo.FetchMaxPartitionBytes: increasing this option causes high memory usage #701

Closed sbuliarca closed 1 month ago

sbuliarca commented 3 months ago

We noticed that increasing the value of kgo.FetchMaxPartitionBytes, even just to 10MB, causes our apps to use much more memory than normal. Throughput is also affected. You can see the difference in these metrics:

From the metrics you can see that when running with no options the app reaches a mean of 231MB, while with FetchMaxPartitionBytes set to 10MB it reaches 1.91GB.
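A rough back-of-envelope model (a hedged sketch, not franz-go's actual buffer accounting; the partition and fetch counts below are illustrative assumptions) shows why raising the per-partition cap can balloon memory: the consumer may buffer up to roughly partitions × in-flight fetches × FetchMaxPartitionBytes of raw fetch data, before decompression:

```go
package main

import "fmt"

// worstCaseFetchBuffer is an illustrative upper bound on the raw fetch
// data a consumer might buffer at once: every partition filled to the
// per-partition cap, across each in-flight fetch. This is a rough
// model for intuition, not franz-go's real accounting.
func worstCaseFetchBuffer(partitions, concurrentFetches int, perPartitionBytes int64) int64 {
	return int64(partitions) * int64(concurrentFetches) * perPartitionBytes
}

func main() {
	const mib = 1 << 20
	// 100 partitions, 2 in-flight fetches, 1 MiB vs 10 MiB per partition.
	fmt.Println(worstCaseFetchBuffer(100, 2, 1*mib) / mib)  // 200 (MiB)
	fmt.Println(worstCaseFetchBuffer(100, 2, 10*mib) / mib) // 2000 (MiB)
}
```

Under these assumed numbers, a 10x bump of the per-partition cap raises the ceiling from hundreds of MiB into the GiB range, the same order of magnitude as the jump observed in the metrics.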

I also created a benchmark that demonstrates the same thing; the code used is similar to what we have in our apps. It should also be useful when fixing this: https://github.com/twmb/franz-go/compare/master...sbuliarca:benchmark-fetch-max-partition-bytes?body=&expand=1&title=Added+benchmark+for+using+kgo.FetchMaxPartitionBytes

twmb commented 3 months ago

By default, producing messages uses snappy compression. The consuming side configuration is based on batch size -- whatever is in the batches (compressed data) is not inspected by the broker. Is the memory on the consumer side because you're fetching, say, 10M that then gets decompressed to a large payload?

I do see your benchmark uses rand.Read, which likely is not compressible. However, the benchmark isn't demonstrating memory usage per op, it's demonstrating how fast your network connection is such that it can consume so much data so quickly (i.e., if this client was implemented very poorly such that polling weren't fast, then fewer polls could happen per iteration and it would show lower memory usage).

sbuliarca commented 2 months ago

Is the memory on the consumer side because you're fetching, say, 10M that then gets decompressed to a large payload?

Indeed, but isn't that also the case when not using kgo.FetchMaxPartitionBytes?

However, the benchmark isn't demonstrating memory usage per op, it's demonstrating how fast your network connection is such that it can consume so much data so quickly

The benchmark was meant to show the difference between running the code with and without the option on the same machine. I added more stats to the benchmark in this commit https://github.com/sbuliarca/franz-go/commit/1cb32cc2387ffd4fc764a6aef56879a4a614f0fb and compared the runs (taking one row from each of the 15 runs, which mostly look the same):

As you can see, in both cases it reads mostly the same amount of compressed bytes (it seems rand.Read output gets compressed to 6% after all), and the number of records/op is the same; the only thing that differs is the number of bytes and allocs per op:

Following the code, I don't know if there's much you can do in the lib, as I see it goes straight to the fetch request to Kafka: https://github.com/twmb/franz-go/blob/master/pkg/kgo/source.go#L551

twmb commented 1 month ago

I agree, I don't think there's much I can do in the library itself. Due to potential message compression -- and the roughly four different knobs that control how much you consume at once -- getting memory usage down may require some tuning and playing with settings. There are some docs about adjusting the number of concurrent fetches, the fetch size, and the per-partition fetch size -- let me know if those don't suffice. I'm going to close this for now, though, since I lean toward there being nothing to be done here.
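For reference, the knobs discussed in this thread are all client options. A minimal configuration sketch (the broker address, topic, and values are placeholders for illustration, not recommendations; tune them against your own workload):

```go
package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.MaxConcurrentFetches(2),       // cap concurrent fetch requests
		kgo.FetchMaxBytes(50<<20),         // cap bytes per whole fetch response
		kgo.FetchMaxPartitionBytes(1<<20), // cap bytes per partition per fetch
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```

Lowering MaxConcurrentFetches or FetchMaxBytes can pull back the memory ceiling when a larger FetchMaxPartitionBytes is needed for throughput; the three interact, which is why tuning them together matters.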