
[SUPPORT] KryoException while using spark-hudi #9891

Open · akshayakp97 opened this issue 1 year ago

akshayakp97 commented 1 year ago

Running into the below error while using EMR 6.10.1 (Hudi 0.12.2-amzn-0, per the bundle jar in the stacktrace). Want to understand whether this has been fixed in newer Hudi versions. Found a similar issue for Flink: https://github.com/apache/hudi/issues/6540

Hudi options -

```python
# Partitioned tables use the complex key generator and the multi-part Hive
# partition-value extractor; non-partitioned tables use the no-op equivalents.
if partition_field:
    keygenerator = "org.apache.hudi.keygen.ComplexKeyGenerator"
    extractor = "org.apache.hudi.hive.MultiPartKeysValueExtractor"
    hive_style = "true"
    partition_field_name = "pdate"
    df = df.filter(df[partition_field].isNotNull())
    df = df.withColumn(
        partition_field_name, to_date(col(partition_field), "yyyy-MM-dd")
    )
else:
    keygenerator = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    extractor = "org.apache.hudi.hive.NonPartitionedExtractor"
    hive_style = "false"
    partition_field_name = ""

emr_master_ip = aws.get_emr_master_pvt_ip(region_name=region_name)
hive_sync_jdbc = "jdbc:hive2://{}:10000".format(emr_master_ip)

# Writer, Hive-sync, compaction, and cleaner settings.
hudi_options = {
    "hoodie.table.name": tgt_table,
    "hoodie.datasource.write.recordkey.field": rec_key,
    "hoodie.datasource.write.table.type": type,
    "hoodie.datasource.write.keygenerator.class": keygenerator,
    "hoodie.datasource.write.table.name": tgt_table,
    "hoodie.datasource.write.operation": operation,
    "hoodie.datasource.write.precombine.field": precombine,
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.payload.ordering.field": precombine,
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.bulkinsert.shuffle.parallelism": parallelism,
    "hoodie.upsert.shuffle.parallelism": parallelism,
    "hoodie.datasource.write.hive_style_partitioning": hive_style,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_extractor_class": extractor,
    "hoodie.datasource.hive_sync.auto_create_database": "true",
    "hoodie.datasource.hive_sync.database": tgt_database,
    "hoodie.datasource.hive_sync.table": tgt_table,
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.use_jdbc": "true",
    "hoodie.datasource.hive_sync.jdbcurl": hive_sync_jdbc,
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": compact_inline_max_delta_commits,
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": commit_retain_count,
    "hoodie.keep.min.commits": min_commit_to_keep,
    "hoodie.keep.max.commits": max_commit_to_keep,
}

# Partition-specific options are only set for partitioned tables.
if partition_field:
    hudi_options["hoodie.datasource.hive_sync.partition_fields"] = partition_field_name
    hudi_options["hoodie.metadata.enable"] = "true" if enable_metadata == "Y" else "false"
    hudi_options["hoodie.datasource.write.partitionpath.field"] = partition_field_name
```

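The snippet stops at the options dict; presumably they are then passed to the DataFrame writer in the standard way. A minimal sketch, assuming the usual Hudi datasource call (`tgt_path` is a hypothetical variable; the original code does not show the save path):

```python
# Minimal sketch of the write call that presumably follows; `tgt_path` is a
# hypothetical variable, e.g. "s3://<bucket>/<prefix>/" + tgt_table.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")  # Hudi upserts are issued with append save mode
    .save(tgt_path)
)
```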
Environment Description

* Hudi version: 0.12.2-amzn-0 (from the bundle jar in the stacktrace below)
* Spark version: Spark 3 (hudi-spark3-bundle_2.12), Java 1.8.0_382
* Storage: S3
* Table type: MOR (a .log file is read alongside the base parquet in the trace)

Stacktrace


```
23/10/16 17:14:47 INFO S3NativeFileSystem: Opening 's3://<redacted_s3path>/pdate=2023-10-12/.16c8f46c-a073-403b-b830-553f8c44b735-0_20231013061901564.log.1_4-452-41829' for reading
23/10/16 17:15:35 INFO S3NativeFileSystem: Opening 's3://<redacted_s3path>/pdate=2023-10-12/16c8f46c-a073-403b-b830-553f8c44b735-0_0-457-44680_20231013061901564.parquet' for reading
23/10/16 17:15:35 ERROR BoundedInMemoryExecutor: error consuming records
org.apache.hudi.com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$FixedSchema)
schema (org.apache.avro.generic.GenericData$Fixed)
orderingVal (org.apache.hudi.common.model.DefaultHoodieRecordPayload)
data (org.apache.hudi.common.model.HoodieAvroRecord)
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:100) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:74) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:210) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:203) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:199) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:68) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:198) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:340) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:90) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:80) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135) ~[hudi-spark3-bundle_2.12-0.12.2-amzn-0.jar:0.12.2-amzn-0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.lang.NullPointerException
```
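Reading the trace: HoodieMergeHandle looks up incoming records in an ExternalSpillableMap, and entries that spilled to the on-disk BitCaskDiskMap are Kryo-deserialized on every get; the NPE fires while rehydrating the Avro `Fixed` ordering value inside DefaultHoodieRecordPayload. If that reading is right, keeping merge records in the in-memory portion of the map should avoid the Kryo round trip entirely. A hedged sketch using a standard Hudi config key; whether this actually sidesteps the bug here is an assumption, not a confirmed fix:

```python
# Hedged workaround sketch: give the merge map enough memory that records stay
# in the in-memory portion of ExternalSpillableMap, avoiding the Kryo round
# trip through BitCaskDiskMap. Treating this as a fix for the NPE is an
# assumption, not a confirmed solution.
hudi_options["hoodie.memory.merge.max.size"] = str(2 * 1024 * 1024 * 1024)  # 2 GB
```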
ad1happy2go commented 1 year ago

@akshayakp97 Did you try 0.12.3 or 0.13.1?
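
For anyone testing that suggestion on the same cluster, a minimal sketch of pulling an OSS bundle instead of the EMR-bundled 0.12.2-amzn-0 jar. EMR 6.10 ships Spark 3.3, so the spark3.3 bundle coordinate should match; adjust to your Spark version:

```python
# Minimal sketch, assuming Spark 3.3 (as shipped with EMR 6.10): start a
# session against the OSS Hudi 0.13.1 bundle to check whether the NPE persists.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages",
            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1")
    # Hudi's quickstart requires Kryo for Spark's own serialization.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
```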