apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] ctas error in spark3.1.1 & hudi 0.13.1 #9506

Open FishMAN002 opened 1 year ago

FishMAN002 commented 1 year ago

Describe the problem you faced

CTAS fails in Spark 3.1.1 with Hudi 0.13.1.

To Reproduce

Steps to reproduce the behavior:

  1. Launch the spark-sql shell:

    spark-sql \
      --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
      --conf "spark.hadoop.mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter" \
      --conf "spark.dynamicAllocation.enabled=false" \
      --conf "spark.dynamicAllocation.initialExecutors=4" \
      --conf "spark.dynamicAllocation.maxExecutors=10" \
      --conf "spark.driver.memory=8g" \
      --conf "spark.driver.memoryOverhead=4g" \
      --conf "spark.driver.maxResultSize=8g" \
      --conf "spark.executor.instances=6" \
      --conf "spark.executor.memory=4g" \
      --conf "spark.executor.memoryOverhead=2g" \
      --conf "spark.sql.hive.convertMetastoreParquet=false" \
      --conf "hive.exec.dynamici.partition=true" \
      --conf "hive.exec.dynamic.partition.mode=nonstrict" \
      --conf "spark.sql.adaptive.enabled=true" \
      --conf "spark.sql.parquet.writeLegacyFormat=true" \
      --conf "spark.driver.extraJavaOptions=-Dfile.encoding=utf-8 -Dsun.jnu.encoding=utf-8" \
      --conf "spark.executor.extraJavaOptions=-Dfile.encoding=utf-8 -Dsun.jnu.encoding=utf-8" \
      --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" 
  2. Run the CTAS statement:

    create table ods_test.type_change_test13 (
    id int,
    name string,
    age int,
    class int,
    phone bigint,
    weight double,
    meta_es_offset string,
    `year` string,
    `month` string,
    `day` string
    ) using hudi
    tblproperties (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'meta_es_offset'
    )
    partitioned by (`year`, `month`, `day`)
    location 's3://xxxxxxxx/type_change_test13/';
  3. The statement fails with the following error:

    java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.Command.producedAttributes$(Lorg/apache/spark/sql/catalyst/plans/logical/Command;)Lorg/apache/spark/sql/catalyst/expressions/AttributeSet;
    at org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.producedAttributes(CreateHoodieTableCommand.scala:49)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.references$lzycompute(QueryPlan.scala:68)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.references(QueryPlan.scala:68)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.missingInput(QueryPlan.scala:74)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.statePrefix(QueryPlan.scala:292)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.statePrefix(LogicalPlan.scala:68)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.simpleString(QueryPlan.scala:294)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:296)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:685)
    at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:606)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:487)
    at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:241)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:260)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:216)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:381)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:500)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:494)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:494)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:284)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:959)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1038)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1047)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Environment Description

This looks related to [HUDI-3131] fix ctas error in spark3.1.1 #4549. @YannByron Can you help me check it?

ad1happy2go commented 1 year ago

@FishMAN002 Which Hudi-Spark bundle jar are you using? Did you build it yourself? Can you try with --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.1?
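
For context, a NoSuchMethodError on a Spark catalyst method such as Command.producedAttributes$ typically indicates that the Hudi bundle was compiled against a different Spark or Scala version than the one on the runtime classpath. A quick sanity check, assuming spark-submit is on the PATH:

    # Prints the Spark version and the Scala version it was built with;
    # these should match the -Dspark3.1 and -Dscala-2.12 build profiles.
    spark-submit --version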

FishMAN002 commented 1 year ago

@FishMAN002 Which Hudi-Spark bundle jar are you using? Did you build it yourself? Can you try with --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.1?

@ad1happy2go I built it myself. My build command is

/usr/local/opt/apache-maven-3.8.5/bin/mvn clean package -DskipTests -Dspark3.1 -Dflink1.14 -Dscala-2.12 -Drat.skip=true -Dcheckstyle.skip=true

The resulting bundle jar is named hudi-spark3.1-bundle_2.12-0.13.1.jar.
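
If it helps, the built bundle should land under packaging/hudi-spark-bundle/target in the Hudi source tree; a quick check (a sketch, assuming a standard checkout):

    # Confirm the freshly built bundle exists in the source tree
    ls packaging/hudi-spark-bundle/target/hudi-spark3.1-bundle_2.12-0.13.1.jar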

ad1happy2go commented 1 year ago

@FishMAN002 The build command looks good. Did you try with --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.1? If you still get the same issue, that will at least rule out any jar compatibility problem.

I tried the exact same command with the --packages option, and the CTAS worked fine for me.

FishMAN002 commented 1 year ago

@ad1happy2go Are you suggesting that I try this command:

 /usr/local/opt/apache-maven-3.8.5/bin/mvn clean package -DskipTests -Dspark3.1 -Dflink1.14 -Dscala-2.12 -Drat.skip=true  -Dcheckstyle.skip=true      **--packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.1** 

If that's not right, could you provide the correct command? Thank you very much!

ad1happy2go commented 1 year ago

@FishMAN002 Sorry for the delayed reply here. What I meant was to use the Maven package I provided directly when you launch spark-shell/spark-sql/pyspark, rather than adding it to the Maven build command; see the sketch below. I hope you have figured that out by now. Are you still facing this issue? Please let us know.
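
A minimal sketch of that invocation (only the Hudi-required settings from your original command are shown; --packages pulls the published bundle instead of the locally built jar):

    spark-sql \
      --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.1 \
      --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
      --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"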