apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++] Mechanism for throttling remote filesystems to avoid rate limiting #34892

Open amoeba opened 1 year ago

amoeba commented 1 year ago

Describe the enhancement requested

In the current implementation of at least the S3 filesystem, it's possible to hit AWS's rate limits, which makes it hard to use a function like pyarrow.parquet.write_dataset to write highly partitioned datasets to S3. When the rate limit is hit during writing, the user receives an HTTP 503 with a "SLOW_DOWN" error and the request needs to be retried. I would guess any other cloud storage provider (e.g. GCS) would behave similarly, though I haven't tested.

There are a few things that could be done to address this issue:

Component(s)

C++

ianmcook commented 1 year ago

https://github.com/apache/arrow/issues/20483 is a related issue

mapleFU commented 1 year ago

Would it be something like https://github.com/apache/arrow-rs/blob/master/object_store/src/throttle.rs, or like RocksDB's write stall?

westonpace commented 1 year ago

It could be based on https://github.com/apache/arrow-rs/blob/master/object_store/src/throttle.rs

We have something kind of similar already internally with https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/slow.h and SlowFileSystem (in filesystem.h)

However, for S3, it seems we already have retry logic (I was not aware of this until recently). I think a first question is to understand why that doesn't work here. By default an S3 filesystem should (I think) retry every 200 ms, for up to 6 seconds.

westonpace commented 1 year ago

https://github.com/apache/arrow/blob/main/cpp/src/arrow/filesystem/s3fs.h#L166-L168
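For context, a minimal sketch of how that existing retry knob can be exercised from pyarrow, assuming the AwsStandardS3RetryStrategy wrapper is available in your pyarrow version (class names and defaults may differ across releases):

```python
import pyarrow.fs as fs

# Sketch only: raise the number of retry attempts so throttling errors
# (HTTP 503 "SLOW_DOWN") are retried more times before being surfaced.
# AwsStandardS3RetryStrategy wraps the AWS SDK's standard retry policy.
s3 = fs.S3FileSystem(
    region="us-east-1",  # placeholder region
    retry_strategy=fs.AwsStandardS3RetryStrategy(max_attempts=10),
)
```

If SLOW_DOWN errors still reach the caller with a configuration like this, that would suggest the retry strategy is not being applied to the throttled calls, which is worth confirming as part of the investigation.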

mapleFU commented 1 year ago

It seems the retry logic is separate from SlowFileSystem, because it would be used not only in testing (though the logic is similar). Could it be applied in a decorator style, like SlowFileSystem? And could retry with backoff be implemented in the same layer?

mapleFU commented 1 year ago

After going through the code, I think throttling and retry are two related but separate pieces of work:

  1. Throttling: limit the request rate and avoid burst writes on an already over-quota stream
  2. Retry: allow the user to configure a sleep and retry-backoff strategy

Both of them need to detect whether the status is "needRetry".
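To make the distinction concrete, here is an illustrative sketch of the throttling half (not an Arrow API, just a small Python example): a limiter that spaces out request starts, independent of any retry/backoff that happens after a failure.

```python
import threading
import time


class RequestRateLimiter:
    """Illustrative limiter: callers block until the next request slot
    is available, capping the rate at which requests are started."""

    def __init__(self, max_requests_per_second: float):
        self._min_interval = 1.0 / max_requests_per_second
        self._lock = threading.Lock()
        self._next_allowed = time.monotonic()

    def acquire(self) -> None:
        with self._lock:
            now = time.monotonic()
            wait = self._next_allowed - now
            # Reserve the next slot whether or not this caller has to sleep.
            self._next_allowed = max(now, self._next_allowed) + self._min_interval
        if wait > 0:
            time.sleep(wait)


# Hypothetical usage: call acquire() before each filesystem request.
limiter = RequestRateLimiter(max_requests_per_second=100)
limiter.acquire()  # then issue the S3 request
```

Retry/backoff, by contrast, only reacts after a request has already failed with a retryable status.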

StuartHadfield commented 1 year ago

Chiming in here as a pyarrow user who's having immense difficulty with this. Food for thought:

Imagine a scenario where you have a nearly continuous influx of data which you need to render into parquet and store on S3. A backoff strategy works well for a single write, but when you have loads of data incoming, if you get rate limited and back off, you risk falling behind to a point where it's very difficult to catch up.

This is, of course, hypothetical, but it illustrates that whilst throttling and retry with backoff would be very useful for 90% of use cases (and I would certainly appreciate them, I just do not possess the programming skill to implement them here :( ), there are some niche circumstances where we may need to consider batching writes more efficiently.

mapleFU commented 1 year ago

@StuartHadfield That seems like a conflict between providing maximum bandwidth and providing stable service. Maybe users could configure the argument or the filesystem themselves? Or some dynamic configuration could be used to control this.

westonpace commented 1 year ago

> Imagine a scenario where you have a nearly continuous influx of data which you need to render into parquet and store on S3. A backoff strategy works well for a single write, but when you have loads of data incoming, if you get rate limited and back off, you risk falling behind to a point where it's very difficult to catch up.
>
> This is, of course, hypothetical, but it illustrates that whilst throttling and retry with backoff would be very useful for 90% of use cases (and I would certainly appreciate them, I just do not possess the programming skill to implement them here :( ), there are some niche circumstances where we may need to consider batching writes more efficiently.

The dataset writer itself issues one "Write" call per row group. You can batch those using the min_rows_per_group configuration of the call.

The S3 filesystem itself will batch incoming writes until it has accumulated 5 MB of data. This is controlled by a constant, kMinimumPartUpload. Given that S3 supposedly provides 5,500 requests per second, that would seem to imply a limit of 27.5 GB/s, which I assume is more than enough.

It's also possible, if you have many partitions and a low limit on open files, that many small parquet files are being created. So you might check whether that is happening (and increase the allowed number of max open files if it is).

Again, I think more investigation is needed. How many writes per second are actually being issued?
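For anyone hitting this in practice, a hedged sketch of the knobs mentioned above on the pyarrow side (parameter names follow pyarrow.dataset.write_dataset; the bucket, region, and values are placeholders):

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

table = pa.table({"part": ["a", "b"] * 500, "value": list(range(1000))})

# Sketch only: batch rows into larger row groups (fewer Write calls) and
# allow more simultaneously open files so partitions don't end up as many
# tiny parquet files, each costing extra S3 requests.
ds.write_dataset(
    table,
    "my-bucket/dataset",                        # hypothetical bucket/prefix
    format="parquet",
    partitioning=["part"],
    filesystem=fs.S3FileSystem(region="us-east-1"),
    min_rows_per_group=100_000,
    max_rows_per_group=1_000_000,
    max_open_files=1024,                        # raise if many tiny files appear
)
```

Counting the actual number of Write calls per second (for example via S3 request metrics or server access logs) would answer the "how many writes per second" question above.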