apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

feat: Implement native version of ColumnarToRow #1034

Closed parthchandra closed 2 weeks ago

parthchandra commented 4 weeks ago

Which issue does this PR close?

Closes #708

Rationale for this change

The columnar-to-row operation is really slow, and this PR attempts to speed it up.

What changes are included in this PR?

Implements a native columnar-to-row operation for integer and floating-point types. Variable-length types can be added later. The implementation allocates an off-heap buffer large enough to hold an entire batch of UnsafeRows and passes it to the native layer, which converts the Arrow vectors into UnsafeRows and writes them into the allocated buffer. The JVM code then simply points its UnsafeRows at the underlying buffer.
The native conversion copies the data from the Arrow vector into the UnsafeRows. This copy is currently the performance bottleneck.
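
To make the buffer layout concrete, here is a minimal, hedged sketch of a row-first conversion for nullable 32-bit integer columns. It is not the code in this PR. It assumes the standard fixed-width UnsafeRow layout (a null bitset of 8-byte words followed by one 8-byte slot per field, values in native byte order), and it uses a plain `Vec<u8>` in place of the off-heap buffer that the JVM would allocate. The names `bitset_width`, `set_null`, `is_null`, `get_int`, and `columns_to_unsafe_rows` are hypothetical helpers for illustration only.

```rust
/// Width in bytes of the UnsafeRow null bitset: one bit per field,
/// rounded up to a whole number of 8-byte words.
fn bitset_width(num_fields: usize) -> usize {
    ((num_fields + 63) / 64) * 8
}

/// Set the null bit for `field`. The bitset is treated as a sequence of
/// 64-bit words in native byte order (an assumption of this sketch).
fn set_null(row: &mut [u8], field: usize) {
    let word_start = (field / 64) * 8;
    let mut word = [0u8; 8];
    word.copy_from_slice(&row[word_start..word_start + 8]);
    let bits = u64::from_ne_bytes(word) | (1u64 << (field % 64));
    row[word_start..word_start + 8].copy_from_slice(&bits.to_ne_bytes());
}

/// Read back the null bit for `field`.
fn is_null(row: &[u8], field: usize) -> bool {
    let word_start = (field / 64) * 8;
    let mut word = [0u8; 8];
    word.copy_from_slice(&row[word_start..word_start + 8]);
    (u64::from_ne_bytes(word) & (1u64 << (field % 64))) != 0
}

/// Read the i32 stored in the first 4 bytes of the 8-byte slot of `field`.
fn get_int(row: &[u8], num_fields: usize, field: usize) -> i32 {
    let slot = bitset_width(num_fields) + 8 * field;
    let mut bytes = [0u8; 4];
    bytes.copy_from_slice(&row[slot..slot + 4]);
    i32::from_ne_bytes(bytes)
}

/// Convert nullable i32 columns into consecutive fixed-width rows.
/// `columns[c][r]` is the value of column `c` at row `r` (`None` = null).
fn columns_to_unsafe_rows(columns: &[Vec<Option<i32>>], num_rows: usize) -> Vec<u8> {
    let num_cols = columns.len();
    let bitset = bitset_width(num_cols);
    let row_size = bitset + 8 * num_cols;
    // Zero-initialised: null bits start cleared and unused slot bytes are zero.
    let mut buf = vec![0u8; row_size * num_rows];

    for r in 0..num_rows {
        let row = &mut buf[r * row_size..(r + 1) * row_size];
        for (c, col) in columns.iter().enumerate() {
            match col[r] {
                Some(v) => {
                    let slot = bitset + 8 * c;
                    row[slot..slot + 4].copy_from_slice(&v.to_ne_bytes());
                }
                None => set_null(row, c),
            }
        }
    }
    buf
}
```

With two columns, each row occupies 8 bytes of bitset plus 16 bytes of slots, so row `r` starts at byte `24 * r`. Every value is copied once from the column into its slot, which is the copy described above.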

How are these changes tested?

Additional unit test.

parthchandra commented 4 weeks ago

Initial performance numbers for this implementation are not looking good. There are two areas where things are getting slower compared to Spark:

  1. No WholeStageCodegen - The iteration over rows alone is adding extra cost to the implementation.
  2. We incur an additional cost of creating UnsafeRows in native, which is more costly than the calls made by Spark to extract values out of the Arrow vector.

Here's the initial benchmark run for just integer types -
    
    Running benchmark: ColumnarToRowExec
    Running case: Spark Columnar To Row - integer
    Stopped after 34 iterations, 2029 ms
    Running case: Comet Columnar To Row - integer
    Stopped after 24 iterations, 2081 ms

OpenJDK 64-Bit Server VM 11.0.19+7-LTS on Mac OS X 14.6
Apple M3 Max
ColumnarToRowExec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark Columnar To Row - integer                      40             60          14        262.1           3.8       1.0X
Comet Columnar To Row - integer                      53             87          32        198.2           5.0       0.8X
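
For readers comparing the columns, the sketch below shows how the derived figures relate to each other. It assumes that Per Row(ns) is 1000 divided by Rate(M/s) and that Relative is the baseline best time divided by the case's best time. Both assumptions reproduce the numbers reported here, but they are not taken from the benchmark source. The function names are hypothetical.

```rust
/// Nanoseconds per row implied by a throughput in millions of rows per second
/// (assumed relationship: 1000 / rate).
fn per_row_ns(rate_m_per_s: f64) -> f64 {
    1000.0 / rate_m_per_s
}

/// Speed relative to a baseline, computed from best times in milliseconds
/// (assumed relationship: baseline best / case best).
fn relative(baseline_best_ms: f64, best_ms: f64) -> f64 {
    baseline_best_ms / best_ms
}

/// Format a value with one decimal place, as the benchmark table does.
fn one_decimal(x: f64) -> String {
    format!("{:.1}", x)
}
```

For example, `one_decimal(per_row_ns(198.2))` gives `"5.0"` and `one_decimal(relative(40.0, 53.0))` gives `"0.8"`, matching the Comet row.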

parthchandra commented 4 weeks ago

@mbutrovich @andygrove any thoughts?

parthchandra commented 2 weeks ago

@andygrove @Dandandan thanks for your comments. I've tried this now with a single datatype and no datatype checks and I see the same results. I'm going to close this PR until I can think of some better way to do this.

andygrove commented 2 weeks ago

@parthchandra I experimented with iterating over columns first and then rows. I also modified the code to only create SparkUnsafeRow once instead of once per row. I now see similar performance between Spark and native.

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.8.0-47-generic
AMD Ryzen 9 7950X3D 16-Core Processor
ColumnarToRowExec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark Columnar To Row - integer                      57             62           4        183.9           5.4       1.0X
Comet Columnar To Row - integer                      57             73          17        183.8           5.4       1.0X

Code:

                ArrowDataType::Int32 => {
                    let array = arr
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("Error downcasting to Int32");

                    // Create the row wrapper once and re-point it at each row's bytes,
                    // instead of constructing a new SparkUnsafeRow per row.
                    let mut row = SparkUnsafeRow::new(&schema);
                    let mut row_start_addr: usize = addr;

                    // Fixed-width rows only: null bitset plus one 8-byte slot per column.
                    // Every row has the same size, so it is computed once.
                    let row_size =
                        SparkUnsafeRow::get_row_bitset_width(schema.len()) + 8 * num_cols;

                    if array.null_count() == 0 {
                        // Fast path: the column has no nulls, so skip the per-value null check.
                        for i in 0..num_rows {
                            unsafe {
                                row.point_to_slice(std::slice::from_raw_parts(
                                    row_start_addr as *const u8,
                                    row_size,
                                ));
                            }
                            // `j` is presumably the index of the current column,
                            // taken from the enclosing loop (not shown).
                            row.set_int(j, array.value(i));
                            row_start_addr += row_size;
                        }
                    } else {
                        for i in 0..num_rows {
                            unsafe {
                                row.point_to_slice(std::slice::from_raw_parts(
                                    row_start_addr as *const u8,
                                    row_size,
                                ));
                            }
                            if array.is_null(i) {
                                row.set_null_at(j);
                            } else {
                                row.set_int(j, array.value(i));
                            }
                            row_start_addr += row_size;
                        }
                    }
                }

parthchandra commented 2 weeks ago

I considered going column then rows. This gets a little complicated with variable-length types, because we then have to track, for each row, the offset at which its variable-length part can be written. This part of the code is no longer correct with variable-length types -

let row_size = SparkUnsafeRow::get_row_bitset_width(schema.len()) + 8 * num_cols;

parthchandra commented 2 weeks ago

@andygrove I tried with your change and I get -

Running benchmark: ColumnarToRowExec
  Running case: Spark Columnar To Row - integer
  Stopped after 35 iterations, 2037 ms
  Running case: Comet Columnar To Row - integer
  Stopped after 15 iterations, 2022 ms

OpenJDK 64-Bit Server VM 11.0.19+7-LTS on Mac OS X 14.6
Apple M3 Max
ColumnarToRowExec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark Columnar To Row - integer                      42             58          13        249.5           4.0       1.0X
Comet Columnar To Row - integer                      52            135          36        200.9           5.0       0.8X

So not really any better. I also tried another change in which I precompute row lengths and row start addresses to accommodate variable-length fields, and look up the row lengths as I iterate over the rows for each column. That slowed down to 0.7x of Spark. I don't think this is going to work, so I'm leaving this closed.
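
For illustration only, precomputing row lengths and start addresses can be sketched as a running sum over per-row sizes. This is not the code that was benchmarked. It assumes the UnsafeRow convention that each variable-length value is padded to a multiple of 8 bytes and appended after the fixed-width region. The names `bitset_width`, `round_up_to_word`, and `row_start_offsets` are hypothetical.

```rust
/// Width in bytes of the UnsafeRow null bitset (one bit per field,
/// rounded up to whole 8-byte words).
fn bitset_width(num_fields: usize) -> usize {
    ((num_fields + 63) / 64) * 8
}

/// Round a byte length up to the next multiple of 8.
fn round_up_to_word(n: usize) -> usize {
    (n + 7) & !7
}

/// Compute the start offset of every row in the output buffer, plus the
/// total buffer size. `var_lens_per_row[r]` holds the byte lengths of the
/// variable-length values in row `r` (empty for purely fixed-width rows).
fn row_start_offsets(num_fields: usize, var_lens_per_row: &[Vec<usize>]) -> (Vec<usize>, usize) {
    // Fixed-width region: null bitset plus one 8-byte slot per field.
    let fixed = bitset_width(num_fields) + 8 * num_fields;
    let mut starts = Vec::with_capacity(var_lens_per_row.len());
    let mut next = 0usize;
    for lens in var_lens_per_row {
        starts.push(next);
        // Each variable-length value occupies its length padded to 8 bytes.
        let var: usize = lens.iter().map(|&n| round_up_to_word(n)).sum();
        next += fixed + var;
    }
    (starts, next)
}
```

With two fields and string lengths of 3, 0, and 9 bytes in three rows, the rows take 32, 24, and 40 bytes, so they start at offsets 0, 32, and 56 in a 96-byte buffer. A column-first pass would additionally need a per-row write cursor for the variable-length region, which is the extra bookkeeping discussed above.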