apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++] S3FileSystem generates a significant amount of redundant requests #40589

Open orf opened 7 months ago

orf commented 7 months ago

Describe the bug, including details regarding any error messages, version, and platform.

I am using the S3 Filesystem abstractions to process a large Parquet dataset. The job reads data in batches and incrementally produces a new set of Parquet files. During execution it checkpoints its work.

The checkpointing code is fairly rudimentary: we incrementally write a Parquet file to the checkpoint/ prefix, and periodically copy it to the output/ directory using fs.move(). We then write a .json file to the checkpoint directory detailing the current progress. Not perfect, but it works well enough for our needs.
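For context, here is a minimal sketch of that loop; the bucket/prefix names and the batch source are hypothetical and error handling is elided:

import json
import uuid

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

s3 = fs.S3FileSystem()
checkpoint = "my-bucket/checkpoint"   # hypothetical prefixes
output = "my-bucket/output"

# Stand-in for the real batched reads of the source dataset.
batches = [pa.table({"x": [i]}) for i in range(3)]

for batch_id, table in enumerate(batches):
    part = f"{checkpoint}/part-{uuid.uuid4()}.parquet"
    pq.write_table(table, part, filesystem=s3)                 # incremental Parquet write
    s3.move(part, f"{output}/part-{batch_id}.parquet")         # promote to output/
    with s3.open_output_stream(f"{checkpoint}/progress.json") as f:
        f.write(json.dumps({"last_batch": batch_id}).encode())  # tiny .json checkpoint

Every iteration touches the checkpoint/ prefix several times, which is where the request counts below come from.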

After implementing this I analyzed the requests to the bucket and found that a surprising number of requests were being made. For one job, 520,752 requests were made to the checkpoint/ prefix in S3, whereas only 74,331 requests were made to read and write the dataset itself. This is a significant discrepancy: nearly 7x the number of requests!

The following is a table of requests to the checkpoint prefix broken down by type:

operation               total
REST.GET.OBJECT         91871
REST.POST.UPLOADS       84285
REST.PUT.PART           84262
REST.POST.UPLOAD        84261
REST.COPY.OBJECT        42131
REST.COPY.OBJECT_GET    42131
REST.PUT.OBJECT         42129
REST.DELETE.OBJECT      42129
REST.HEAD.OBJECT         7553

I've dug into this and here are the reasons why:

126,388 of the REST.PUT.PART, REST.POST.UPLOADS and REST.POST.UPLOAD requests are made while writing the small .json checkpoint files. Due to this issue, despite the files being less than 1 kB in size and few in number, each creation of a .json checkpoint file requires 3 requests to S3.
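To make the per-file cost concrete, this is roughly what those three request types correspond to. The boto3 calls are used purely to name the S3 operations (Arrow's C++ client does not use boto3), and the bucket, key and payload are hypothetical:

import boto3

s3 = boto3.client("s3")
bucket, key = "my-bucket", "checkpoint/progress.json"   # hypothetical
body = b'{"last_batch": 42}'

# Multipart write of a tiny object: three requests.
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)              # REST.POST.UPLOADS
part = s3.upload_part(Bucket=bucket, Key=key, PartNumber=1,
                      UploadId=mpu["UploadId"], Body=body)            # REST.PUT.PART
s3.complete_multipart_upload(
    Bucket=bucket, Key=key, UploadId=mpu["UploadId"],
    MultipartUpload={"Parts": [{"ETag": part["ETag"], "PartNumber": 1}]})  # REST.POST.UPLOAD

# The same object written with a single request.
s3.put_object(Bucket=bucket, Key=key, Body=body)                      # REST.PUT.OBJECT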

A further 126,420 requests are also multipart uploads, made while creating the Parquet files. Looking at the statistics, ~50% of the multipart upload requests could have been avoided by buffering an initial 30 megabytes before initiating a multipart upload.
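A minimal sketch of that buffering idea, assuming a hypothetical BufferedS3Writer and a boto3 client for illustration only (the real change would live in Arrow's C++ output stream):

import io

import boto3

class BufferedS3Writer:
    """Hold writes in memory and decide between PutObject and multipart at close()."""

    def __init__(self, client, bucket, key, threshold=30 * 1024 * 1024):
        self.client, self.bucket, self.key = client, bucket, key
        self.threshold = threshold
        self.buf = io.BytesIO()

    def write(self, data: bytes):
        self.buf.write(data)

    def close(self):
        body = self.buf.getvalue()
        if len(body) < self.threshold:
            # Small file: one PutObject instead of three multipart requests.
            self.client.put_object(Bucket=self.bucket, Key=self.key, Body=body)
        else:
            # Large file: pay the multipart cost (sketched; a real writer would
            # stream parts as the buffer fills rather than buffering everything).
            mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            part = self.client.upload_part(Bucket=self.bucket, Key=self.key, PartNumber=1,
                                           UploadId=mpu["UploadId"], Body=body)
            self.client.complete_multipart_upload(
                Bucket=self.bucket, Key=self.key, UploadId=mpu["UploadId"],
                MultipartUpload={"Parts": [{"ETag": part["ETag"], "PartNumber": 1}]})

With something like this, the small .json checkpoints and roughly half of the Parquet files above would have cost one request each instead of three.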

The 42,129 requests with the type REST.PUT.OBJECT are due to the implementation of move() and delete(): they currently attempt to re-create the parent directory after a copy or delete, because if there is only a single file in the prefix and we move/delete it, the prefix will no longer exist. The workaround as implemented is to create a 0-sized object with the name of the prefix, ensuring that it still "exists".

The 7,553 requests with the type REST.HEAD.OBJECT come in part from the implementation of DeleteObject, where we make a HeadObject request before deleting a key.
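Putting the last two points together, the request pattern described above for moving the last object out of a prefix looks roughly like this (boto3 is used only to name the S3 operations; this is not Arrow's actual client code, and the key names are hypothetical):

import boto3

s3 = boto3.client("s3")
bucket = "my-bucket"                                              # hypothetical
src, dst = "checkpoint/part.parquet", "output/part.parquet"

s3.copy_object(Bucket=bucket, Key=dst,
               CopySource={"Bucket": bucket, "Key": src})         # REST.COPY.OBJECT
s3.head_object(Bucket=bucket, Key=src)                            # REST.HEAD.OBJECT before the delete
s3.delete_object(Bucket=bucket, Key=src)                          # REST.DELETE.OBJECT
s3.put_object(Bucket=bucket, Key="checkpoint/", Body=b"")         # REST.PUT.OBJECT: re-create the
                                                                  #   0-sized "directory" marker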

Performance with versioned buckets

While it's noble to attempt to create a proper "filesystem" facade over S3, there are inherent issues with this in terms of cost and performance.

One major thing that worries me is the EnsureParentExists() that is called from the DeleteDir, DeleteFile and Move methods. In a versioned bucket, this will repeatedly create empty keys to mimic a directory.

Given a pathological case where you do something like this from multiple processes/threads:

import uuid

from pyarrow import fs

sfs = fs.S3FileSystem()

for _ in range(10000):
    # Write a tiny object, then immediately move it to another prefix.
    path = f"a_bucket/some_directory/{uuid.uuid4()}"
    with sfs.open_output_stream(path) as fd:
        fd.write(b'hi')
    # Each move() deletes the source and re-creates the "some_directory/" marker.
    sfs.move(path, f"a_bucket/some_other_directory/{uuid.uuid4()}")

Then we will end up with many tens of thousands of versioned objects with the key some_directory/.

One lesser-known thing about S3 versioned buckets is that list_objects_v2 (and list_objects) calls have to skip over deleted/noncurrent versions of objects. While this is normally fast, it can become very slow - I've seen prefixes take over a minute to list objects due to the number of noncurrent/deleted objects within those prefixes (i.e. listing an outer "directory" takes longer if there are many deleted/noncurrent children within sub-directories). Obviously lifecycle policies can clean these out, but that only happens once a day or so.
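For reference, this is the kind of lifecycle rule that cleans these up; the bucket and prefix are hypothetical, boto3 is used only for illustration, and as noted above the rule is only evaluated roughly once a day:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-bucket",   # hypothetical
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-noncurrent-checkpoint-versions",
                "Filter": {"Prefix": "checkpoint/"},
                "Status": "Enabled",
                # Remove noncurrent versions a day after they are superseded.
                "NoncurrentVersionExpiration": {"NoncurrentDays": 1},
                # Clean up delete markers that no longer hide any versions.
                "Expiration": {"ExpiredObjectDeleteMarker": True},
            }
        ]
    },
)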

You will also almost certainly run into S3 request limits when you repeatedly call PutObject on a single key at high volume - request limits in aggregate are very high for S3, but for a single key they can be much lower.

Component(s)

C++, Python

felipecrv commented 7 months ago

Then we will end up with many tens of thousands of versioned objects with the key some_directory/.

The solution here may be just checking that the empty dir marker exists instead of re-creating it unconditionally.
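A minimal sketch of that check, with boto3 used only to name the S3 calls (the actual change would be in the C++ implementation of EnsureParentExists):

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
bucket, marker = "my-bucket", "some_directory/"   # hypothetical

try:
    s3.head_object(Bucket=bucket, Key=marker)     # marker already present: nothing to do
except ClientError as exc:
    if exc.response["Error"]["Code"] == "404":
        # Create the empty directory marker only when it is actually missing.
        s3.put_object(Bucket=bucket, Key=marker, Body=b"")
    else:
        raise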

orf commented 7 months ago

Then we will end up with many tens of thousands of versioned objects with the key some_directory/.

The solution here may be just checking that the empty dir marker exists instead of re-creating it unconditionally.

That’s no guarantee though, and would generate a lot of redundant requests.

Another idea would be to stop trying to emulate directories on S3. This would need to be feature-gated in some way, as there may be conditional code that relies on the existence of a directory, but if the feature is enabled (via a flag or an S3 filesystem subclass) then the "directory exists" function would always return True?

I think it's a fair trade-off if you know you want it?

felipecrv commented 7 months ago

... then the “directory exists” function always returns True?

I suggested something like this months ago while implementing AzureFileSystem but quickly learned that these "simplifications" break a lot of the invariants the arrow::FileSystem interface is supposed to guarantee. The reason S3 and other blob stores don't offer FS semantics is efficiency and the ability to distribute the system without breaking the filesystem invariants.

The true fix would be to have datasets loaded from an arrow::ObjectStore abstraction that is emulated on the local filesystem, instead of going through an arrow::FileSystem abstraction that is emulated on top of object storage systems like S3.