rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.23k stars 884 forks source link

[FEA] Develop new approach for handling remote I/O #15919

Open vyasr opened 3 months ago

vyasr commented 3 months ago

Is your feature request related to a problem? Please describe. Currently cudf supports reading files from remote sources by reusing the arrow NativeFile interface. Such files can be passed down from Python into libcudf and configured to only read the selected subset of data from the remote resource. This can be vital for the performance of some workflows. However, as part of #15193 we will be removing libarrow as a dependency of cudf and libcudf. This means that we will no longer be able to rely on the NativeFile interface. This is a breaking change for the cudf and libcudf APIs, as well as being a performance hit for some workflows.

Describe the solution you'd like We need to evaluate alternatives that will allow us to maintain or improve upon performance for remote I/O while not depending on libarrow. The arrow removal has numerous ancillary benefits and will be moving forward, so we need to find a way to mitigate that. Ideally we would also want to get a sense of how much the NativeFile-based interfaces are currently used.

vyasr commented 3 months ago

CC @GregoryKimball @rjzamora

rjzamora commented 3 months ago

Thank you for raising @vyasr !

I have spent some time exploring the importance of cudf's NativeFile dependency. In theory, we should be able to achieve the same performance without it. We are not actually using arrow to transfer any remote data at all unless the user specifically opens their file(s) with the pyarrow filesystem API. Instead, we are just using arrow as a translation layer between our python-based fsspec file and something that is recognized by libcudf as a proper data source.

If we were to change the python code to stop relying on NativeFile today, we could probably optimize the existing use_python_file_object=False logic to avoid a significant run-time regression. The only necessary regression (besides losing support for pyarrow filesystems) would be an increase in host-memory usage during partial IO. This is because we would need to pass down a byte range to libcudf that "looks" like an entire file (even if we are only reading a single column, and most of the bytes are left "empty").

Near-term Solution: In order to avoid excessive host-memory usage in the near term, we could probably introduce some kind of "sparse" byte-range data-source to libcudf. It is fairly easy to populate a mapping of known byte ranges efficiently with fsspec. If these known byte ranges could be used to populate a structure that is understood as a file-like object by libcudf, then we can avoid the host-memory issue.

(Possible) Long-term Solution: We roll our own filesystem API at the cpp level and avoid all python-related performance concerns :)

rjzamora commented 2 months ago

@martindurant - As I mentioned in https://github.com/rapidsai/cudf/issues/16004, we'd love to hear your thoughts. A bit more background:

Cudf currently relies on python/fsspec to transfer the necessary data into host memory. For the parquet and csv readers, we support partial IO by wrapping the fsspec file in an arrow PythonFile. For all other IO, we always transfer the entire file into local memory before handing over the bytes to libcudf (c++). Although my original plan was to expand/improve Arrow NativeFile support throughout cudf, it now seems clear that we need to remove the problematic libarrow dependency instead. This means that we are in need of something new at the C++ level.

Some questions for you:

martindurant commented 2 months ago

Sounds like we should talk - there's a lot going on here. I assume that the best case would be if you can move bytes directly from the network interface to GPU without bothering with python bytes (as a reminder, I and others have rust implementations of some of this) but python can continue to be the control layer. Another matter, is that HTTP streams are commonly compressed by the server, which is basically pointless for binary fine formals which have internal compression, and take serious CPU usage before you even get the range of bytes you asked for.

Specific answers to your questions:

What is the fastest way to transfer specific byte ranges

cat_ranges, yes. The set of calls is sent concurrently, so latency is not a problem, but the event loop is running in a single python thread, so no true parallelism. That's not a porblem for almost all use cases, where the stream handling (including any decompression) is fast compared to the network bandwidth. I assume in your case, you will have super-high bandwidth cases too. When called from multiple dask threads in a single process, all the fsspec IO still happens in one dedicated thread.

How difficult and/or effective would it be to build something like Arrow's PythonFile

I mentioned the rust prototype (rfsspec) for this reason. It doesn't have a file-like interface, but the FS implementations are designed to be like fsspec, so it would not be hard. Unlike asyncio, tokio can grab the even loop in many threads and efficiently spawn workers for compute (the prototype does not do this). The question is, do you really want a python file-like object, which is blocking and has internal state? Maybe not. The arrow file objects are like that too, I think. To me, it would make sense to have multiple cursors into memory buffers, especially if you know which buffers you will be needing (like fsspec.parquet).

OTOH, directly calling python with a C++ wrapper (which you describe) is probably very easy to implement, but see the next answer.

What would be the performance delta

this is hard to say. Getting bandwidth limited transfers with fsspec and pure python is certainly possible now, but calling into the python interpreter on every .read(1) in the C++ code would be a terrible idea. I wouldn't be surprised if the thrift parser or the encoding/definition levels decoders do that. Calling into python when you asynchronously need to pull a large block of bytes should be fine.

rjzamora commented 2 months ago

Thanks for the quick response @martindurant !

I assume that the best case would be if you can move bytes directly from the network interface to GPU without bothering with python bytes (as a reminder, I and others have rust implementations of some of this) but python can continue to be the control layer.

Yes we absolutely want to do this in the long run. My preliminary plan/suggestion was to improve remote-IO performance in two general steps:

  1. Optimize the way we are using fsspec in the absence of pyarrow.
    • For example, I assume that we can use cat_ranges to submit a concurrent request for all "known" bytes being used to produce a cudf DataFrame, and then introduce a "sparse" byte-vector concept to read from the known byte ranges once they are in host memory. Pyarrow's PythonFile wrapper was a convenient way to perform this translation, but we always knew it would be best to avoid passing the bytes through Python.
  2. We Introduce a solution entirely outside of python (besides the necessary cudf bindings). For example, it would be wonderful if kvikio (or rfsspec?) could transfer remote bytes directly into GPU memory.

... the event loop is running in a single python thread, so no true parallelism. That's not a problem for almost all use cases, where the stream handling (including any decompression) is fast compared to the network bandwidth. I assume in your case, you will have super-high bandwidth cases too. When called from multiple dask threads in a single process, all the fsspec IO still happens in one dedicated thread.

Okay, thanks for clarifying - This event loop limitation makes sense. Just to clarify, if we were only interested in transferring raw bytes as quickly as possible (no "compute" at all), would there be any benefit to initiating an s3 transfer in parallel? Or does the concurrent cat_ranges call already invoke optimal behavior on s3's end?

I mentioned the rust prototype (rfsspec) for this reason. It doesn't have a file-like interface, but the FS implementations are designed to be like fsspec, so it would not be hard.

It's really great that you have already been looking into this - I am very interested in exploring rfsspec to see how the general cat_ranges performance compares to fsspec proper for logic similar to fsspec.parquet. Do you think it makes sense for me to test/explore this?

The question is, do you really want a python file-like object, which is blocking and has internal state? Maybe not. To me, it would make sense to have multiple cursors into memory buffers, especially if you know which buffers you will be needing (like fsspec.parquet)... but calling into the python interpreter on every .read(1) in the C++ code would be a terrible idea. I wouldn't be surprised if the thrift parser or the encoding/definition levels decoders do that. Calling into python when you asynchronously need to pull a large block of bytes should be fine.

I completely agree with you that a python-like file is not the ideal solution here. I am only interested in a PythonFile replacement as a short-term workaround if something like this is "easy".

martindurant commented 2 months ago

For example, I assume that we can use cat_ranges to submit a concurrent request for all "known" bytes being used to produce a cudf DataFrame, and then introduce a "sparse" byte-vector concept to read from the known byte ranges once they are in host memory.

Absolutely, and this would be a very simple construct. I could write it in rust in no time, but not C++ :) For python, of course we already have this.

it would be wonderful if kvikio (or rfsspec?) could transfer remote bytes directly into GPU memory.

My code of course only uses pretty high-level things, so no direct talking to devices. I have no idea what it would take, but I'm sure you know people who do. I have considered whether it might be useful to have a full rust-based general fsspec-like, with python bindings, but not exclusively. I don't have the spare effort to do that really. Of course, python encourages more backends, since they are much easier to write, and tricks like what kerchunk achieves - I would not want to exclude the potential to do that.

I am very interested in exploring rfsspec to see how the general cat_ranges performance compares to fsspec proper for logic similar to fsspec.parquet. Do you think it makes sense for me to test/explore this?

I did some light benchmarking of zarr workflows at the time (no ranges, whole files only), and found that there was no drastic difference, at most 20% improvement occasionally, sometimes none at all. Probably, asyncio is not the bottleneck, the overhead is pretty small. Having said that, I never finished implementing https://github.com/martindurant/rfsspec/blob/main/src/io.rs , which would bring a file-like experience without copying or making bytes objects. cramjam has similar structures also, for inspiration.

martindurant commented 2 months ago

Just to clarify, if we were only interested in transferring raw bytes as quickly as possible (no "compute" at all), would there be any benefit to initiating an s3 transfer in parallel? Or does the concurrent cat_ranges call already invoke optimal behavior on s3's end?

I don't know if, in theory, it's possible to have multiple threads talking to the network interface at once. Certainly, parallel copies for host to device memory are possible. None of this is touched by the high-level python code, of course. I think in python's case, multiple event loops in multiple threads would just end up costing overhead (because of the GIL, or maybe even without it). So yes, I think cat_ranges can be near optimal. Other network parameters probably come way sooner, like https://github.com/fsspec/s3fs/issues/873 .

martindurant commented 2 months ago

FYI: auto_decompress=False to the aiohttp client would prevent CPU decompression, so that it can be moved to the GPU for smaller transfers and potential speedup. Actually, many have switched to httpx as the client over aiohttp; I don't know if it offers any different performance characteristics.