apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

fix: Avoid to call import and export Arrow array for native execution #1055

Open kazuyukitanimura opened 2 weeks ago

kazuyukitanimura commented 2 weeks ago

Which issue does this PR close?

Rationale for this change

Performance improvement

What changes are included in this PR?

This PR has changes to avoid calling the Arrow array import and export for native execution.
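For context, a rough sketch of the idea (hedged: all names below are illustrative toy code, not Comet's actual API). The baseline path copies each batch through the Arrow C Data Interface — export on the JVM side, import on the native side — while the change passes the already-native handle through and skips the round trip:

```java
// Toy model of the two batch hand-off strategies; names are illustrative only.
public class HandoffSketch {

    // Baseline: export each batch through the Arrow C Data Interface, then
    // import it again on the other side. The cost (counted here as abstract
    // "work units") scales with the number of buffers in the batch.
    static int exportImportRoundTrip(int numBuffers) {
        int work = 0;
        work += numBuffers; // export: fill ArrowArray / ArrowSchema structs
        work += numBuffers; // import: read the structs back, rebuild wrappers
        return work;
    }

    // This PR's idea: the batch already lives in native memory, so hand over
    // its existing handle instead of round-tripping through the C structs.
    static int passNativeHandle() {
        return 1; // a single pointer hand-off, independent of buffer count
    }

    public static void main(String[] args) {
        int buffers = 8; // e.g. validity + offsets + data across a few columns
        System.out.println("round trip work: " + exportImportRoundTrip(buffers));
        System.out.println("direct hand-off: " + passNativeHandle());
    }
}
```

The per-batch saving is fixed, so it recurs for every batch read from the scan.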

How are these changes tested?

Existing tests

kazuyukitanimura commented 2 weeks ago

q27

Before: (screenshot, 2024-11-04 at 5:40:56 PM)

After: (screenshot, 2024-11-04 at 5:41:15 PM)

andygrove commented 2 weeks ago

I tried running the benchmarks from this PR, but I am running into an error:

ExecutorLostFailure: The executor with id 21 exited with exit code 134(unexpected).

I also see this glibc error in the executor log:

corrupted size vs. prev_size
codecov-commenter commented 1 week ago

Codecov Report

Attention: Patch coverage is 36.06557% with 39 lines in your changes missing coverage. Please review.

Project coverage is 34.32%. Comparing base (845b654) to head (ec72117). Report is 16 commits behind head on main.

Files with missing lines Patch % Lines
...ava/org/apache/comet/vector/CometNativeVector.java 0.00% 15 Missing :warning:
...in/java/org/apache/comet/parquet/ColumnReader.java 0.00% 11 Missing :warning:
...ain/scala/org/apache/comet/vector/NativeUtil.scala 0.00% 8 Missing :warning:
...ain/java/org/apache/comet/parquet/BatchReader.java 0.00% 3 Missing :warning:
...ava/org/apache/comet/parquet/LazyColumnReader.java 0.00% 1 Missing :warning:
.../src/main/java/org/apache/comet/parquet/Utils.java 0.00% 1 Missing :warning:
Additional details and impacted files

```diff
@@             Coverage Diff              @@
##               main    #1055      +/-   ##
============================================
- Coverage     34.46%   34.32%    -0.15%
- Complexity      888      893        +5
============================================
  Files           113      114        +1
  Lines         43580    42916      -664
  Branches       9658     9339      -319
============================================
- Hits          15021    14732      -289
+ Misses        25507    25336      -171
+ Partials       3052     2848      -204
```


andygrove commented 1 week ago

@kazuyukitanimura. I can run the benchmarks now, but I did not see any difference in performance compared to the main branch. Are you still seeing a performance benefit after the recent changes?

kazuyukitanimura commented 1 week ago

Thank you @andygrove. That's odd; my local queries are showing a clear improvement. Just checking: are you using Iceberg? I haven't done DSv2 yet. Right now, pure Parquet with DSv1 gets the benefit of this PR. I will try to run more queries as well.

kazuyukitanimura commented 1 week ago

Before (5 iterations)

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q27: Comet (Scan, Exec)                            4218           4320         116         68.8          14.5       1.0X

After

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q27: Comet (Scan, Exec)                            3534           3784         191         82.1          12.2       1.0X

kazuyukitanimura commented 1 week ago

Before

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          14077          14413         252         28.4          35.2       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                            2160           2235          73        133.4           7.5       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q68: Comet (Scan, Exec)                            5023           5091          61         58.0          17.3       1.0X

After

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          12968          13770        1085         30.8          32.5       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                            1968           2106         129        146.5           6.8       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q68: Comet (Scan, Exec)                            4232           4530         195         68.8          14.5       1.0X

andygrove commented 1 week ago

I see these results from 3 runs of q39 (both a+b) with 1TB input:

main: 59.3s / 57.0s / 57.1s
this PR: 58.2s / 58.9s / 57.9s

I am using these configs:

    --conf spark.executor.instances=4 \
    --conf spark.executor.cores=6 \
    --conf spark.executor.memory=8g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=32g

kazuyukitanimura commented 1 week ago

Thanks @andygrove. I tried with offHeap but I still see the advantage. I am wondering if any of the following configs makes a difference. I use a 100 GB input:

spark.comet.regexp.allowIncompatible=true
spark.sql.shuffle.partitions=4
spark.driver.memory=3g
spark.executor.memory=3g
spark.sql.autoBroadcastJoinThreshold=20971520 // (20 * 1024 * 1024)
spark.sql.crossJoin.enabled=true
parquet.enable.dictionary=true
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
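As a quick sanity check of the broadcast threshold above (hedged: this just verifies the arithmetic noted in the inline comment, 20 * 1024 * 1024 bytes = 20 MiB):

```java
public class ThresholdCheck {
    public static void main(String[] args) {
        // spark.sql.autoBroadcastJoinThreshold is given in bytes.
        long threshold = 20L * 1024 * 1024; // 20 MiB
        System.out.println(threshold); // prints 20971520
    }
}
```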

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          13732          14177         756         29.1          34.4       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                            1942           2029          69        148.4           6.7       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q68: Comet (Scan, Exec)                            4544           4823         211         64.1          15.6       1.0X

viirya commented 1 week ago

I guess the diff is only observable at a very small scale (ms). Once the query time is at the second/minute/hour level, the diff is insignificant.

kazuyukitanimura commented 1 week ago

Thank you @andygrove @viirya. I addressed the memory issues and added DSv2 support (except Iceberg). This is ready for review again.

I also ran with 1 TB myself. I still see a ~10% speedup, which is on the order of seconds, not ms. I think there are some environmental differences between @andygrove's setup and mine.

Before (1TB)

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          27979          33349         580         28.0          35.7       1.0X

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                           24877          25216         271        115.8           8.6       1.0X

After (1TB)

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          27281          28714         111         28.7          34.8       1.0X

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                           23534          23880         388        122.4           8.2       1.0X

viirya commented 1 week ago

It looks like less than 5%?

27979 - 27281 = 698 ~= 2%
24877 - 23534 = 1343 ~= 5%
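Spelling out the relative differences behind those best-time numbers (a quick re-check of the arithmetic above):

```java
public class DiffCheck {
    // Relative improvement of "after" over "before".
    static double relDiff(double before, double after) {
        return (before - after) / before;
    }

    public static void main(String[] args) {
        double q39a = relDiff(27979, 27281); // ~0.025, i.e. ~2.5%
        double q44  = relDiff(24877, 23534); // ~0.054, i.e. ~5.4%
        System.out.printf("q39a: %.1f%%  q44: %.1f%%%n", q39a * 100, q44 * 100);
    }
}
```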

So I believe that the diff is insignificant for longer query time.

kazuyukitanimura commented 1 week ago

Thanks @viirya

It looks like less than 5%?
27979 - 27281 = 698 ~= 2%
24877 - 23534 = 1343 ~= 5%

I used Avg Time (5 iterations) when I mentioned ~10%:

100G q39a: (14413 - 13770) / 14413 ~= 4.5%
1T q39a: (33349 - 28714) / 33349 ~= 14%

The performance gain can easily swing between 2% and 15% due to noise.
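The average-time ratios quoted above check out (hedged: this only re-runs the arithmetic, it says nothing about which measurement is more representative):

```java
public class AvgDiffCheck {
    // Relative improvement computed from Avg Time(ms) columns.
    static double relDiff(double before, double after) {
        return (before - after) / before;
    }

    public static void main(String[] args) {
        double g100 = relDiff(14413, 13770); // 100G q39a, ~4.5%
        double t1   = relDiff(33349, 28714); // 1T q39a, ~14%
        System.out.printf("100G: %.1f%%  1T: %.1f%%%n", g100 * 100, t1 * 100);
    }
}
```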

So I believe that the diff is insignificant for longer query time.

The performance gain is proportional to the input size by design, because it saves the overhead for all input Arrow vectors. If by longer query time you mean a deeper query plan, then yes, the performance gain gets less effective: the benefit is one-time per data read, and query processing time takes a larger portion of the entire runtime. But I think there are many use cases with shallow query plans as well, and we can still save $$$?
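The proportionality argument can be made concrete with a toy model (hedged: the numbers are purely illustrative, not measurements): the saving is a fixed cost per batch, so it grows with input size, while its *share* of runtime shrinks as per-batch processing (a deeper plan) grows.

```java
public class OverheadModel {
    // Total runtime = batches * (ffiOverhead + processing); the change
    // removes the per-batch FFI overhead. Returns the fraction saved.
    static double savedFraction(long batches, double ffiOverheadMs, double processingMs) {
        double total = batches * (ffiOverheadMs + processingMs);
        double saved = batches * ffiOverheadMs;
        return saved / total;
    }

    public static void main(String[] args) {
        // Shallow plan: overhead is a large share, at any input size.
        double shallow = savedFraction(1_000_000, 0.05, 0.45);
        // Deep plan: same per-batch saving, much more processing per batch.
        double deep = savedFraction(1_000_000, 0.05, 4.95);
        System.out.printf("shallow: %.0f%%  deep: %.0f%%%n", shallow * 100, deep * 100);
    }
}
```

Note the saved fraction is independent of the batch count: more data saves more absolute time, but not a larger percentage.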

viirya commented 1 week ago

1T q39a (33349-28714)/33349 ~= 14%

33349 obviously looks like it is affected by some noise.

q44 avg diff is also ~ 5%.

The performance gain is proportional to the input size by design because it saves the overhead for all input arrow vectors.

I mean it is less significant with a longer query time. For queries running many minutes or hours, it makes no difference. If this were a trivial change, it would be fine. But from a quick look, the change is not small and doesn't look like a good design to me.

I doubt that it is worth it.

kazuyukitanimura commented 1 week ago

@viirya Let me try to convince you one more time.

For queries running many minutes or hours, it has no difference.

If the query time is long because of the data size, this PR still helps (5% at least?). If the query time is long because the query itself is complex, this PR has less value.

the change is not small

The latest change is pretty small after following your change on the Arrow spec memory model. There are only 3 main changes:

  1. avoid importing: common/src/main/java/org/apache/comet/parquet/ColumnReader.java
  2. avoid exporting: common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
  3. type handling fix: native/core/src/execution/utils.rs

The rest of the changes only pass along the information that the new mode is used.

doesn't look like in good design to me.

Do you have any recommendations here? What if I add a feature flag to enable/disable this new code flow? The latest change is fully backward compatible, and we can easily enable/disable it with a single flag that manipulates the hasNativeOperations boolean.
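The proposed flag could look roughly like this (hedged: the config key name and the surrounding plumbing are hypothetical; only the hasNativeOperations boolean comes from the discussion above):

```java
import java.util.HashMap;
import java.util.Map;

public class FlagSketch {
    // Hypothetical config key gating the new direct-handle code flow.
    static final String KEY = "spark.comet.exec.skipArrowFfi.enabled";

    // hasNativeOperations is derived from the plan; the flag can force the
    // old export/import path even when the new one would apply.
    static boolean useDirectPath(Map<String, String> conf, boolean hasNativeOperations) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(KEY, "true"));
        return enabled && hasNativeOperations;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(useDirectPath(conf, true));  // new path by default
        conf.put(KEY, "false");
        System.out.println(useDirectPath(conf, true));  // flag forces old path
    }
}
```

A kill switch like this keeps the rollout low-risk: if the new flow misbehaves, users fall back to the existing export/import path with one config change.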