rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[QST] How can the performance of chunked reading in Parquet be improved? #15376

Open gm3g11 opened 5 months ago

gm3g11 commented 5 months ago

What is your question?

[screenshot: Nsight Systems]

I am working on a project to improve the performance of reading Parquet files with the libcudf library. As shown in the Nsight Systems screenshot, the decompress_page_data event consumes the most time in the read_chunk operation, accounting for 46.877 ms and 41.987 ms of the 123.117 ms total. I am trying to reduce this decompression time but have found little material, documentation, or GitHub issues on the subject. Do you have any suggestions? I am also considering using CUDA streams to accelerate the process but am unsure where to begin. My code is attached for reference: parquet_chuncked.txt. Thank you.

GregoryKimball commented 5 months ago

Hello @gm3g11, thank you for sharing your experience with the parquet chunked reader. I'm having some trouble reading the txt file you added. Maybe it's a problem on my end, but it might help to post the code in a hidden markdown block.

Would you be willing to share more about your file with us? It's possible the 40-50 ms decompress is expected, or there could be a performance hotspot. If we have more information about the data file, we can help estimate the expected read throughput.

nvdbaranec commented 5 months ago

Hi @gm3g11. Do you know what compression format this file is using? Also, how large is this file and what settings are you using when creating the chunked reader object?

gm3g11 commented 5 months ago

@GregoryKimball, thanks for your reply. Here is the code I am using:

#include <cudf/aggregation.hpp>
#include <cudf/groupby.hpp>
#include <cudf/io/csv.hpp>
#include <cudf/table/table.hpp>

#include <rmm/cuda_device.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <map>
#include <rmm/device_buffer.hpp>

#include <cudf/column/column.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/lists/count_elements.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
// #include <cudf/io/text/detail/multibyte_split.hpp>

// #include <nvbench/nvbench.cuh>
int main(int argc, char** argv)
{
    rmm::mr::cuda_memory_resource cuda_mr{};
    rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource> pool_mr(&cuda_mr, 0, rmm::percent_of_free_device_memory(50));
    rmm::mr::set_current_device_resource(&pool_mr);

    auto byte_limit = static_cast<std::size_t>(1024 * 1024 * 1024); // 1GB chunk size
    auto read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info("./dataset/part-00198-tid-3747487300473043810-11eb4400-583b-4bd3-9c6c-a9803c7aeb94-3334-1-c000.snappy.parquet")).build();

    // Assuming you have access to a CUDA stream and memory resource
    auto stream = rmm::cuda_stream_default;
    // rmm::cuda_stream stream;
    auto mr = rmm::mr::get_current_device_resource();
    // rmm::cuda_stream stream;
    // rmm::cuda_stream_view stream_view(stream.value());

    cudf::size_type num_rows_read = 0;

    auto reader = cudf::io::chunked_parquet_reader(0, read_opts, stream, mr);

    auto start = std::chrono::steady_clock::now();
    do {
        auto const result = reader.read_chunk();
        num_rows_read += result.tbl->num_rows();
    } while (reader.has_next());

    auto end = std::chrono::steady_clock::now();
    // total_time += end-start;

    std::cout << "Chunked reading parquet file time elapsed "
      << (std::chrono::duration_cast<std::chrono::milliseconds>(end-start)).count()
      << " ms\n";
    return 0;
}

Here is the link for the parquet file: https://drive.google.com/file/d/1_OSLSiBg9afVu8Io4Vt_3OpDWjN1CDc0/view?usp=sharing

gm3g11 commented 5 months ago

@nvdbaranec, the compression format is Snappy, and the uncompressed size of the Parquet file is 1.5056 GB. I am not setting a byte limit on either the input or the output of the chunked reader object.

etseidl commented 5 months ago

@gm3g11 is this a large corpus you cannot control, or would you be able to change how the files are written? Looking at the data, I think one large takeaway is that compression isn't buying you much except for the first column. In fact, for many data pages compression is making the data larger. For the alert_id column, you could achieve almost the same data reduction by using DELTA_BYTE_ARRAY encoding (each value is a prefix followed by a UUID...DELTA_BYTE_ARRAY would remove that prefix, nearly halving the size). Similarly, the envelope_fields_version and envelope_fields_generation_counter columns would likely benefit from DELTA_BINARY_PACKED encoding. Right now PLAIN+Snappy is reducing 160k to around 99k per page, but given the range of values (all seemingly 6 decimal digits), I'd think using the delta encoding would only require 20 bits per value. This would give you data pages more like 50k in size.

I don't remember how much control over encoding options parquet-mr gives you, but I think if you set compression to none, and enabled V2 data page headers (doing so would cause the most troublesome columns to use delta encoding rather than PLAIN), you might see a good improvement in read performance without increasing the overall file sizes significantly.

gm3g11 commented 5 months ago

@etseidl Thanks for your reply. For this example parquet file, I can do some experiments and try the mentioned encoding scheme. I will let you know if I make some progress.

gm3g11 commented 5 months ago

@etseidl This method is effective. Without compression, Parquet read time dropped from 123 ms to 45 ms, while the file size grew only modestly, from 39M to 50M.

As a next step, we want to accelerate Parquet reading further with multiple streams and multiple threads. Before implementing that, we extended the Parquet file from 240,578 rows to 4,811,560 rows (a 20-fold increase). There is one thing I want to discuss: my multi-threaded approach seems to work, but the multi-stream approach does not. The streams still execute serially, as the profile shows: [screenshot]

I have not found much material about the stream parameter of the cudf::io::read_parquet API. Here is how I am using it in my code; any comments are welcome:

#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include <rmm/cuda_stream.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Reads one row range of the file on the given stream and stores the table in results[chunk].
void read_parquet_chunk(const std::string& file_path,
                        int32_t chunk,
                        int32_t chunk_row_cnt,
                        int32_t total_rows,
                        int32_t chunked_read_num_chunks,
                        std::vector<std::unique_ptr<cudf::table>>& results,
                        rmm::cuda_stream& stream)
{
    // The last chunk also takes any rows left over by the integer division.
    cudf::io::parquet_reader_options read_options =
        cudf::io::parquet_reader_options::builder(cudf::io::source_info(file_path))
            .skip_rows(chunk * chunk_row_cnt)
            .num_rows((chunk == chunked_read_num_chunks - 1) ? (total_rows - chunk * chunk_row_cnt) : chunk_row_cnt)
            .build();
    auto result = cudf::io::read_parquet(read_options, stream);
    results[chunk] = std::move(result.tbl);
}

int main()
{
    const std::string file_path = "./dataset/output_no_compression_4M.parquet";
    constexpr int32_t total_rows = 4811560;
    constexpr int32_t chunked_read_num_chunks = 20;
    const int32_t chunk_row_cnt = total_rows / chunked_read_num_chunks;

    // Initialize CUDA memory resource
    rmm::mr::cuda_memory_resource cuda_mr;
    rmm::mr::set_current_device_resource(&cuda_mr);

    std::vector<rmm::cuda_stream> streams(chunked_read_num_chunks);
    std::vector<std::unique_ptr<cudf::table>> results(chunked_read_num_chunks);
    std::vector<std::thread> threads;

    auto start_time = std::chrono::high_resolution_clock::now();

    // One thread and one stream per chunk
    for (int32_t chunk = 0; chunk < chunked_read_num_chunks; ++chunk) {
        streams[chunk] = rmm::cuda_stream();
        threads.emplace_back(read_parquet_chunk, std::ref(file_path), chunk, chunk_row_cnt, total_rows, chunked_read_num_chunks, std::ref(results), std::ref(streams[chunk]));
    }

    for (auto& thread : threads) {
        thread.join();
    }

    // Synchronize all streams
    for (auto& stream : streams) {
        stream.synchronize();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();

    cudf::size_type total_rows_read = 0;
    for (const auto& result : results) {
        total_rows_read += result->num_rows();
    }
    std::cout << "Total rows read: " << total_rows_read << std::endl;
    std::cout << "Total read time: " << duration << " ms" << std::endl;

    return 0;
}

etseidl commented 5 months ago

As I've learned the hard way, simply using separate streams is not necessarily going to cause execution to proceed in parallel on a single GPU. For kernel overlap to occur, you'll need sufficient resources (SMs, shared mem, registers, etc). The parquet kernel is pretty resource heavy, so the chances for overlap occurring are minimal. Also note that internal to libcudf, the various decode kernels execute in separate streams, further constraining resources.

If you want to parallelize further, you'll need to spread execution across multiple GPUs. You could try rolling your own, or use a parallel execution engine like dask or spark.

@nvdbaranec please chime in on anything I got horribly wrong :smile:

gm3g11 commented 5 months ago

@etseidl Thanks for sharing. Based on the documentation, I saw that cudf::io::read_parquet has a stream parameter and thought we could use multiple streams to accelerate read_parquet further. Currently, I can only test on a single GPU.

gm3g11 commented 5 months ago

Hi @GregoryKimball, I came across the GitHub issue "[FEA] Add a Parquet reader benchmark that uses multiple CUDA streams #12700" (https://github.com/rapidsai/cudf/issues/12700) and noticed that you are leading this feature's development. I think it is related to my case. Could you share any progress on the implementation of multiple CUDA streams in the Parquet reader? Thank you.