wgip opened this issue 3 years ago
Further finding:
The WSCG operators for joins & aggregates extend `BlockingOperatorWithCodegen`, which marks the points where all data must be loaded in before anything can be produced. We could rewrite the codegen parts here, still benefit from WSCG performance, and hand the data off to the VE, because all of it is loaded in at once anyway (see the sketch below).
This approach could be very fruitful because we'd be benefiting from Spark's strengths in WSCG and also from the VE's potential strength in Sort / HashAgg. On top of that, it seems the data would already be in unsafe (off-heap) portions of memory, meaning we don't necessarily need to convert it.
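For reference, here is roughly what that trait looks like (paraphrased from Spark's `WholeStageCodegenExec.scala`; check your Spark version for the exact definition). `SortExec` and `HashAggregateExec` are the typical operators that mix it in:

```scala
// Paraphrased from Apache Spark (not our code): a blocking operator consumes
// ALL of its input before producing any output, which is exactly the boundary
// where a complete batch could be handed to the VE instead.
trait BlockingOperatorWithCodegen extends CodegenSupport {
  // The operator buffers rows internally, so downstream copies are unneeded.
  override def needCopyResult: Boolean = false
  // All input is consumed up front, so upstream needs no per-row stop check.
  override def needStopCheck: Boolean = false
  // A Limit after this operator can never trigger mid-stream, so no checks.
  override def limitNotReachedChecks: Seq[String] = Nil
}
```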
Several interesting things here. It seems that Spark Rapids by default will not try to optimize floating-point aggregations, as the documentation explains:
> Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.
So we need to enable that explicitly by setting `spark.rapids.sql.variableFloatAgg.enabled` to `true`. Without that setting, for a simple floating-point sum of a single column, the Spark plan looks like this:
And the performance is very similar to classic CPU Spark. When we set the above config to `true`, the plan changes to:
And the performance is over two times better than without the configuration set. So we should probably make sure that we are passing this config in our Python tests; a sketch of setting it programmatically follows.
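A minimal sketch of passing the flag from Scala (the two Rapids config keys are from the Rapids docs; the session setup itself is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup; only the two Rapids config keys matter here.
val spark = SparkSession.builder()
  .appName("rapids-float-agg") // hypothetical app name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.variableFloatAgg.enabled", "true")
  .getOrCreate()
```

For the Python tests, the equivalent would be passing `--conf spark.rapids.sql.variableFloatAgg.enabled=true` to `spark-submit`.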
The other finding is shown in the image below:
It shows that while Spark Rapids performs very well when we enable full optimization, it is still ~50% worse than Spark CPU with whole-stage codegen enabled. So this basically means it will be very hard, or even impossible, to beat Spark at simple aggregations that are easy to optimize, but we can still win in more complex cases where code generation has a harder time.
An additional finding is that with the same dataset in a CSV file, the results are quite different. It seems that for CSV, whole-stage codegen wasn't able to optimize much:
```
[info] Benchmark                                            Mode  Cnt    Score    Error  Units
[info] VEJMHBenchmarkCSV.test2JVMRunNoWholestageCodegen       ss    5  323,441 ± 27,143   s/op
[info] VEJMHBenchmarkCSV.test2JVMRunWithWholestageCodegen     ss    5  312,278 ± 31,970   s/op
[info] VEJMHBenchmarkCSV.testRapidsRun                        ss    5  154,353 ± 10,068   s/op
```
Rapids shines here, giving us a 2x performance increase. Another interesting thing is that for CSV only part of the plan gets replaced by the `Gpu` version, even with `variableFloatAgg` enabled. I am not quite sure why this is the case; perhaps it's due to some config that limits the data size?
I use `--conf spark.rapids.sql.explain=ALL` to see an explanation of our queries. There are some expressions that Rapids does not support.
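The same flag can also be set programmatically; a sketch (per the Rapids docs, the accepted values are `NONE`, `NOT_ON_GPU`, and `ALL`):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative: with explain=ALL, Rapids logs, for every operator, whether
// it was replaced by a GPU version and, if not, why not.
val spark = SparkSession.builder()
  .config("spark.rapids.sql.explain", "ALL")
  .getOrCreate()
```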
Did a lot of performance benchmarking & analysis with @Wosin
Findings:
Conclusion:
Things like Rapids load their data directly into the GPU; if we try to integrate with the JVM, we may not be able to achieve much performance because we can't optimize across JIT boundaries. And if we try to call the VE from within JIT-compiled code, those JIT optimizations get defeated and things become slower.
So we need to think about loading data into the VE efficiently. Even with Arrow out of the picture, because we are going outside of WSCG, we cannot beat it while relying on the preceding JVM steps.
Thinking: given that VE-Parquet might take some time, we should look at how to read the Parquet into Arrow fully using native code, so that it can then be copied directly to the VE and processed there. This way, we stand a chance against WSCG; a sketch of the idea follows.
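A minimal sketch of that direction using Arrow's Java Dataset module, which wraps the native C++ Parquet reader via JNI. The file path and batch size are made up, and the exact API varies by Arrow version; this follows the shape of the Arrow Java cookbook examples:

```scala
import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory}
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator

val allocator = new RootAllocator()
// The native (C++) Parquet reader fills off-heap Arrow buffers directly,
// skipping Spark's row-based Parquet path entirely.
val factory = new FileSystemDatasetFactory(
  allocator,
  NativeMemoryPool.getDefault,
  FileFormat.PARQUET,
  "file:///tmp/input.parquet" // hypothetical path
)
val dataset = factory.finish()
val scanner = dataset.newScan(new ScanOptions(/* batchSize = */ 32768))
val reader  = scanner.scanBatches()

while (reader.loadNextBatch()) {
  val root = reader.getVectorSchemaRoot
  // Each column now lives in a contiguous off-heap buffer; its address
  // could be handed to the VE copy step without touching the JVM heap.
  println(s"read batch of ${root.getRowCount} rows")
}
```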
4 s is the raw memory-bandwidth bound, and Spark does it in 7 s, so the overhead is quite small (about 3 s on top of the 4 s bound). Perhaps 2 columns would give us a more interesting benchmark, because Spark's WSCG approach could be benefiting heaps from this particular query plan.
Here we see Java's JIT in action:
Next steps:
The prediction is that joins / aggregates should do better where Java's JIT is weak, e.g. when it's building up a map there's definitely a lot of flux; a toy illustration follows.
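A toy illustration of that flux (not our benchmark code, just the shape of the problem): a grouping loop boxes keys, probes and resizes a hash table, and chases pointers on every row, so the JIT can't turn it into the tight, predictable loop it produces for a plain column sum:

```scala
import scala.collection.mutable
import scala.util.Random

val n      = 10000000
val keys   = Array.fill(n)(Random.nextLong().abs % 1024)
val values = Array.fill(n)(Random.nextDouble())

// Hash aggregation: per-row boxing of the Long key, table probing, and
// occasional resizing -- lots of allocation and pointer chasing.
val sums = mutable.HashMap.empty[Long, Double]
var i = 0
while (i < n) {
  sums.update(keys(i), sums.getOrElse(keys(i), 0.0) + values(i))
  i += 1
}
```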
Thinking: