twmb / franz-go

franz-go contains a feature-complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Is there a way to limit consumer PollRecords buffered fetches by size? #757

Closed by genzgd 4 weeks ago

genzgd commented 4 weeks ago

Our application consumes data in many different formats, and we're struggling to limit memory consumption in one particular scenario. The producer has a large number of topics with very large records (~200 KB each), but we don't know the approximate record size ahead of time. Accordingly, if we set a limit of, say, 20k records in PollRecords (which we can handle just fine when they are small), we can in theory end up with a total of 4 GB (20k × 200 KB) buffered in the client by the time PollRecords completes (even more if the data was originally compressed).
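The worst-case arithmetic above can be sketched in Go; the record count and record size are the illustrative figures from this scenario, not client defaults:

```go
package main

import "fmt"

func main() {
	// Illustrative figures from the scenario: 20k records requested per
	// PollRecords call, each record ~200 KB.
	const maxPollRecords = 20_000
	const recordSize = 200_000 // bytes

	worstCase := maxPollRecords * recordSize
	fmt.Printf("worst-case buffered bytes: %d (~%.1f GB)\n",
		worstCase, float64(worstCase)/1e9)
	// prints: worst-case buffered bytes: 4000000000 (~4.0 GB)
}
```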

Ideally we'd like to limit those buffered records by size in some way, without knowing ahead of time how large they are expected to be. Right now it looks like we can control the time (by passing a context with a timeout) or the number of records (using the maxPollRecords argument), but not the actual size. We assumed that a combination of MaxConcurrentFetches and MaxFetchSize would do the trick, but the total number of buffered fetches can apparently exceed MaxConcurrentFetches, so we ended up with far more records than we expected based on those two values.

If there's no other way to do this, we can manipulate the context timeout and the maxPollRecords value dynamically, but it would be nice if there were some combination of options that would let us limit the size, or at least the total number of fetches, returned.

genzgd commented 4 weeks ago

Okay, this looks like a bug on our end: we were using the wrong MaxFetchSize value. MaxConcurrentFetches + MaxFetchSize seems to work correctly.
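For reference, a minimal sketch of that option combination with kgo. It assumes the thread's "MaxFetchSize" corresponds to franz-go's kgo.FetchMaxBytes option; the broker address, group, and topic names are placeholders, and the byte/concurrency values are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		// Cap each fetch response and the number of concurrent fetches,
		// bounding total buffered data at roughly
		// MaxConcurrentFetches * FetchMaxBytes (here ~150 MiB).
		kgo.FetchMaxBytes(50<<20),   // 50 MiB per fetch response
		kgo.MaxConcurrentFetches(3), // at most 3 fetches in flight
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// PollRecords still caps the record count; the options above cap bytes.
	fetches := cl.PollRecords(context.Background(), 20_000)
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Printf("topic %s: %d bytes\n", r.Topic, len(r.Value))
	})
}
```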