twmb / franz-go

franz-go contains a feature-complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administering, etc.

high number of allocations in `kgo.recordToRecord` function #823

Open · ortuman opened 1 month ago

ortuman commented 1 month ago

As a result of the large volume of records generated, we’ve observed in one of our consumers a high number of allocations originating in the kgo.recordToRecord function (~65% of total allocations according to the attached screenshot, specifically on this line). This results in performance degradation caused by GC overhead.

[Screenshot 2024-09-19: allocation profile]

Has reusing the generated records through a pool ever been considered in order to minimize this effect? Perhaps it could be offered as an optional parameter? I could propose a PR for your review if you think it's worth pursuing.
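A minimal sketch of the idea, with entirely hypothetical names (`Record`, `getRecord`, `ReuseRecord` are illustrative, not franz-go API): records come from a sync.Pool, and the consumer hands each one back once it is done with it.

```go
package main

import "sync"

// Record is a stand-in for kgo.Record; only the pooling mechanics matter.
type Record struct {
	Key, Value []byte
}

var recordPool = sync.Pool{New: func() any { return new(Record) }}

// getRecord is what a pooled record constructor could call instead of
// allocating a fresh record every time.
func getRecord() *Record {
	r := recordPool.Get().(*Record)
	*r = Record{} // clear any state left over from a previous use
	return r
}

// ReuseRecord is the hypothetical opt-in hook: the consumer calls it once it
// is completely done with a record.
func ReuseRecord(r *Record) {
	recordPool.Put(r)
}

func main() {
	r := getRecord()
	r.Value = []byte("payload")
	// ... process r ...
	ReuseRecord(r) // r must not be touched after this point
}
```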

ortuman commented 1 month ago

Alternatively, perhaps the recordToRecord function could simply return a kgo.Record object by value instead of by reference, although I'm not sure whether that would be a breaking change.
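For illustration, a simplified comparison of the two return styles, using a stand-in Record type rather than the real kgo.Record: returning by pointer typically forces a heap allocation per record, while returning by value lets the caller choose where the record lives.

```go
package main

import "fmt"

// Record is a simplified stand-in; the real kgo.Record has many more fields.
type Record struct {
	Key, Value []byte
}

// Returning by pointer: the record typically escapes to the heap, costing
// one allocation per record (the pattern at issue in this thread).
func toRecordPtr(key, value []byte) *Record {
	return &Record{Key: key, Value: value}
}

// Returning by value: the caller decides where the Record lives, so it can
// be written into a preallocated slice element with no per-record allocation.
func toRecordVal(key, value []byte) Record {
	return Record{Key: key, Value: value}
}

func main() {
	records := make([]Record, 0, 3) // one allocation for the whole backing array
	for i := 0; i < 3; i++ {
		records = append(records, toRecordVal([]byte("k"), []byte("v")))
	}
	fmt.Println(len(records), cap(records))
	_ = toRecordPtr // pointer-returning variant shown only for contrast
}
```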

ortuman commented 1 month ago

In https://github.com/twmb/franz-go/pull/827 I've proposed a possible solution to the problem.

twmb commented 1 month ago

I see you took your proposal a good bit further in https://github.com/grafana/franz-go/pull/3. I used to have a sync.Pool for records internally, but removed it because it was completely ineffective. Specifically, with my prior internal usage, I'd pull thousands of records out of the pool (allocating them all) and then put them back at a different, independent time. GC would run, all the records would be collected anyway, and the pool just wasn't useful.
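That access pattern can be condensed into a small sketch (the record type and counts are stand-ins): every Get in the initial burst misses the empty pool and allocates anyway, and GC between the Puts and the next burst drops the pool's contents, so nothing is ever actually reused.

```go
package main

import (
	"runtime"
	"sync"
)

// record is a stand-in for the pooled record type; only the access pattern
// matters here.
type record struct{ payload [128]byte }

var pool = sync.Pool{New: func() any { return new(record) }}

func main() {
	// Burst of Gets: the pool starts empty, so every Get falls through to
	// New and allocates anyway.
	batch := make([]*record, 0, 10_000)
	for i := 0; i < 10_000; i++ {
		batch = append(batch, pool.Get().(*record))
	}

	// ... records are handed to the application and released much later ...

	for _, r := range batch {
		pool.Put(r)
	}

	// GC cycles (two, counting the victim cache added in Go 1.13) between
	// the Puts and the next burst of Gets drop the pool's contents, so the
	// returned records are collected before they can be reused.
	runtime.GC()
	runtime.GC()

	// The next burst allocates from scratch again.
	_ = pool.Get().(*record)
}
```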

I think PR 3 there is sound, but it's tricky to follow. As far as I can tell, it's trying to solve a few goals:

  1. Reuse the temporary byte slice that is created while decompressing
  2. Reuse the temporary []kmsg.Record slice that is created while processing a fetch response
  3. Reuse the end-user *kgo.Record

I think the implementation does the job, but the code is pretty tricky to analyze. As well, I think it's not working 100% as intended when consuming uncompressed data. Currently, when consuming uncompressed data, nothing is acquired from the DecompressBufferPool, yet reuse still releases back into the pool. When consuming a mix of compressed and uncompressed data, byte slices that were never acquired originally (coming in via non-compressed batches) end up being acquired for decompressing batches, and those slices persist around somewhat randomly. If this is what's actually happening (i.e. my reading comprehension is working right now), it's functional, but certainly not the intent.
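A condensed illustration of that acquire/release asymmetry, using stand-in names (`decompressPool`, `consumeBatch`, `release`; the PR's actual code differs). The nil guard in release is the piece that keeps never-acquired slices out of the pool:

```go
package main

import "sync"

// decompressPool holds reusable scratch buffers for decompression.
var decompressPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 4096)
		return &b
	},
}

// consumeBatch returns the record bytes plus the pooled buffer (if any) to
// release later. For uncompressed batches nothing is acquired from the pool.
func consumeBatch(raw []byte, compressed bool) (data []byte, buf *[]byte) {
	if !compressed {
		return raw, nil // nothing acquired, nothing to release
	}
	buf = decompressPool.Get().(*[]byte)
	data = append((*buf)[:0], raw...) // stand-in for actual decompression
	return data, buf
}

// release is where the asymmetry bites: without the nil guard, uncompressed
// batches would push byte slices into the pool that never came from it, and
// those foreign slices would then be handed out for decompressing batches.
func release(data []byte, buf *[]byte) {
	if buf == nil {
		return // only release what was actually acquired
	}
	*buf = data[:0]
	decompressPool.Put(buf)
}

func main() {
	data, buf := consumeBatch([]byte("compressed-bytes"), true)
	_ = data // ... hand records to the application ...
	release(data, buf)
}
```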

I think some of the buffers could be put back into the pool a bit more aggressively? E.g., I don't know why rcRawRecordsBuffer is a field on kgo.Record. The point (AFAICT) is to allow the []kmsg.Record slice to be reused, which I think can be done immediately after a partition is processed (rather than holding onto it until the end user processes the kgo.Record).
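A sketch of that shorter lifetime, with stand-in types and names (`kmsgRecord`, `recSlicePool` are illustrative): the temporary slice goes back into the pool as soon as the partition is converted, instead of riding along on each end-user record.

```go
package main

import "sync"

// kmsgRecord stands in for kmsg.Record; only the slice's lifetime matters.
type kmsgRecord struct{ Value []byte }

// recSlicePool holds the temporary []kmsg.Record-style slices used while
// decoding a fetched batch.
var recSlicePool = sync.Pool{
	New: func() any {
		s := make([]kmsgRecord, 0, 256)
		return &s
	},
}

// processPartition decodes one partition's records into a pooled slice and
// returns that slice to the pool as soon as the partition is fully converted,
// rather than carrying it on each end-user record.
func processPartition(decode func(dst []kmsgRecord) []kmsgRecord, emit func(kmsgRecord)) {
	sp := recSlicePool.Get().(*[]kmsgRecord)
	recs := decode((*sp)[:0])
	for _, r := range recs {
		emit(r) // conversion copies whatever the end-user record needs
	}
	*sp = recs[:0]
	recSlicePool.Put(sp) // safe: nothing downstream references this slice
}

func main() {
	processPartition(
		func(dst []kmsgRecord) []kmsgRecord {
			return append(dst, kmsgRecord{Value: []byte("v")})
		},
		func(kmsgRecord) {},
	)
}
```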

I'm open to a caching API, but I can't think of a great one yet. I don't think it's something that should always be on for everybody, and especially not via globals.
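One possible shape for such an opt-in API, sketched entirely with hypothetical names (`RecordPool`, `WithRecordPool`, and the client/option types below are not kgo identifiers): the cache is configured per client via a functional option, so users who don't opt in see no change and no globals are involved.

```go
package main

import "sync"

// Record and everything below are hypothetical; none of these identifiers
// exist in kgo.
type Record struct{ Value []byte }

// RecordPool is a per-client cache that consumers opt into explicitly.
type RecordPool struct{ p sync.Pool }

func NewRecordPool() *RecordPool {
	return &RecordPool{p: sync.Pool{New: func() any { return new(Record) }}}
}

func (rp *RecordPool) get() *Record  { return rp.p.Get().(*Record) }
func (rp *RecordPool) Put(r *Record) { *r = Record{}; rp.p.Put(r) }

// client and clientOpt mimic kgo's functional-option style without touching
// any real kgo types.
type client struct{ pool *RecordPool }
type clientOpt func(*client)

// WithRecordPool is the hypothetical opt: when absent, records are allocated
// normally and existing users see no behavior change.
func WithRecordPool(rp *RecordPool) clientOpt {
	return func(c *client) { c.pool = rp }
}

func newClient(opts ...clientOpt) *client {
	c := &client{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func (c *client) newRecord() *Record {
	if c.pool != nil {
		return c.pool.get()
	}
	return new(Record)
}

func main() {
	pool := NewRecordPool()
	c := newClient(WithRecordPool(pool))
	r := c.newRecord()
	// ... process r ...
	pool.Put(r) // the consumer releases records it opted into pooling
}
```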