apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] aws spark3.2.1 & hudi 0.13.1 with java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile #8903

Open zyclove opened 1 year ago

zyclove commented 1 year ago

Describe the problem you faced

Run:

spark-sql --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

Query:

select * from bi_ods_real.ods_api_test_task_log_rt limit 10;

This fails with:

Unable to read data from the MOR table using Spark. ERROR: org.apache.spark.sql.execution.datasources.PartitionedFile

Expected behavior

The query should return rows from the MOR table without errors.

Environment Description

Hudi version: 0.13.1
Spark version: 3.2.1
Environment: AWS EMR (emr-6.5.0, per the follow-up comment below)

Stacktrace

spark-sql> select * from bi_ods_real.ods_api_test_task_log_rt limit 10;
23/06/08 06:29:56 ERROR SparkSQLDriver: Failed in [select * from bi_ods_real.ods_api_test_task_log_rt limit 10]
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
    at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$2(MergeOnReadSnapshotRelation.scala:237)
    at scala.Option.map(Option.scala:230)
    at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$1(MergeOnReadSnapshotRelation.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.hudi.BaseMergeOnReadSnapshotRelation.buildSplits(MergeOnReadSnapshotRelation.scala:231)
    at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:223)
    at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:64)
    at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:353)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:359)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:393)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:449)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:392)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:359)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:504)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:165)
    at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:178)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
    at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:308)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:606)
    at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:308)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:323)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:277)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:256)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:69)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:287)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

danny0405 commented 1 year ago

Hi @umehrot2, can you take a look at this? It seems to be a class conflict.

zyclove commented 1 year ago

@umehrot2 Hi, Hudi experts, can anyone help me? This problem is urgent in our production environment; looking forward to an early reply.

ad1happy2go commented 1 year ago

@zyclove This is related to a conflict between the Hive/Hadoop/Spark runtime versions on EMR and the versions the Hudi Spark bundle is compiled against.

In the meantime, instead of using the AWS build, you can use the OSS version of Hudi. For Spark 3.2 that would be --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.1.
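
A quick way to confirm such a mismatch on a given cluster (a sketch, assuming a pyspark shell where spark is predefined; not from the original thread): list the constructor signatures of the PartitionedFile class the driver JVM actually loads and compare them with the signature in the NoSuchMethodError:

# Inspect PartitionedFile's constructors through Py4J reflection.
jvm = spark.sparkContext._jvm
cls = jvm.java.lang.Class.forName(
    "org.apache.spark.sql.execution.datasources.PartitionedFile")
for ctor in cls.getDeclaredConstructors():
    # On an EMR fork of Spark the parameter list may differ from OSS Spark 3.2.1.
    print(ctor.toString())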

ad1happy2go commented 1 year ago

@zyclove Sorry, I just realised you are using --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.1, which means you are already on the OSS version of Hudi. I will try to reproduce it.

ad1happy2go commented 1 year ago

@zyclove I am not able to reproduce this issue. I got an EMR cluster started with the versions you mentioned and was able to create and query MOR tables using Spark SQL.

https://gist.github.com/ad1happy2go/9ed270ada234a3c78c79603a8abe755c

Are you still facing this issue? If not, can you please let us know what was causing it before?

zyclove commented 1 year ago

@ad1happy2go I use emr-6.5.0, and it fails with java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.

But I have built a shaded package from OSS Spark and the Hudi bundle, and it works now:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <configuration>
        <finalName>hudi-${spark.version}-plugin</finalName>
        <createDependencyReducedPom>false</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Relocate the bundled OSS PartitionedFile so it cannot clash
                         with the class shipped by EMR's Spark at runtime. -->
                    <relocation>
                        <pattern>org.apache.spark.sql.execution.datasources.PartitionedFile</pattern>
                        <shadedPattern>org.local.spark.sql.execution.datasources.PartitionedFile</shadedPattern>
                    </relocation>
                    <!-- Relocate Curator to avoid conflicts with the cluster's copy. -->
                    <relocation>
                        <pattern>org.apache.curator</pattern>
                        <shadedPattern>org.local.curator</shadedPattern>
                    </relocation>
                </relocations>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                </transformers>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>module-info.class</exclude>
                            <exclude>org/apache/spark/unused/**</exclude>
                        </excludes>
                    </filter>
                    <!-- Strip jar signatures, which become invalid after shading. -->
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>
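
To sanity-check that the relocation took effect in the rebuilt jar, a quick inspection sketch (the jar name hudi-3.2.1-plugin.jar is a placeholder derived from the finalName above and will vary with ${spark.version}):

import zipfile

# After shading, PartitionedFile entries should appear under
# org/local/spark/... rather than org/apache/spark/...
with zipfile.ZipFile("hudi-3.2.1-plugin.jar") as jar:
    for name in jar.namelist():
        if "PartitionedFile" in name:
            print(name)
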
ad1happy2go commented 1 year ago

Thanks @zyclove .

lyle617 commented 1 year ago

(Quoting zyclove's repackaging comment and shade-plugin configuration above.)

I repackaged with the Hudi bundle, but the following error occurred. Note the class names in the trace: the bundle's relocated copy of PartitionedFile cannot be cast to Spark's own unshaded class:

Caused by: java.lang.ClassCastException: org.apache.hudi.spark.org.apache.spark.sql.execution.datasources.PartitionedFile cannot be cast to org.apache.spark.sql.execution.datasources.PartitionedFile
    at org.apache.hudi.HoodieMergeOnReadRDD.read(HoodieMergeOnReadRDD.scala:113)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
parisni commented 1 year ago

Hello, we encounter the same error on EMR. Removing EMR's /usr/lib/hudi did not help, so I assume this is not a dependency conflict.

pyspark --driver-memory 1g --executor-memory 1g \
  --conf spark.dynamicAllocation.enabled=false \
  --num-executors 1 \
  --conf spark.executor.cores=1 \
  --jars hudi-aws-0.13.1.jar,hudi-spark3.2-bundle_2.12-0.13.1.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = [
    (1, "f5c2ebfd-f57b-4ff3-ac5c-f30674037b21", "A", "BC", "C"),
    (2, "f5c2ebfd-f57b-4ff3-ac5c-f30674037b22", "A", "BC", "C"),
    (3, "f5c2ebfd-f57b-4ff3-ac5c-f30674037b21", "A", "BC", "C"),
    (4, "f5c2ebfd-f57b-4ff3-ac5c-f30674037b22", "A", "BC", "C"),
]

schema = StructType(
    [
        StructField("uuid", IntegerType(), True),
        StructField("user_id", StringType(), True),
        StructField("col1", StringType(), True),
        StructField("ts", StringType(), True),
        StructField("part", StringType(), True),
    ]
)
df = spark.createDataFrame(data=data, schema=schema)

bucket = ...
tableName = "test_hudi_mor"
basePath = f"s3://"+bucket+"/test/" + tableName

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "insert",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "ddefault",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "jdbc",
    "hoodie.meta.sync.client.tool.class": "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool",
    "hoodie.datasource.hive_sync.partition_fields": "part",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))

# this works fine
spark.table("default."+tableName+"_ro").show()
# this raises an error
spark.table("default."+tableName+"_rt").show()
Junyewu commented 1 year ago

If you encounter the java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile problem when reading a Hudi _rt table with Spark on AWS EMR, you can try upgrading the EMR cluster to version 6.9.0, which can read both Hudi _ro and _rt tables.

parisni commented 1 year ago

You can try upgrading emr cluster version to 6.9.0

Thanks. I wonder how the EMR version can affect Hudi, though, when Hudi is loaded with --packages and the /usr/lib/hudi folder has been removed.
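
One way to see why (a sketch, assuming the same pyspark session as above; not from the thread): ask the driver JVM which jar PartitionedFile was actually loaded from. Spark's own classes normally take precedence over anything added via --jars or --packages, so on EMR this typically resolves to the cluster's Spark installation, which is where the modified class lives:

# Print the location of the jar that provides PartitionedFile.
jvm = spark.sparkContext._jvm
cls = jvm.java.lang.Class.forName(
    "org.apache.spark.sql.execution.datasources.PartitionedFile")
source = cls.getProtectionDomain().getCodeSource()
print(source.getLocation().toString() if source is not None else "bootstrap classloader")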