apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]CreateHoodieTableCommand.orgsetter$nodePatterns_$eq(Lscala/collection/Seq;)V is abstract #5684

Closed sunke38 closed 2 years ago

sunke38 commented 2 years ago

I am trying to create a table and sync it to Hive, but it fails with `CreateHoodieTableCommand.orgsetter$nodePatterns_$eq(Lscala/collection/Seq;)V is abstract` (full stack trace below). I am using Hudi 0.11 with Spark 3.2.1.

Environment Description

Hudi version : 0.11

Spark version : 3.2.1

Hadoop version : 3.2.2

Storage (HDFS/S3/GCS..) : HDFS

Running on Docker? (yes/no) : no

Stacktrace

Current State: ACTIVE Thread State: RUNNABLE

here is my code

  implicit val sparkSession = SparkSession.builder()
    .appName("SparkHudi")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .config("spark.serialize", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extension", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .enableHiveSupport()
    .config("hive.metastore.uris", "thrift://10.10.9.203:9083")
    .getOrCreate()
  private def chackAndCreateTable(row: Row)(implicit sparkSession: SparkSession): Unit = {
    try {
      val chackTableExist = s"SHOW CREATE TABLE ${row.getAs("table")}_cow"
      sparkSession.sql(chackTableExist)
    } catch {
      case ex: AnalysisException =>
        if (ex.message.contains("Table or permanent view not found")) {
          println(s"todo: table ${row.getAs("table")}_cow not found, creating the ods table automatically")
          //println(row.schema.toDDL)
          val sqlString =
            s"""create table ${row.getAs("table")}_cow (${row.schema.toDDL})
               |using hudi tblproperties (
               |type = 'cow',
               |primaryKey = 'pk',
               |preCombineField = 'es',
               |hoodie.datasource.hive_sync.enable = true,
               |hoodie.datasource.hive_sync.database = '${row.getAs("database")}',
               |hoodie.datasource.hive_sync.table = '${row.getAs("table")}_cow',
               |hoodie.datasource.hive_sync.mode = 'hms'
               |) location '/hudi/datalake/${row.getAs("database")}/${row.getAs("table")}_cow'
               |""".stripMargin
          sparkSession.sql(sqlString).printSchema()
          println(sqlString)
        }
    }
  }

spark-submit command

spark-submit \
  --jars /home/kadm/module/hudi-0.11/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.11.0.jar \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,org.apache.spark:spark-avro_2.12:3.2.1,org.apache.kafka:kafka-clients:3.1.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1000" \
  --master spark://hadoop:7077 \
  SparkHudi-1.0-SNAPSHOT-shaded.jar
Listening for transport dt_socket at address: 5445
:: loading settings :: url = jar:file:/home/kadm/module/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/kadm/.ivy2/cache
The jars for the packages stored in: /home/kadm/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fd45f2c8-7f41-46e0-b08a-050ec8523fc3;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
    found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in local-m2-cache
    found org.apache.hadoop#hadoop-client-runtime;3.3.1 in local-m2-cache
    found org.spark-project.spark#unused;1.0.0 in local-m2-cache
    found org.apache.hadoop#hadoop-client-api;3.3.1 in local-m2-cache
    found org.apache.htrace#htrace-core4;4.1.0-incubating in local-m2-cache
    found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
    found commons-logging#commons-logging;1.1.3 in local-m2-cache
    found com.google.code.findbugs#jsr305;3.0.0 in local-m2-cache
    found org.apache.commons#commons-pool2;2.6.2 in central
    found org.apache.spark#spark-avro_2.12;3.2.1 in central
    found org.tukaani#xz;1.8 in local-m2-cache
    found org.apache.kafka#kafka-clients;3.1.0 in central
    found com.github.luben#zstd-jni;1.5.0-4 in local-m2-cache
    found org.lz4#lz4-java;1.8.0 in local-m2-cache
    found org.xerial.snappy#snappy-java;1.1.8.4 in local-m2-cache
:: resolution report :: resolve 576ms :: artifacts dl 17ms
    :: modules in use:
    com.github.luben#zstd-jni;1.5.0-4 from local-m2-cache in [default]
    com.google.code.findbugs#jsr305;3.0.0 from local-m2-cache in [default]
    commons-logging#commons-logging;1.1.3 from local-m2-cache in [default]
    org.apache.commons#commons-pool2;2.6.2 from central in [default]
    org.apache.hadoop#hadoop-client-api;3.3.1 from local-m2-cache in [default]
    org.apache.hadoop#hadoop-client-runtime;3.3.1 from local-m2-cache in [default]
    org.apache.htrace#htrace-core4;4.1.0-incubating from local-m2-cache in [default]
    org.apache.kafka#kafka-clients;3.1.0 from central in [default]
    org.apache.spark#spark-avro_2.12;3.2.1 from central in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 from central in [default]
    org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 from local-m2-cache in [default]
    org.lz4#lz4-java;1.8.0 from local-m2-cache in [default]
    org.slf4j#slf4j-api;1.7.30 from local-m2-cache in [default]
    org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
    org.tukaani#xz;1.8 from local-m2-cache in [default]
    org.xerial.snappy#snappy-java;1.1.8.4 from local-m2-cache in [default]
    :: evicted modules:
    org.apache.kafka#kafka-clients;2.8.0 by [org.apache.kafka#kafka-clients;3.1.0] in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   17  |   0   |   0   |   1   ||   16  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-fd45f2c8-7f41-46e0-b08a-050ec8523fc3
    confs: [default]
    0 artifacts copied, 16 already retrieved (0kB/67ms)
22/05/25 13:29:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/25 13:29:44 WARN KafkaUtils: overriding enable.auto.commit to false for executor
22/05/25 13:29:44 WARN KafkaUtils: overriding auto.offset.reset to none for executor
22/05/25 13:29:44 WARN KafkaUtils: overriding executor group.id to spark-executor-group-88
22/05/25 13:29:44 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
22/05/25 13:29:51 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
todo: table ****_cow not found, creating the ods table automatically
22/05/25 13:29:53 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Exception in thread "streaming-job-executor-0" java.lang.AbstractMethodError: Method org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.org$apache$spark$sql$catalyst$plans$logical$Command$_setter_$nodePatterns_$eq(Lscala/collection/Seq;)V is abstract
    at org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.org$apache$spark$sql$catalyst$plans$logical$Command$_setter_$nodePatterns_$eq(CreateHoodieTableCommand.scala)
    at org.apache.spark.sql.catalyst.plans.logical.Command.$init$(Command.scala:38)
    at org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.<init>(CreateHoodieTableCommand.scala:46)
    at org.apache.spark.sql.hudi.catalog.HoodieCatalog.createHoodieTable(HoodieCatalog.scala:258)
    at org.apache.spark.sql.hudi.catalog.HoodieCatalog.createTable(HoodieCatalog.scala:112)
    at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:42)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at com.example.SparkHudi$.chackAndCreateTable(SparkHudi.scala:175)
    at com.example.SparkHudi$.$anonfun$main$6(SparkHudi.scala:126)
    at com.example.SparkHudi$.$anonfun$main$6$adapted(SparkHudi.scala:121)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at com.example.SparkHudi$.$anonfun$main$5(SparkHudi.scala:121)
    at com.example.SparkHudi$.$anonfun$main$5$adapted(SparkHudi.scala:107)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
xushiyan commented 2 years ago

@sunke38 It's hard to read. Can you please format your code and stack trace separately with proper code block formatting?

Also, I noticed that when you initialize the Spark session, you set spark.sql.extension; the right key is spark.sql.extensions.
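
For reference, a minimal sketch of the session setup with the corrected keys, reusing the warehouse path and metastore URI from the snippet above (fixing the key spelling alone may or may not resolve the AbstractMethodError):

    import org.apache.spark.sql.SparkSession

    // Sketch only: same settings as the original snippet, but with the config
    // keys spelled as Spark expects them (spark.sql.extensions, spark.serializer).
    implicit val sparkSession = SparkSession.builder()
      .appName("SparkHudi")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("hive.metastore.uris", "thrift://10.10.9.203:9083")
      .enableHiveSupport()
      .getOrCreate()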

sunke38 commented 2 years ago

@xushiyan Sorry about that, I have fixed the formatting. I changed the key to spark.sql.extensions but it still shows the same error. By the way, the CREATE TABLE SQL works in spark-shell.

sunke38 commented 2 years ago

@xushiyan It seems to be fixed now. Some unnecessary dependencies were causing this issue.
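
For anyone who hits the same AbstractMethodError: it typically indicates binary-incompatible classes on the classpath, for example a shaded application jar that bundles Spark classes from a different version than the cluster's Spark 3.2.1. A minimal sketch, assuming an sbt build, of keeping Spark out of the shaded jar (the module list is illustrative, not taken from this issue):

    // build.sbt (sketch): mark Spark modules as "provided" so the shaded
    // application jar does not carry its own, possibly mismatched, Spark classes.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"                  % "3.2.1"  % "provided",
      "org.apache.spark" %% "spark-streaming"            % "3.2.1"  % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.2.1"  % "provided",
      // the Hudi bundle is supplied via --jars at submit time, so it can stay provided as well
      "org.apache.hudi"  %% "hudi-spark3.2-bundle"       % "0.11.0" % "provided"
    )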