apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 36 #6621

Closed dik111 closed 1 year ago

dik111 commented 1 year ago

Describe the problem you faced

I used Flink to update data in real time and Spark to read the data; the Spark read throws com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 36.

When I read the same data with Flink, no such exception occurred.

To Reproduce

Steps to reproduce the behavior:

1. Use Flink to update data in real time.
2. Use Spark to read the data.
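Since the report below only includes the Spark reader, here is a minimal, hypothetical sketch of step 1 (the table name, schema, and path are placeholders; 'connector', 'path', and 'table.type' are the standard Hudi Flink connector options):

  import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

  object FlinkHudiUpsertSketch {
    def main(args: Array[String]): Unit = {
      val tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build())

      // Hypothetical schema; only the WITH options below are standard settings.
      tEnv.executeSql(
        """CREATE TABLE hudi_sink (
          |  id  STRING PRIMARY KEY NOT ENFORCED,
          |  msg STRING,
          |  ts  TIMESTAMP(3)
          |) WITH (
          |  'connector'  = 'hudi',
          |  'path'       = 'hdfs://xxx:8020/path/to/hudi_table',
          |  'table.type' = 'MERGE_ON_READ'
          |)""".stripMargin)

      // Upserts on a MOR table produce the log blocks (including delete
      // blocks) that the Spark reader later scans.
      tEnv.executeSql(
        "INSERT INTO hudi_sink VALUES ('k1', 'v2', TIMESTAMP '2022-09-07 00:00:00')"
      ).await()
    }
  }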

Environment Description

Additional context

Here is my code:

  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val spark: SparkSession = {
      SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[4]")
        // Kryo serializer and the Hudi session extension are set explicitly.
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate()
    }

    val tableName = "pt_sfy_sfy_oe_order_lines_all_hudi_0908"
    val tablePath = "hdfs://xxx:8020/warehouse/tablespace/managed/hive/test.db/pt_sfy_sfy_oe_order_lines_all_hudi_090666"

    // Snapshot-read the Hudi table and run a simple count.
    val dataFrame = spark.read.format("org.apache.hudi").load(tablePath)
    dataFrame.createOrReplaceTempView("pt_sfy_sfy_oe_order_lines_all_hudi_090666")
    spark.sql("select count(*) from pt_sfy_sfy_oe_order_lines_all_hudi_090666").show()
    spark.stop()
  }

Stacktrace

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 36
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:388)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:104)
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:78)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:106)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:91)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
    at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:402)
    at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.<init>(HoodieMergeOnReadRDD.scala:196)
    at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.<init>(HoodieMergeOnReadRDD.scala:278)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:132)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/09/07 16:14:36 ERROR Executor: Exception in task 73.0 in stage 3.0 (TID 251)
org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
    at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:402)
    at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.<init>(HoodieMergeOnReadRDD.scala:196)
    at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.<init>(HoodieMergeOnReadRDD.scala:278)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:132)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
danny0405 commented 1 year ago

https://github.com/JohnSnowLabs/spark-nlp/issues/1123

Did you try configuring the serializer explicitly: spark.serializer: org.apache.spark.serializer.KryoSerializer?
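For reference, a minimal sketch of setting this when building the session (equivalent to passing --conf spark.serializer=... to spark-submit; the reproduction code above already sets it this way):

  import org.apache.spark.sql.SparkSession

  // Must be configured before the SparkSession/SparkContext is created;
  // it has no effect on an already-running session.
  val spark = SparkSession.builder()
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()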

danny0405 commented 1 year ago

@alexeykudinkin Are you interested in looking into this?

codope commented 1 year ago

@dik111 did you get a chance to try out this suggestion https://github.com/apache/hudi/issues/6621#issuecomment-1241538207 ?

alexeykudinkin commented 1 year ago

Created HUDI-4959 to track this issue.

TL;DR: The problem is that we're using Kryo to serialize/deserialize HoodieKey, and deserialization fails because Kryo can find neither a class registration nor an association for the encoded class ID.


EDIT

I'm taking back my hypothesis that the issue is in the class encoding: after writing a small test to validate it, I confirmed that Kryo actually writes out the full class name for all implicitly registered classes (as it should).

It seems the problem is indeed a misalignment of Avro versions, as reported by @KnightChess. Quick-checking, I see that between Avro 1.8.2 and 1.10.2, Utf8 gained one more field:

  // 1.8.2 
  private byte[] bytes = EMPTY;
  private int length;
  private String string;

  // 1.10.2
  private byte[] bytes;
  private int hash;
  private int length;
  private String string;

Given that we rely on Kryo to generate a serializer (based on FieldSerializer) for orderingVal, which could be a Utf8, this would explain why it can't be deserialized back: the writer and reader end up with different serializers.
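To make the failure mode concrete, here is a minimal, self-contained sketch (written for this discussion, not taken from Hudi's code) of the implicit-registration pattern: Kryo falls back to FieldSerializer, which writes fields positionally with no field names, so the round trip only succeeds when writer and reader see the same Utf8 field layout:

  import com.esotericsoftware.kryo.Kryo
  import com.esotericsoftware.kryo.io.{Input, Output}
  import org.apache.avro.util.Utf8

  object Utf8KryoRoundTrip {
    def main(args: Array[String]): Unit = {
      val kryo = new Kryo()
      // Allow implicit registration so Kryo generates a FieldSerializer
      // from the runtime class's declared fields.
      kryo.setRegistrationRequired(false)

      val output = new Output(1024)
      kryo.writeClassAndObject(output, new Utf8("hello"))
      output.close()

      // Within a single JVM this succeeds; with a different Avro version
      // (and hence a different Utf8 field layout, e.g. the added `hash`
      // field) on the reading side, the bytes are misinterpreted.
      val input = new Input(output.toBytes)
      println(kryo.readClassAndObject(input).asInstanceOf[Utf8])
    }
  }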

KnightChess commented 1 year ago

We also encountered this problem. In our environment, the flink-bundle Avro version is 1.10.0 and the spark3-bundle Avro version is 1.10.2, and the error appears after Flink writes a MOR table with a delete log block.

spark: the Spark query throws the exception below (the reported ID is the encoded class ID minus 2, i.e. index 36):

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 36, Size: 7
Serialization trace:
string (org.apache.hudi.org.apache.avro.util.Utf8)
orderingVal (org.apache.hudi.common.model.DeleteRecord)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:104)
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:78)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:106)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:91)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:476)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:346)
    ... 30 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 36, Size: 7
    at java.util.ArrayList.rangeCheck(ArrayList.java:659)
    at java.util.ArrayList.get(ArrayList.java:435)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:780)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
    ... 44 more

presto: Presto throws the same exception you encountered; the Presto Avro version is 1.8.2.

resolve: In Spark, I changed the Avro version to 1.10.0, the same as Flink, and the query then works fine. I didn't check Presto; you can give it a try in Spark @dik111.
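If the Spark job's dependencies are managed with sbt, a hedged sketch of pinning Avro to the writer's version (1.10.0, per this thread) could look like the line below. Note that the stack traces above show Hudi's shaded Avro (org.apache.hudi.org.apache.avro.util.Utf8), so depending on how the bundles were built, aligning versions may require rebuilding the Hudi bundle against the matching Avro rather than only changing the job's classpath.

  // build.sbt -- a sketch, not an official recommendation: force the Avro
  // version on the Spark job's classpath to match the Flink writer's.
  dependencyOverrides += "org.apache.avro" % "avro" % "1.10.0"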

Armelabdelkbir commented 1 year ago

Hello, I also have this problem when I try to read data with Spark. The same job sometimes works fine and sometimes fails:

Steps to reproduce :

1. Use Spark Structured Streaming to fetch data from Kafka in real time.
2. Use Spark to read the data.

The problem occurs when I use spark-submit, but I can't reproduce the issue with Jupyter :(

My stack:
Hudi version: 0.11.0
Spark version: 2.4.6 / 3.1.4
Hive version: 1.2.1000
Storage (HDFS): 2.7.3

58782 [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ShuffleMapStage 8 (parquet at Utilities.scala:27) failed in 8.241 s due to Job aborted due to stage failure: Task 1 in stage 8.0 failed 4 times, most recent failure: Lost task 1.3 in stage 8.0 (TID 35, cnode24.datalake.com, executor 36): org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:332)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:178)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:96)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:291)
    at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:370)
    at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:240)
    at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:230)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: [Lorg.apache.hudi.common.model.DeleteRecord;
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:104)
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:78)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getKeysToDelete(HoodieDeleteBlock.java:89)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:323)
    ... 34 more

alexeykudinkin commented 1 year ago

@dik111 please check my updated comment: can you try the remediation that @KnightChess called out (making sure that the versions of Avro used in Flink and Spark are identical)?

In the meantime, I'm going to put up a fix to properly handle Utf8 in Kryo.
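One plausible shape for such a fix (an illustrative sketch only, not the actual HUDI-4959 patch) is to register a custom Kryo serializer for Utf8 that encodes the logical string value instead of relying on FieldSerializer's field layout:

  import com.esotericsoftware.kryo.{Kryo, Serializer}
  import com.esotericsoftware.kryo.io.{Input, Output}
  import org.apache.avro.util.Utf8

  // Encodes only the string contents, so the wire format stays stable
  // across Avro versions regardless of Utf8's internal fields.
  class Utf8Serializer extends Serializer[Utf8] {
    override def write(kryo: Kryo, output: Output, utf8: Utf8): Unit =
      output.writeString(utf8.toString)

    override def read(kryo: Kryo, input: Input, clazz: Class[Utf8]): Utf8 =
      new Utf8(input.readString())
  }

  // Registered wherever the shared Kryo instance is configured, e.g.:
  //   kryo.register(classOf[Utf8], new Utf8Serializer)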

alexeykudinkin commented 1 year ago

Please follow HUDI-4959 for updates on the fix. Closing this one in favor of the Jira.