apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Trying to find org.apache.hudi.com.google.common.base.Preconditions when using ZookeeperBasedLockProvider #8723

Open kukayiyi opened 1 year ago

kukayiyi commented 1 year ago

I am trying to use Spark to read from Kafka and write a Hudi table to MinIO. The code runs normally, but when I enable concurrency according to the demo to improve speed, I get a ClassNotFound error, and the program is trying to find the google.common package inside the hudi package.

Exception

23/05/16 09:51:29 ERROR MicroBatchExecution: Query demo [id = 0665a730-c783-48ad-bd07-3f057d394553, runId = 99f946fe-0cc2-4ee7-adc7-3c0b19954758] terminated with error
org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:118)
        at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:116)
        at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:67)
        at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:53)
        at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1465)
        at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1500)
        at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:156)
        at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:336)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at com.yisa.huditest.HudiFromKafka$.$anonfun$main$1(HudiFromKafka.scala:86)
        at com.yisa.huditest.HudiFromKafka$.$anonfun$main$1$adapted(HudiFromKafka.scala:65)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
        ... 66 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hudi/com/google/common/base/Preconditions
        at org.apache.curator.ensemble.fixed.FixedEnsembleProvider.<init>(FixedEnsembleProvider.java:39)
        at org.apache.curator.framework.CuratorFrameworkFactory$Builder.connectString(CuratorFrameworkFactory.java:193)
        at org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.<init>(ZookeeperBasedLockProvider.java:70)
        ... 71 more
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.com.google.common.base.Preconditions
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 74 more

Code

    val spark = SparkSession
      .builder()
      .appName("HudiTest")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.hadoop.fs.s3a.access.key", "miniopass")
      .config("spark.hadoop.fs.s3a.secret.key", "miniopass")
      .config("spark.hadoop.fs.s3a.endpoint", "node1:9000")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .getOrCreate()

    val kafkaParams = Map[String, String](
      "kafka.bootstrap.servers" -> "node1:9092", 
      "startingoffsets" -> "earliest", 
      "maxOffsetsPerTrigger" -> "1000000",
      "subscribe" -> "vehicle-topic"
    )

    import spark.implicits._
    val kafkaDataSet: Dataset[Row] = spark
      .readStream
      .format("kafka")
      .options(kafkaParams)
      .load()
      .selectExpr("CAST(value AS STRING)")

    val query = kafkaDataSet
      .writeStream
      .queryName("demo")
      .foreachBatch { (batchDF: Dataset[Row], _: Long) => {
        val hudiDF = spark.read.json(batchDF.as[String])
        hudiDF.write.format("hudi")
          .options(getQuickstartWriteConfigs)
          .option(OPERATION_OPT_KEY, "upsert")
          .option(PRECOMBINE_FIELD_OPT_KEY, "markTime")
          .option(RECORDKEY_FIELD_OPT_KEY, "vehicleID")
          .option(PARTITIONPATH_FIELD_OPT_KEY, "date")
          .option(HoodieWriteConfig.TABLE_NAME, "vehicle_table")
          .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
          .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
          .option("hoodie.cleaner.policy.failed.writes", "LAZY")
          .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
          .option("hoodie.write.lock.zookeeper.url", "node1")
          .option("hoodie.write.lock.zookeeper.port", "2181")
          .option("hoodie.write.lock.zookeeper.lock_key", "vehicle_kafka")
          .option("hoodie.write.lock.zookeeper.base_path", "/hudi_multiwriter")
          .mode(SaveMode.Append)
          .save("s3a://hudi/vehicle_kafka")
        }
      }
      .option("checkpointLocation", "/tmp/sparkHudi/")
      .start()

    query.awaitTermination()

Dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3.3-bundle_2.12</artifactId>
            <version>0.13.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.3.4</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.2</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.3.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

ZooKeeper version: 3.5.10

Actually, com.google.common.base.Preconditions exists and I have found this class in my Maven dependencies, but I don't understand why there is an org.apache.hudi prefix in the exception.

danny0405 commented 1 year ago

If you use the Spark Hudi bundle jar, the pom shades the guava packages with the org.apache.hudi prefix.
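
For reference, a relocation of this kind in a maven-shade-plugin configuration typically looks something like the sketch below (illustrative only; the exact shade configuration in the Hudi bundle pom may differ). It rewrites references to com.google.common so the bundled copies resolve under the org.apache.hudi prefix, which is why the exception asks for org.apache.hudi.com.google.common.base.Preconditions.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <relocations>
            <relocation>
                <!-- Illustrative pattern: Guava classes are moved under the org.apache.hudi prefix -->
                <pattern>com.google.common</pattern>
                <shadedPattern>org.apache.hudi.com.google.common</shadedPattern>
            </relocation>
        </relocations>
    </configuration>
</plugin>

Note that the relocated classes only end up in the bundle jar if Guava itself is included when the bundle is built; the relocation rule alone does not pull them in.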

kukayiyi commented 1 year ago

If you use the Spark Hudi bundle jar, the pom shades the guava packages with the org.apache.hudi prefix.

Thanks for your reply. What should I do: use a different package combination such as hudi-common, or add a relocation?

danny0405 commented 1 year ago

You need to check whether you have these missing classes in your bundle jar first.

kukayiyi commented 1 year ago

You need to check whether you have these missing classes in your bundle jar first.

org.apache.hudi does not contain the com.google.common package, but I put all the related dependencies, including the Guava package where com.google.common is located, onto the Spark cluster as dependency jars, and I still get this exception.

danny0405 commented 1 year ago

There is a shade pattern for the com.google.common package in the bundle jar pom.

kukayiyi commented 1 year ago

There is a shade pattern for the com.google.common package in the bundle jar pom.

I still don't know what to do. Should com.google.common be included in the hudi bundle package? Or should I remove com.google.common from hudi's pom?

danny0405 commented 1 year ago

com.google.common be included in the hudi bundle package

It should be, and it needs to be shaded with the org.apache.hudi prefix.

kukayiyi commented 1 year ago

com.google.common be included in the hudi bundle package

It should be, and it needs to be shaded with the org.apache.hudi prefix.

I understand, but the hudi bundle package does not include com.google.common. I searched several of hudi's main component packages but couldn't find it. Can you tell me which package contains com.google.common, or do I need to package it myself? Actually, I tried to build the package myself but failed.

danny0405 commented 1 year ago

It's the Google Guava library.
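
For reference, the com.google.common classes ship in the Google Guava artifact; the Maven coordinates look like the sketch below (the version shown is only an example). Note that adding Guava to the application classpath alone does not satisfy a reference to the relocated class org.apache.hudi.com.google.common.base.Preconditions; the Guava classes have to be present inside the bundle jar and shaded under the org.apache.hudi prefix.

<dependency>
    <!-- Example coordinates only; use the Guava version that matches your build -->
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>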

Gatsby-Lee commented 3 months ago

I have the same issue with the Hudi Jar in the Amazon EMR Image 7.1.0. hmmm