rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Add synchronization for IO between `read_parquet` calls on different threads #16936

Open GregoryKimball opened 3 days ago

GregoryKimball commented 3 days ago

Is your feature request related to a problem? Please describe.

When launching a pool of threads, each calling read_parquet, we often find poor pipelining for the first read. We observe that each thread submits IO to the scheduler, and since each thread has the same priority, the scheduler completes the IO for all the threads at about the same time. This means that compute on the data cannot begin until all threads finish IO.

[profile screenshot]

In this profile, we have 4 threads reading 3 parquet files each. You can see that the first read does not pipeline and instead does all the copying, followed by all the compute. The second read shows better copy-compute pipelining because the threads are staggered when the second read begins. For this example it only takes about 10-20 ms of jitter in the start times to achieve good pipelining.

Describe the solution you'd like

I would like to enable simple communication between read_parquet calls in the same process so that only one thread does IO at a time. The first thread would complete its IO and launch its decompression kernel, and only then would the second thread begin its IO. That way the second thread's copying overlaps with the first thread's compute, and so on for the rest of the threads.

To implement this, we could introduce an "IO lock" using std::condition_variable that lets a thread set a flag before starting IO and clear it when its IO completes.
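A minimal sketch of what such an IO lock could look like (the name io_lock and its members are assumptions for illustration, not existing libcudf code):

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical process-wide IO lock: a reader thread acquires it before
// submitting IO and releases it once its IO has completed, so the next
// reader's copies overlap with this reader's compute.
class io_lock {
 public:
  void acquire()
  {
    std::unique_lock<std::mutex> lock{_mtx};
    _cv.wait(lock, [this] { return !_io_in_progress; });
    _io_in_progress = true;
  }

  void release()
  {
    {
      std::lock_guard<std::mutex> lock{_mtx};
      _io_in_progress = false;
    }
    _cv.notify_one();
  }

 private:
  std::mutex _mtx;
  std::condition_variable _cv;
  bool _io_in_progress{false};
};
```

Each read_parquet call would acquire() before submitting its reads and release() as soon as its IO finishes, so the next thread can start copying while this one runs its decompression and decode kernels.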

Describe alternatives you've considered

We could add sleeps in each thread to help stagger the copies and enable better pipelining.
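For reference, a minimal sketch of the staggering idea (the 15 ms offset is an assumption based on the 10-20 ms of jitter observed in the profile above):

```cpp
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical staggered start: thread i sleeps i * 15 ms before issuing its
// read, so the copies of successive threads are offset in time.
void staggered_start(std::size_t thread_idx)
{
  std::this_thread::sleep_for(std::chrono::milliseconds(15 * thread_idx));
  // ... submit this thread's read_parquet call here ...
}
```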

Additional context

Please note, we can still do a kvikIO read with multiple threads (e.g. KVIKIO_NTHREADS=8) for each of the main read_parquet threads. We just want each read_parquet call to complete its IO and launch compute before the other read_parquet calls begin their IO.
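A rough sketch of the threading pattern being described (file names and the one-file-per-thread layout are assumptions; KVIKIO_NTHREADS=8 would be set in the environment before launching the process):

```cpp
#include <cudf/io/parquet.hpp>

#include <string>
#include <thread>
#include <vector>

int main()
{
  // One reader thread per file; kvikIO's shared thread pool (KVIKIO_NTHREADS)
  // still parallelizes the low-level reads issued by each read_parquet call.
  std::vector<std::string> files{"a.parquet", "b.parquet", "c.parquet", "d.parquet"};
  std::vector<std::thread> readers;
  for (auto const& path : files) {
    readers.emplace_back([path] {
      auto const options =
        cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();
      auto const result = cudf::io::read_parquet(options);  // IO, then decompression/decode
    });
  }
  for (auto& t : readers) { t.join(); }
  return 0;
}
```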

For a disk read it is probably also better to do all the sequential reads together for a single file, rather than submitting 4 MB reads from different files back-to-back.

vuule commented 3 days ago

> we can still do a kvikIO read with multiple threads (e.g. KVIKIO_NTHREADS=8) for each of the main read_parquet threads

I think the thread pool is per-process, so we'd have 8 threads shared between our read_parquet threads. Which makes sense to me, as the optimal IO parallelism is presumably determined by the storage.