apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Spark 3.4 + Arrow Datafusion Shuffle Manager Fails due to class loader isolation #221

Closed. holdenk closed this issue 1 month ago

holdenk commented 2 months ago

Describe the bug

Running with org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager as the shuffle manager fails due to class loader isolation.

Steps to reproduce

/home/holden/repos/high-performance-spark-examples/spark-3.4.2-bin-hadoop3/bin/spark-sql \
  --master 'local[5]' \
  --conf spark.eventLog.enabled=true \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=/home/holden/repos/high-performance-spark-examples/warehouse \
  --jars /home/holden/repos/high-performance-spark-examples/accelerators/arrow-datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.exec.all.enabled=true \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.comet.columnar.shuffle.enabled=true \
  --conf spark.driver.userClassPathFirst=true \
  --name sql/wap.sql \
  -f sql/wap.sql

I think anything that triggers a sort would suffice to reproduce this, but just in case, here is my wap.sql:

DROP TABLE IF EXISTS local.wap_projects;
CREATE TABLE local.wap_projects (
       creator string,
       projectname string)
USING iceberg
PARTITIONED BY (creator);
ALTER TABLE local.projects SET TBLPROPERTIES (
    'write.wap.enabled'='true'
);
-- We need a first commit, see https://github.com/apache/iceberg/issues/8849
INSERT INTO local.wap_projects VALUES("holdenk", "spark");
ALTER TABLE local.wap_projects DROP BRANCH IF EXISTS `audit-branch`;
ALTER TABLE local.wap_projects CREATE BRANCH `audit-branch`;
SET spark.wap.branch = 'audit-branch';
INSERT INTO local.projects VALUES("krisnova", "aurae");
SELECT count(*) FROM local.wap_projects VERSION AS OF 'audit-branch' WHERE creator is NULL;
SELECT count(*) FROM local.wap_projects VERSION AS OF 'audit-branch' WHERE creator == "krisnova";
CALL local.system.remove_orphan_files(table => 'local.wap_projects');
CALL local.system.fast_forward("local.wap_projects", "main", "audit-branch");

This results in:

24/03/20 14:26:53 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: failed to access class org.apache.spark.shuffle.sort.ShuffleInMemorySorter from class org.apache.spark.shuffle.sort.CometShuffleExternalSorter$SpillSorter (org.apache.spark.shuffle.sort.ShuffleInMemorySorter is in unnamed module of loader 'app'; org.apache.spark.shuffle.sort.CometShuffleExternalSorter$SpillSorter is in unnamed module of loader org.apache.spark.util.ChildFirstURLClassLoader @14dc3f89)
    at org.apache.spark.shuffle.sort.CometShuffleExternalSorter$SpillSorter.<init>(CometShuffleExternalSorter.java:434)
    at org.apache.spark.shuffle.sort.CometShuffleExternalSorter.<init>(CometShuffleExternalSorter.java:169)
    at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.open(CometUnsafeShuffleWriter.java:236)
    at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.<init>(CometUnsafeShuffleWriter.java:165)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager.getWriter(CometShuffleManager.scala:189)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Expected behavior

I expect the query to run.

The expected output is:

Time taken: 0.038 seconds
spark.wap.branch    'audit-branch'
Time taken: 0.041 seconds, Fetched 1 row(s)
Time taken: 0.232 seconds
0
Time taken: 0.605 seconds, Fetched 1 row(s)
0
Time taken: 0.183 seconds, Fetched 1 row(s)
Time taken: 3.352 seconds
main    4878286225198802743 4878286225198802743
Time taken: 0.035 seconds, Fetched 1 row(s)

Additional context

You can work around this error by copying the Arrow DataFusion Comet jar into Spark's jars directory, instead of adding it to the classpath with --jars, so that it is loaded by the same class loader as Spark's own classes.
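For readers wondering why the loader matters: on the JVM, package-private access is only allowed between classes in the same runtime package, which is identified by the package name together with the defining class loader. The stack trace above shows the two classes sharing the package name org.apache.spark.shuffle.sort but living in different loaders ('app' versus ChildFirstURLClassLoader). Below is a minimal sketch of that check; the class name RuntimePackageCheck and its methods are made up for illustration and are not part of Spark or Comet.

```java
public class RuntimePackageCheck {

    /**
     * Returns true when both classes share a package name AND a defining
     * class loader, which is the condition for package-private access.
     */
    static boolean sameRuntimePackage(Class<?> a, Class<?> b) {
        return a.getPackageName().equals(b.getPackageName())
                && a.getClassLoader() == b.getClassLoader();
    }

    /** Describes which loader defined a class (null means the bootstrap loader). */
    static String describe(Class<?> c) {
        ClassLoader loader = c.getClassLoader();
        return c.getName() + " loaded by " + (loader == null ? "bootstrap" : loader.toString());
    }

    public static void main(String[] args) {
        // Both classes are in java.lang and are defined by the bootstrap loader.
        System.out.println(describe(String.class));
        System.out.println(sameRuntimePackage(String.class, Integer.class));
        // Different package names, so never the same runtime package.
        System.out.println(sameRuntimePackage(String.class, java.util.List.class));
    }
}
```

Running the same comparison inside a failing job on the two classes named in the error message would presumably return false, which matches the IllegalAccessError.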

holdenk commented 2 months ago

I suspect that the correct fix is a documentation note in the README (maybe plus a try/catch in the code that prints a reference to the README), since changing the Spark class loader is not easy (I also tried with the user-classpath-first class loader). If folks agree, I'm happy to make a PR.

We could also (maybe?) get at Spark's internal class loader and use it explicitly, but that also seems very hacky.

advancedxy commented 2 months ago

> You can work around this error by copying the Arrow DataFusion Comet jar into Spark's jars directory, instead of adding it to the classpath with --jars, so that it is loaded by the same class loader as Spark's own classes.

Hmm, that could be a potential solution, but it seems a bit inconvenient. As I understand it, changing Spark's jar directory or archive in a production environment usually requires extra effort.

> since changing the Spark class loader is not easy (I also tried with the user-classpath-first class loader)

So this issue occurs regardless of whether spark.driver.userClassPathFirst is true or false?

holdenk commented 2 months ago

> You can work around this error by copying the Arrow DataFusion Comet jar into Spark's jars directory, instead of adding it to the classpath with --jars, so that it is loaded by the same class loader as Spark's own classes.

> Hmm, that could be a potential solution, but it seems a bit inconvenient. As I understand it, changing Spark's jar directory or archive in a production environment usually requires extra effort.

True, especially for users of a vendor solution, although for my deployments this isn't a big deal (we package our own Spark version anyways).

Let me take another look next week and see if there is a way to get loaded with Spark's default class loader.

> since changing the Spark class loader is not easy (I also tried with the user-classpath-first class loader)

> So this issue occurs regardless of whether spark.driver.userClassPathFirst is true or false?

Yup :(

Note that I have only tried this on vanilla Spark 3.4.

advancedxy commented 2 months ago

> Let me take another look next week and see if there is a way to get loaded with Spark's default class loader.

Thanks for working on this. Another option that comes to mind would be shading and relocating the package-scoped, shuffle-related classes into Comet's jar, for example org.apache.spark.shuffle.sort.ShuffleInMemorySorter -> org.apache.comet.shaded.ShuffleInMemorySorter. It should be doable, but it seems very hacky too.

holdenk commented 2 months ago

Following up: I tried adding --driver-class-path as well, and it did the trick. So what I would propose is updating the docs to include --driver-class-path, and maybe adding a try/catch around the code that raises the error to log a message pointing to the fix. WDYT @advancedxy?
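To make the try/catch idea concrete, here is a rough sketch of what such a guard might look like. It is only an illustration: the class ClassLoaderHint, the method withHint, and the message text are all hypothetical and do not come from the Comet code base. The sketch logs a hint and rethrows the original error unchanged, so the failure behaviour stays the same.

```java
import java.util.function.Supplier;

public class ClassLoaderHint {

    // Hypothetical message; the wording in a real patch would likely differ.
    static final String HINT =
            "Comet could not access a package-private Spark shuffle class. "
            + "This usually means the Comet jar was loaded by a different class loader than Spark. "
            + "Try adding the Comet jar with --driver-class-path in addition to --jars, "
            + "or copy it into Spark's jars directory.";

    /** Runs the body and, on IllegalAccessError, logs the hint before rethrowing. */
    static <T> T withHint(Supplier<T> body) {
        try {
            return body.get();
        } catch (IllegalAccessError e) {
            System.err.println(HINT);
            throw e; // keep the original error and stack trace
        }
    }

    public static void main(String[] args) {
        // Normal results pass through untouched.
        int value = withHint(() -> 42);
        System.out.println(value);
    }
}
```

In the stack trace above, the natural place for a guard like this would be around the SpillSorter construction in CometShuffleExternalSorter, though that placement is a guess on my part.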

advancedxy commented 2 months ago

Ah, I remember this option. I think it would be great to update the docs to include it.

One more thing: I think you also need to mention spark.executor.extraClassPath for executors in Spark on YARN/K8s deployments? Also cc @sunchao