apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

[EPIC] Improve shuffle performance #1123

Open · andygrove opened 5 days ago

andygrove commented 5 days ago

This epic is for improving shuffle / ScanExec performance.

Issues

Context

I have been comparing Comet and Ballista performance on TPC-H q3. Both execute similar native plans. I am using the comet-parquet-exec branch, which uses DataFusion's ParquetExec.

Ballista is approximately 3x faster than Comet. Given that they are executing similar plans, I would expect performance to be similar.

The main difference between Comet and Ballista is that Comet transfers batches between JVM and native code during shuffle operations.

Most of the native execution time in Comet is spent in ScanExec, which reads Arrow batches from the JVM using Arrow FFI.

[screenshot from 2024-11-26 12:30]
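
For a concrete picture of the FFI cost described above: each batch crosses the JVM/native boundary via the Arrow C Data Interface. The following is a minimal sketch using the Rust `arrow` crate, not Comet's actual ScanExec code; it round-trips one batch through the FFI structs, which is essentially the per-batch export/import work ScanExec pays for when reading from the JVM.

```rust
// Minimal sketch of an Arrow C Data Interface round trip using the `arrow`
// crate. This is not Comet's ScanExec code; it only illustrates the
// export/import work done each time a batch crosses the JVM/native boundary.
use std::sync::Arc;

use arrow::array::{Array, Int32Array, StructArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ffi::{from_ffi, to_ffi};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a small batch standing in for one shuffle batch.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "l_orderkey",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Export: wrap the batch as a struct array and produce the two C Data
    // Interface structs (ArrowArray + ArrowSchema). The JVM side does the
    // equivalent with Arrow Java's C Data module.
    let struct_array = StructArray::from(batch);
    let (ffi_array, ffi_schema) = to_ffi(&struct_array.into_data())?;

    // Import: the consumer rebuilds ArrayData from the raw structs. This is
    // the direction ScanExec pays for on every batch it reads from the JVM.
    let imported = unsafe { from_ffi(ffi_array, &ffi_schema)? };
    let batch_back = RecordBatch::from(StructArray::from(imported));

    assert_eq!(batch_back.num_rows(), 3);
    Ok(())
}
```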

viirya commented 5 days ago

> Most of the native execution time in Comet is spent in ScanExec, which reads Arrow batches from the JVM using Arrow FFI.

Do you mean when ScanExec is used as a pseudo scan node to read shuffled data into the next native execution?

If so, that makes sense, because the Comet shuffle reader is still JVM-based. We should eventually make it native to boost shuffle read performance. This was on the early roadmap when we started work on Comet shuffle, although it was not urgent or high priority at that time. Now I think it is time to begin working on it.

Opened: https://github.com/apache/datafusion-comet/issues/1125
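
To sketch what a native shuffle reader could look like: the key point is decoding shuffle bytes directly into Arrow batches in native code, with no JVM hop on the read path. The following is a rough illustration only, and it assumes Arrow IPC stream framing for the shuffle data; Comet's actual on-disk shuffle format may differ.

```rust
// Rough sketch of native shuffle-read decoding. Assumes Arrow IPC stream
// framing for the shuffle bytes; Comet's real shuffle file format may differ.
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "part_key",
        DataType::Int64,
        false,
    )]));

    // Stand-in for one shuffle partition's bytes, produced by a writer.
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![10, 20, 30]))],
        )?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // A native reader decodes batches straight into Arrow arrays, with no
    // JVM objects and no Arrow FFI hop on the read path.
    let reader = StreamReader::try_new(Cursor::new(buf), None)?;
    for batch in reader {
        println!("read {} rows natively", batch?.num_rows());
    }
    Ok(())
}
```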

andygrove commented 4 days ago

> Do you mean when ScanExec is used as a pseudo scan node to read shuffled data into the next native execution?

Yes, exactly.

andygrove commented 3 hours ago

> If so, that makes sense, because the Comet shuffle reader is still JVM-based.

This is also an issue for shuffle writes. The child node of ShuffleWriterExec is always a ScanExec reading the output of another native plan.

We pay the FFI cost twice: once to import the batch from the native plan into the JVM, and again to export it to the shuffle-write native plan, with the cost of schema serde in both directions. Perhaps there is a way to shortcut this and avoid a full serde, since we do not need to read the batch in the JVM in this case; we only need to pass it from one native plan to another.
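
One way to picture that shortcut: keep the batch on the native side and pass only an opaque handle through the JVM, so nothing is exported or re-imported via FFI. The sketch below is purely hypothetical; `export_handle`, `import_handle`, and the registry are made-up names for illustration, not an existing Comet API.

```rust
// Hypothetical sketch of passing a batch between two native plans by handle,
// so the JVM only relays an opaque id instead of the batch crossing FFI
// twice. None of these names exist in Comet; this is illustration only.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

static NEXT_ID: AtomicU64 = AtomicU64::new(1);
static REGISTRY: OnceLock<Mutex<HashMap<u64, Arc<RecordBatch>>>> = OnceLock::new();

fn registry() -> &'static Mutex<HashMap<u64, Arc<RecordBatch>>> {
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Called by the upstream native plan: park the batch and return a handle
/// that the JVM can pass along without touching the data.
fn export_handle(batch: Arc<RecordBatch>) -> u64 {
    let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    registry().lock().unwrap().insert(id, batch);
    id
}

/// Called by the downstream native plan (e.g. the shuffle writer): reclaim
/// the batch by handle, skipping FFI export/import and schema serde.
fn import_handle(id: u64) -> Option<Arc<RecordBatch>> {
    registry().lock().unwrap().remove(&id)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = Arc::new(RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?);

    let handle = export_handle(batch);      // upstream plan output
    let same_batch = import_handle(handle); // shuffle-writer input
    assert_eq!(same_batch.unwrap().num_rows(), 3);
    Ok(())
}
```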