Aiven-Open / tiered-storage-for-apache-kafka

RemoteStorageManager for Apache Kafka® Tiered Storage
Apache License 2.0

Does GCS fetching result in a lot of extra bandwidth? #616

Open stanislavkozlovski opened 1 month ago

stanislavkozlovski commented 1 month ago

I'm reading through the code to try to calculate how many remote store calls one would expect from a normal Kafka deployment. As far as I can trace the read path, we issue a separate GET for each chunk, i.e. for that chunk's byte range. While the S3 and Azure implementations clearly use their respective APIs' range-fetching capabilities, the GCS one doesn't (at least not obviously).

Reading further into it, I see that the way to do range fetches in the GCS Java client is through the storage.reader() API, and I notice that Blob.reader() does in fact do just that.

So I guess this answers my question with a no: we only fetch the bytes of the range we request.
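
For reference, a minimal sketch of such a ranged read with the google-cloud-storage Java client (this is my own illustration, not the plugin's code; the bucket, key, offset and length below are placeholders):

```java
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.io.IOException;
import java.nio.ByteBuffer;

public class GcsRangeReadSketch {

    // Read [offset, offset + length) of a single object without downloading the rest of it.
    static ByteBuffer readRange(final Storage storage, final String bucket, final String key,
                                final long offset, final int length) throws IOException {
        final ByteBuffer buffer = ByteBuffer.allocate(length);
        try (ReadChannel reader = storage.reader(BlobId.of(bucket, key))) {
            reader.seek(offset);            // start of the requested byte range
            reader.limit(offset + length);  // exclusive end of the range
            while (buffer.hasRemaining() && reader.read(buffer) >= 0) {
                // keep filling the buffer until the range is exhausted
            }
        }
        buffer.flip();
        return buffer;
    }

    public static void main(final String[] args) throws IOException {
        final Storage storage = StorageOptions.getDefaultInstance().getService();
        // e.g. fetch a hypothetical 4 MiB chunk starting at byte 0 of a segment object
        readRange(storage, "my-bucket", "topic-0/00000000000000000000.log", 0, 4 * 1024 * 1024);
    }
}
```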

Additionally, if I may ask:

  1. Is it a correct understanding that we do a GET for each chunk?

If yes, have we considered GET-ting the whole object (or at least a larger part of it) and then caching the chunks (at least when a cache is configured)? Since consumers/brokers typically read linearly when reading historical data, you'd expect that if you fetch a 4 MiB chunk of a segment, you're probably going to fetch the full log segment anyway.

  2. We PUT the whole log segment in multipart chunks via s3.multipart.upload.part.size / gcs.resumable.upload.chunk.size / azure.upload.block.size, but GET it in chunk.size-sized chunks.

Is there a reason these weren't made to be more consistent with one another? At least with the default?

stanislavkozlovski commented 1 month ago

A third question, perhaps for @ivanyu. The code says:

```java
private Blob getBlob(final String key) throws KeyNotFoundException {
    // Unfortunately, it seems Google will do two a separate (HEAD-like) call to get blob metadata.
    // Since the blobs are immutable in tiered storage, we can consider caching them locally
    // to avoid the extra round trip.
    final Blob blob = storage.get(this.bucketName, key);
```

But after extensively searching Google, reading through the documentation, and reading the GCS Java client code, I can't find any reference to this. Is it still true?
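
To make the question concrete, here is a hedged sketch of the two call patterns as I understand them (my own illustration; whether storage.reader(BlobId) actually avoids the metadata round trip is exactly what I'd like confirmed):

```java
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;

public class GcsMetadataCallSketch {

    // Pattern used in getBlob() above: storage.get() issues a metadata lookup (objects.get),
    // and only then does blob.reader() open the ranged media download.
    static ReadChannel readViaBlobMetadata(final Storage storage, final String bucket, final String key) {
        final Blob blob = storage.get(bucket, key);  // extra round trip for metadata (null if absent)
        return blob.reader();                        // media download
    }

    // Alternative: build the BlobId locally and open the read channel directly,
    // which (assuming the channel is created lazily) would skip the metadata lookup.
    static ReadChannel readDirectly(final Storage storage, final String bucket, final String key) {
        return storage.reader(BlobId.of(bucket, key));
    }
}
```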

stanislavkozlovski commented 1 month ago

Fourth question: The default size for a segment is 1 GiB, so the way this behaves seems to be:

  1. Kafka waits for the active segment to be rolled (1 GiB) and for the LSO to pass.
  2. Kafka starts uploading to S3 via this plugin - one multipart part at a time (5 MiB), serially in the same thread.
  3. 1 GiB / 5 MiB is ~205 requests. If each has a 200 ms RTT, that's ~41 seconds to fully upload (per partition).

Is there any reason the upload size isn't larger by default?

AFAICT a big reason for multipart upload is to make uploads parallelizable (and retriable) - but since we don't leverage that here, wouldn't it make sense to upload larger parts?
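
To make the back-of-envelope numbers concrete, here's a small sketch reproducing them under the same assumptions (strictly serial part uploads and a flat 200 ms per request; real uploads are of course also bandwidth-bound):

```java
public class UploadTimeSketch {
    public static void main(final String[] args) {
        final long segmentBytes = 1L << 30;  // 1 GiB segment
        final double rttSeconds = 0.2;       // assumed 200 ms per upload request
        for (final long partMiB : new long[] {5, 25, 50, 100}) {
            final long partBytes = partMiB << 20;
            final long requests = (segmentBytes + partBytes - 1) / partBytes;  // ceiling division
            System.out.printf("part=%d MiB -> %d requests, ~%.1f s of pure RTT%n",
                    partMiB, requests, requests * rttSeconds);
        }
    }
}
```

With 5 MiB parts that's the ~205 requests / ~41 s above; with 50 MiB parts it drops to ~21 requests, i.e. roughly 4 s of pure RTT per partition.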

stanislavkozlovski commented 3 weeks ago

@jlprat @giuseppelillo thanks for merging my docs PR. Is it possible to get some attention on this? Just looking to have a discussion, not necessarily an outcome

jeqo commented 2 weeks ago

@stanislavkozlovski thanks for starting this discussion! Let me try to answer some of the questions.

So I guess this answers my question with a no: we only fetch the bytes of the range we request.

Correct, we only fetch the chunk range.

Is it a correct understanding that we do a GET for each chunk? If yes, have we considered GET-ting the whole object (or at least a larger part of it) and then caching the chunks (at least when a cache is configured)? Since consumers/brokers typically read linearly when reading historical data, you'd expect that if you fetch a 4 MiB chunk of a segment, you're probably going to fetch the full log segment anyway.

Correct. This is part of the pre-fetching functionality in the chunk cache: https://github.com/Aiven-Open/tiered-storage-for-apache-kafka?tab=readme-ov-file#prefetching (see prefetch.max.size in https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/blob/main/docs/configs.rst#diskchunkcacheconfig).

We PUT the whole log segment in multipart chunks via s3.multipart.upload.part.size / gcs.resumable.upload.chunk.size / azure.upload.block.size, but GET it in chunk.size-sized chunks. Is there a reason these weren't made to be more consistent with one another? At least with the default?

We considered adding something like fetch.part.size or similar at some point, but decided it wasn't worth introducing another config, as chunk size + prefetch size was enough: the chunk is the minimum unit we cache, and we wanted to reduce the time to first byte. Introducing a larger part size would have increased latency, as we cache the value before returning the stream to the consumer (i.e. storing a larger part on disk would be slower).

jeqo commented 2 weeks ago

Is there any reason the upload size isn't larger by default? AFAICT a big reason for multipart upload is to make uploads parallelizable (and retriable) - but since we don't leverage that here, wouldn't it make sense to upload larger parts?

Agreed. We considered making these uploads parallel at some point, but decided against it as it would have used more I/O and CPU. Instead we introduced a rate limiter to make the upload throughput predictable.
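
To illustrate the idea (not our actual implementation), here's a minimal sketch of rate-limited serial part uploads; uploadPart() is a hypothetical stand-in for the backend-specific call, and Guava's RateLimiter is used only for illustration:

```java
import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.io.InputStream;

public class RateLimitedUploadSketch {
    private final RateLimiter limiter;  // permits represent bytes per second here
    private final int partSize;

    RateLimitedUploadSketch(final long uploadBytesPerSecond, final int partSize) {
        this.limiter = RateLimiter.create(uploadBytesPerSecond);
        this.partSize = partSize;
    }

    // Upload the segment one part at a time, blocking until the rate limiter
    // releases enough "byte permits" so the overall throughput stays predictable.
    void upload(final InputStream segment) throws IOException {
        final byte[] part = new byte[partSize];
        int read;
        while ((read = segment.read(part)) > 0) {
            limiter.acquire(read);
            uploadPart(part, read);
        }
    }

    private void uploadPart(final byte[] data, final int length) {
        // placeholder for the real multipart/resumable/block upload request
    }
}
```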

About the size, we defaulted to the minimum to start with, but I do agree with your analysis that a higher default should be proposed. If you have a better value in mind, could you propose a PR? It would be much appreciated.

stanislavkozlovski commented 1 week ago

Correct. This is part of the pre-fetching functionality in the chunk cache

Yeah, what I meant by the question was whether fetching the object results in many separate get() calls or one big one. I was suggesting we issue a single GET for all the data (saving remote calls).

--

Makes sense on the get size!

--

About the size, we defaulted to the minimum to start with, but I do agree with your analysis that a higher default should be proposed. If you have a better value in mind, could you propose a PR? It would be much appreciated.

No, I don't have a production-tested value. I would probably 10x it at a minimum, but without some tests the proposal doesn't mean much.

Regarding parallelization and predictable throughput: you could still 10x the value and keep the throughput quota. I reckon that would carry little risk (in theory).

stanislavkozlovski commented 1 week ago

Just looking at the 5 MiB-5 GiB range, it seems like it's standard practice to use larger chunk sizes. This isn't a high-priority issue, but increasing the size 10x or so would cut Kafka's PUT request cost by 10x too.

It's not too extreme - the costs are small either way - but it still seems like an easy gain. As an example, a 50 MB/s AWS cluster would average 9.6 PUT/s under this implementation and cost $1,520/yr for those PUTs; changing it would reduce that to $152/yr. That being said, the ~$1.5k cost is just 1% of the total cluster cost, so you'd at best save 0.9% of the total Kafka cost - not a huge amount. Still seems like an easy improvement!

The same example in Azure would go from $1.96k/yr to $196/yr, and that's now a significant ~10% savings off the total cluster cost.
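
For transparency, here's the arithmetic behind the AWS numbers, assuming roughly $0.005 per 1,000 S3 Standard PUT requests (verify against current pricing for your region):

```java
public class PutCostSketch {
    public static void main(final String[] args) {
        final double ingressBytesPerSec = 50_000_000.0;  // the 50 MB/s cluster from the example
        final double putPricePer1000 = 0.005;            // assumed S3 Standard PUT price (USD)
        for (final double partBytes : new double[] {5 << 20, 50 << 20}) {
            final double putsPerSec = ingressBytesPerSec / partBytes;
            final double putsPerYear = putsPerSec * 60 * 60 * 24 * 365;
            final double costPerYear = putsPerYear / 1000 * putPricePer1000;
            System.out.printf("part=%.0f MiB -> %.2f PUT/s, ~$%.0f/yr%n",
                    partBytes / (1 << 20), putsPerSec, costPerYear);
        }
    }
}
```

The output (~$1.5k/yr with 5 MiB parts vs ~$150/yr with 50 MiB parts) matches the figures above up to rounding of the PUT/s estimate.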

jeqo commented 6 days ago

Thanks @stanislavkozlovski! I have added a note to our docs suggesting to tune this config: https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/pull/626