apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Read from hudi table results in crash (com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException) #12108

Open meatheadmike opened 11 hours ago

meatheadmike commented 11 hours ago

Describe the problem you faced

Reading from a Hudi MERGE_ON_READ table crashes with com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException while a separate Spark Structured Streaming job is upserting into the same table. The failure appears after a few ingest batches, and every query after that point fails the same way.

To Reproduce

Steps to reproduce the behavior:

  1. Spin up a Spark cluster on Kubernetes with 1 x driver @ 16GB/1 core and 5 x executors @ 16GB/1 core.
  2. Submit the PySpark script shown below.
  3. While the ingest is running, spin up a separate Spark query node (1 x 16GB/1 core).
  4. Run a simple query against the Hudi table in a loop, e.g.: spark.sql('refresh table example-table').show(); spark.sql('select count(*) from example-table').show() (see the sketch after this list).
  5. Wait for 4 or 5 ingest batches to complete. From that point on, every query fails with the stack trace below instead of returning the expected record count.
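
For reference, a minimal sketch of the query loop from step 4, assuming it runs in its own SparkSession on the separate query node; the application name, iteration count, and sleep interval are illustrative, and the table name is backquoted only because it contains a hyphen.

```python
import time

from pyspark.sql import SparkSession

# Query-side session, separate from the ingest cluster.
spark = SparkSession.builder.appName("hudi-query-loop-example").getOrCreate()

# Poll the table: refresh cached metadata, then count the rows.
# After 4-5 ingest batches the count query starts throwing the
# KryoException shown in the stack trace further down.
for _ in range(100):                 # iteration count is arbitrary
    spark.sql("refresh table `example-table`").show()
    spark.sql("select count(*) from `example-table`").show()
    time.sleep(10)                   # pause between polls (illustrative)
```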

**PySpark script**

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import dbldatagen as dg
import json
import uuid
import random

spark = (
    SparkSession
    .builder
    .appName('stream-to-hudi-s3-example')
    .getOrCreate()
)

hudi_db = 'default'
hudi_table = 'example-table'
hudi_checkpoint_path = 's3a://my-checkpoint-path'
hudi_table_path = 's3a://my-table-path' 
hive_thrift_url = 'thrift://hive-metastore.default:9083'
hive_jdbc_url = 'jdbc:hive2://hive-metastore.default:10000'

schema = ArrayType(
    StructType([
        StructField("domain", StringType(), False),
        StructField("risk", StringType(), False),
        StructField("timestamp", TimestampType(), False),
    ])
)

multi_writer_id = 'datagen-writer1'
@udf(returnType=StringType())
def generate_domain(): 
    rand_domain = f"{random.randrange(10000000,20000000)}.com"
    return rand_domain

@udf(returnType=LongType())
def generate_timestamp():
    return random.randrange(1000000000,2000000000)

@udf(returnType=StringType())
def generate_risk():
    return json.dumps({"blaa":str(uuid.uuid4())})

ds = (
         dg.DataGenerator(spark, name="test-data-set", partitions=1)
         .withColumn("offset", "long", minValue=1, maxValue=9999999, random=True)
         .withColumn("timestamp_", "timestamp", random=True)
         .build(withStreaming=True, options={'rowsPerSecond': 10000, 'rampUpTimeSeconds':60})
         .withColumn("domain", generate_domain())
         .withColumn("timestamp", generate_timestamp())
         .withColumn("risk", generate_risk())
         .withColumnRenamed("timestamp_","kafka_timestamp")
    ) 

df = (
         ds.select(col("offset"), col("kafka_timestamp"), col("domain"), col("timestamp"), col("risk"))
         .na.drop()
     )

hudi_precombine_field = 'timestamp'
hudi_recordkey_field = 'domain'

hudi_options = {
    'hoodie.archive.async': True,
    'hoodie.clean.async.enabled': True,
    'hoodie.clean.automatic': True,
    'hoodie.clean.commits.retained': 5,
    'hoodie.clean.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.clean.fileversions.retained': '2',
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    'hoodie.clustering.async.enabled': False,
    'hoodie.clustering.async.max.commits': 2,
    'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
    'hoodie.clustering.inline': True,
    'hoodie.clustering.inline.max.commits': 2,
    'hoodie.clustering.plan.strategy.class': 'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
    'hoodie.clustering.plan.strategy.sort.columns': hudi_recordkey_field,
    'hoodie.clustering.preserve.commit.metadata': True,
    'hoodie.clustering.rollback.pending.replacecommit.on.conflict': True,
    'hoodie.clustering.updates.strategy': 'org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy',
    'hoodie.compact.inline.max.delta.commits': 2,
    'hoodie.datasource.hive_sync.db': hudi_db,
    'hoodie.datasource.hive_sync.enable':True,
    'hoodie.datasource.hive_sync.ignore_exceptions': True,
    'hoodie.datasource.hive_sync.jdbcurl':hive_jdbc_url,
    'hoodie.datasource.hive_sync.metastore.uris':hive_thrift_url,
    'hoodie.datasource.hive_sync.mode':'hms',
    'hoodie.datasource.hive_sync.partition_extractor_class':'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.hive_sync.password':'',
    'hoodie.datasource.hive_sync.recreate_table_on_error': True,
    'hoodie.datasource.hive_sync.skip_ro_suffix':True,
    'hoodie.datasource.hive_sync.table':hudi_table,
    'hoodie.datasource.hive_sync.username':'hive',
    'hoodie.datasource.meta.sync.enable': 'true',
    'hoodie.datasource.read.incr.fallback.fulltablescan.enable': True,
    'hoodie.datasource.read.use.new.parquet.file.format': True,
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': hudi_precombine_field,
    'hoodie.datasource.write.reconcile.schema':'true',
    'hoodie.datasource.write.record.merger.impls': 'org.apache.hudi.HoodieSparkRecordMerger',
    'hoodie.datasource.write.recordkey.field': hudi_recordkey_field,
    'hoodie.datasource.write.row.writer.enable': True,
    'hoodie.datasource.write.streaming.checkpoint.identifier': multi_writer_id,
    'hoodie.datasource.write.streaming.ignore.failed.batch': 'true',
    'hoodie.datasource.write.table.name': hudi_table,
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.enable.data.skipping': True,
    'hoodie.index.type': 'RECORD_INDEX',
    'hoodie.keep.min.commits':'10',
    'hoodie.logfile.data.block.format':'parquet',
    'hoodie.merge.use.record.positions': True,
    'hoodie.metadata.auto.initialize': True,
    'hoodie.metadata.enable': True,
    'hoodie.metadata.clean.async': True,
    'hoodie.metadata.index.async': False, # DO NOT SET TRUE!!! Record and column indexes will not be created!
    'hoodie.metadata.index.column.stats.columns': hudi_recordkey_field,
    'hoodie.metadata.index.column.stats.column.list': hudi_recordkey_field,
    'hoodie.metadata.index.column.stats.enable': True,
    'hoodie.metadata.record.index.enable': True,
    'hoodie.parquet.avro.write-old-list-structure':'false',
    'hoodie.parquet.compression.codec': 'snappy',
    'hoodie.record.index.use.caching':True,
    'hoodie.schema.on.read.enable': True,
    'hoodie.table.name': hudi_table,
    'hoodie.table.services.enabled': True,
    'hoodie.write.concurrency.early.conflict.detection.enable': True,
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.executor.type': 'DISRUPTOR',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
    'hoodie.write.lock.wait_time_ms': '300000',
    'hoodie.write.lock.zookeeper.base_path': '/hudi/local/table',
    'hoodie.write.lock.zookeeper.lock_key': f'{hudi_table}',
    'hoodie.write.lock.zookeeper.port': '2181',
    'hoodie.write.lock.zookeeper.url': 'zk-cs.default',
    'hoodie.write.set.null.for.missing.columns': True, 
    'checkpointLocation': hudi_checkpoint_path,
    'parquet.avro.write-old-list-structure': 'false',
    'path': hudi_table_path,
}

print(f"hudi_options={hudi_options}")

df.writeStream \
   .format("org.apache.hudi") \
   .options(**hudi_options) \
   .outputMode("append") \
   .start() 

spark.streams.awaitAnyTermination()

Expected behavior

Queries should succeed without crashing.

Environment Description

Additional context

I've been trying to narrow down the cause of this without much success. I disabled clustering, but the crash still occurs. Something strange seems to happen on the read side while it merges the log records with the base file. It doesn't seem to crash if every record key is 100% unique, but of course in the real world I need to be able to upsert data...
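
Not something tried above, but since the failure surfaces in BitCaskDiskMap (the on-disk half of Hudi's ExternalSpillableMap used while merging log records with the base file), one way to narrow it down could be to check whether the crash goes away when fewer records spill to disk, or when a different disk map backend is used. A hedged sketch, assuming the standard `hoodie.memory.merge.max.size` and `hoodie.common.spillable.diskmap.type` configs are honored on this read path; the table path and values are illustrative, and the options are worth trying one at a time:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Diagnostic read that bypasses the SQL table so read options can be passed
# explicitly. If either option makes the failure disappear, the spill-to-disk
# (BitCaskDiskMap + Kryo) path is likely the trigger.
df = (
    spark.read.format("hudi")
    # keep more of the merge in memory so fewer records spill to the disk map
    .option("hoodie.memory.merge.max.size", str(2 * 1024 * 1024 * 1024))
    # or switch the spillable map backend away from the default BITCASK
    .option("hoodie.common.spillable.diskmap.type", "ROCKS_DB")
    .load("s3a://my-table-path")
)
print(df.count())
```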

Stacktrace

24/10/15 20:53:21 INFO TaskSetManager: Finished task 2.0 in stage 77.0 (TID 307) in 6029 ms on mike-test-spark-app-74d7488c9d-kzm2z (executor driver) (14/15)
24/10/15 20:53:22 INFO HoodieLogFileReader: Closing Log file reader .cbd4e0ac-3d14-4a38-9199-6800a4e25946-0_20241015205126562.log.1_6-361-2513
24/10/15 20:53:22 INFO HoodieMergedLogRecordReader: Number of log files scanned => 1
24/10/15 20:53:22 INFO HoodieMergedLogRecordReader: Number of entries in Map => 233427
24/10/15 20:53:22 INFO InternalParquetRecordReader: at row 0. reading next block
24/10/15 20:53:23 INFO InternalParquetRecordReader: block read in memory in 1023 ms. row count = 1686704
24/10/15 20:53:23 ERROR Executor: Exception in task 0.0 in stage 77.0 (TID 305)
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
right (org.apache.hudi.common.util.collection.ImmutablePair)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:103)
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:77)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.remove(BitCaskDiskMap.java:245)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.remove(BitCaskDiskMap.java:67)
    at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:240)
    at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.hasNextBaseRecord(HoodieKeyBasedFileGroupRecordBuffer.java:125)
    at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.doHasNext(HoodieKeyBasedFileGroupRecordBuffer.java:135)
    at org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.hasNext(HoodieBaseFileGroupRecordBuffer.java:130)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:201)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat$$anon$1.hasNext(HoodieFileGroupReaderBasedParquetFileFormat.scala:250)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.UnsupportedOperationException
    at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1067)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 49 more
danny0405 commented 8 hours ago

Is it because of a Kryo jar conflict?
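
Not part of the thread, but one quick way to check for that on the driver JVM is to ask which jar the Kryo class was actually loaded from (a minimal sketch via py4j; executor classpaths would need to be inspected separately, e.g. in the Spark UI's Environment tab):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Locate the jar that provides Kryo in the driver JVM. Spark bundles its own
# Kryo; if a different Kryo version from another jar appears earlier on the
# classpath, a mismatch could cause deserialization errors like the one above.
kryo_cls = spark._jvm.java.lang.Class.forName("com.esotericsoftware.kryo.Kryo")
code_source = kryo_cls.getProtectionDomain().getCodeSource()
location = code_source.getLocation() if code_source else "bootstrap classpath"
print(f"Kryo loaded from: {location}")
```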