apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

org.apache.hudi.exception.HoodieException: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 14 as max precision 13 #11335

Open KSubramanyaH opened 6 months ago

KSubramanyaH commented 6 months ago

Hello all,

We have an issue while doing clustering for one of our jobs:

Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 14 as max precision 13
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:387)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:369)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 14 as max precision 13
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
    ... 31 more
Caused by: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 14 as max precision 13
    at org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
    at org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1077)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1001)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:946)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:873)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:944)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:873)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:894)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:873)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:843)
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
    at org.apache.hudi.common.model.HoodieRecord.rewriteRecordWithNewSchema(HoodieRecord.java:382)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.lambda$runMerge$0(HoodieMergeHelper.java:136)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:68)
    ... 32 more

Some fields have data type decimal(13,4), and the incoming data does not exceed that precision, yet we still get the above error. The maximum precision we receive is (10,4).

Is there any solution for this? The issue started after we enabled clustering for our tables.

Hudi configs:

{
    'hoodie.clustering.plan.strategy.target.file.max.bytes': '525829120',
    'hoodie.datasource.hive_sync.table': '',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.index.type': 'SIMPLE',
    'hoodie.clean.automatic': 'true',
    'hoodie.write.markers.type': 'direct',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.schema.on.read.enable': 'true',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.write.drop.partition.columns': 'true',
    'hoodie.datasource.write.recordkey.field': '_partition_year,_partition_month,_partition_day,id',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.metadata.enable': 'false',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.clustering.plan.strategy.max.bytes.per.group': '525829120',
    'hoodie.parquet.small.file.limit': '0',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
    'hoodie.clustering.inline.max.commits': '4',
    'hoodie.clustering.inline': 'true',
    'hoodie.clustering.plan.strategy.max.num.groups': '200',
    'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'false',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.partitionpath.field': '_partition_year,_partition_month,_partition_day',
    'hoodie.cleaner.fileversions.retained': '1',
    'hoodie.table.name': '',
    'hoodie.clustering.plan.strategy.small.file.limit': '512000',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
    'hoodie.datasource.write.precombine.field': '_acq_change_seq',
    'hoodie.datasource.hive_sync.database': '***',
    'hoodie.datasource.write.operation': 'upsert'
}

Hudi version : 0.14.0

I also tried 'spark.sql.storeAssignmentPolicy'='legacy'.

Below is the Spark session:

spark = (
    SparkSession.builder.appName("test")
    .config('spark.hadoop.hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('spark.hadoop.spark.sql.legacy.parquet.nanosAsLong', 'false')
    .config('spark.hadoop.spark.sql.parquet.binaryAsString', 'false')
    .config('spark.hadoop.spark.sql.parquet.int96AsTimestamp', 'true')
    .config('spark.hadoop.spark.sql.caseSensitive', 'false')
    .config('spark.sql.parquet.datetimeRebaseModeInWrite', 'CORRECTED')
    .config('spark.sql.parquet.datetimeRebaseModeInRead', 'CORRECTED')
    .config('spark.sql.parquet.int96RebaseModeInWrite', 'CORRECTED')
    .config('spark.sql.storeAssignmentPolicy', 'legacy')
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .config('conf spark.kryoserializer.buffer.max', '2040M')
    .config('fs.s3.maxRetries', '5')
    .enableHiveSupport()
    .getOrCreate()
)

Could anybody help here? It is a production error.

ad1happy2go commented 6 months ago

@KSubramanyaH Can you print the schema for the table and check it? Are you sure there are no changes in the incoming data?

Can you also try to create a reproducible script for this?
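
For reference, a minimal PySpark sketch (not from the original thread) of how one might check whether any incoming values would actually overflow decimal(13,4); incoming_df and the column list are placeholders for the actual job's DataFrame and decimal fields:

from pyspark.sql import functions as F

# Placeholder column list; substitute the real decimal(13,4) fields of the incoming batch.
decimal_cols = ["target_amount", "overwritten_target_amount", "calculated_spent_amount"]

# decimal(13,4) allows at most 9 integer digits, so any abs value >= 10^9 needs precision > 13.
limit = 10 ** 9
for c in decimal_cols:
    overflow_count = incoming_df.filter(F.abs(F.col(c)) >= F.lit(limit)).count()
    print(c, "values that would overflow decimal(13,4):", overflow_count)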

KSubramanyaH commented 6 months ago

Hello @ad1happy2go ...

Below is the printSchema output just before running the Hudi load:

root
 |-- op: string (nullable = true)
 |-- _acq_load_ts: string (nullable = true)
 |-- id: decimal(20,0) (nullable = true)
 |-- dataset_id: decimal(20,0) (nullable = true)
 |-- deleted_id: decimal(20,0) (nullable = true)
 |-- user_modified_at: timestamp (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- modified_at: timestamp (nullable = true)
 |-- client_id: string (nullable = true)
 |-- opt_lock: long (nullable = true)
 |-- parent_id: decimal(20,0) (nullable = true)
 |-- name: string (nullable = true)
 |-- target_amount: decimal(13,4) (nullable = true)
 |-- overwritten_target_amount: decimal(13,4) (nullable = true)
 |-- calculated_spent_amount: decimal(13,4) (nullable = true)
 |-- filter_id: string (nullable = true)
 |-- recurring: byte (nullable = true)
 |-- frequency: string (nullable = true)
 |-- txn_ids: string (nullable = true)
 |-- excluded_txn_ids: string (nullable = true)
 |-- rollover_amount: decimal(13,4) (nullable = true)
 |-- reset_rollover_amount: decimal(13,4) (nullable = true)
 |-- cumulative_rollover_amount: decimal(13,4) (nullable = true)
 |-- rollover_type: string (nullable = true)
 |-- _acq_stream_position: string (nullable = true)
 |-- _acq_change_seq: string (nullable = true)
 |-- _job_run_id: string (nullable = true)
 |-- _s3_intermediate_location: string (nullable = true)
 |-- _partition_year: string (nullable = true)
 |-- _partition_month: string (nullable = true)

One more note: we are loading 2 Hudi tables with the same schema. Table 1 just loads all CDC data with bulk insert and works fine. Table 2 is loaded with upsert and delete, and it is the one failing with the decimal encode issue.

I am very sure that the incoming data has not exceeded the prescribed schema (i.e. decimal(13,4)). The maximum we receive is decimal(10,4).

KSubramanyaH commented 5 months ago

Hi, any update on this? It is impacting our production tables.

danny0405 commented 5 months ago

I see quite a few columns declared with precision 13; do we have schema evolution on them?

|-- target_amount: decimal(13,4) (nullable = true)
|-- overwritten_target_amount: decimal(13,4) (nullable = true)
|-- calculated_spent_amount: decimal(13,4) (nullable = true)
KSubramanyaH commented 5 months ago

@danny0405 No, it has not evolved. Schema evolution is not required, as the incoming data is compatible with decimal(13,4).

We are loading 2 Hudi tables with the same schema. Table 1 just loads all CDC data with bulk insert and works fine. Table 2 is loaded with upsert and delete, and it is the one failing with the decimal encode issue.

I am very sure that the incoming data has not exceeded the prescribed schema (i.e. decimal(13,4)). The maximum we receive is decimal(10,4).

danny0405 commented 5 months ago

Cannot encode decimal with precision 14 as max precision 13

But the error message suggests there is a decimal with precision 14.
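
A quick way to narrow down which side is off (a sketch, assuming a PySpark session named spark and a placeholder table path; not part of the original comment) is to read the table back and look at both the schema and the largest absolute values of the decimal(13,4) columns, since a value needing precision 14 would have 10 integer digits:

from pyspark.sql import functions as F

base_path = "s3://your-bucket/your-table-path"  # placeholder for the actual table path
table_df = spark.read.format("hudi").load(base_path)
table_df.printSchema()  # confirm the columns are still read as decimal(13,4)

# decimal(13,4) allows at most 9 integer digits; precision 14 would need 10.
for c in ["target_amount", "overwritten_target_amount", "calculated_spent_amount"]:
    table_df.select(F.max(F.abs(F.col(c))).alias("max_abs_" + c)).show(truncate=False)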

rangareddy commented 2 months ago

Hi @KSubramanyaH

I am unable to reproduce this issue using the following code. I used Spark 3.3.2, Hudi 0.14.1, and Java 8.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object SparkHudiTest extends App {

  val spark = SparkSession.builder.appName("test")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("conf spark.kryoserializer.buffer.max", "2040M")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.hadoop.spark.sql.legacy.parquet.nanosAsLong", "false")
    .config("spark.hadoop.spark.sql.parquet.binaryAsString", "false")
    .config("spark.hadoop.spark.sql.parquet.int96AsTimestamp", "true")
    .config("spark.hadoop.spark.sql.caseSensitive", "false")
    .config("spark.sql.parquet.datetimeRebaseModeInWrite","CORRECTED")
    .config("spark.sql.parquet.datetimeRebaseModeInRead","CORRECTED")
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.storeAssignmentPolicy", "legacy")
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.master","local[2]")
    .getOrCreate()

  val tableName = "test_table"
  val basePath = "file:///tmp/test_table"

  val input_data = Seq(
    Row(1L, "hello", 42, BigDecimal(123331.15), 1695159649087L),
    Row(2L, "world", 13, BigDecimal(223331.72), 1695091554788L),
    Row(3L, "spark", 7, BigDecimal(323331.60), 1695115999911L)
  )

  val input_rdd = spark.sparkContext.parallelize(input_data)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("salary", DecimalType(13, 4)),
    StructField("ts", LongType),
  ))

  val input_df = spark.createDataFrame(input_rdd, input_schema)
  input_df.show(truncate = false)

  input_df.write.format("hudi").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.table.precombine.field", "name").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    mode("overwrite").
    save(basePath)

  val output_df = spark.read.format("hudi").load(basePath)
  output_df.show(truncate=false)

  val updatesDf = output_df.where("name='spark'").withColumn("age", col("age") + 10)
    //.withColumn("salary", col("salary") * 10)
  updatesDf.printSchema()
  output_df.printSchema()

  updatesDf.write.format("hudi").
    option("hoodie.datasource.write.operation", "upsert").
    option("hoodie.table.name", tableName).
    mode("append").
    save(basePath)

  val output2_df = spark.read.format("hudi").load(basePath)
  output2_df.show(truncate=false)

  spark.close()
}
suresh-hl commented 2 months ago

We recently enabled clustering on 4 COW tables in production (AWS EMR 7.1.0, Hudi 0.14.1) and faced this issue in all 4 of them. I was later able to reproduce it locally as well.

Error: Cannot encode decimal with precision 12 as max precision 11 (Full trace at the end)

All 4 tables had a column with decimal(11,2) or decimal(13,2) data type. We were sure it was not due to source data, since all 4 tables failed with a similar issue right after clustering was enabled and no changes were made to the source systems.

The issue is caused by the new parquet file written by the replace commit. We noticed that the new parquet file had a different data type for these columns.

Old parquet decimal column metadata before clustering:

{
    "name" : "amount",
    "type" : [ "null", {
      "type" : "fixed",
      "name" : "fixed",
      "namespace" : "hoodie.hudi_faker_cow_decimal_issue.hudi_faker_cow_decimal_issue_record.amount",
      "size" : 5,
      "logicalType" : "decimal",
      "precision" : 11,
      "scale" : 2
    } ],
    "default" : null
}

New parquet decimal column metadata post clustering:

{
    "name" : "amount",
    "type" : [ "null", "long" ],
    "default" : null
}
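
For reference, a sketch (not part of the original comment) of how the per-file schemas can be compared with pyarrow, assuming illustrative local copies of one pre-clustering and one post-clustering base file:

import pyarrow.parquet as pq

# Illustrative paths; substitute real base files from before and after the replace commit.
for path in ["/tmp/pre_clustering_base_file.parquet", "/tmp/post_clustering_base_file.parquet"]:
    pf = pq.ParquetFile(path)
    print(path)
    print(pf.schema_arrow)                        # column-level physical/logical types
    kv = pf.metadata.metadata or {}               # footer key-value metadata
    avro_schema = kv.get(b"parquet.avro.schema")  # present when the file was written via parquet-avro
    if avro_schema:
        print(avro_schema.decode())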

To overcome this, we disabled clustering, took a backup, and deleted the replace commit file in .hoodie (the old parquet data files had not been cleaned up yet); the job was then able to run fine.

Reproducing the issue in local PySpark (Spark 3.4.1, Hudi 0.14.1):

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("hudi-cow-faker-app") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1,org.apache.hudi:hudi-common:0.14.1")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.hadoop.fs.s3a.impl.disable.cache", "true") \
    .getOrCreate()

# Faker Data Function

from faker import Faker
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, DecimalType

def get_fake_df(num_rows, max_id=0):
    fake = Faker()

    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("age", IntegerType(), False),
        StructField("address", StringType(), False),
        StructField("city", StringType(), False),
        StructField("some_amount", DecimalType(11,2), True), # Decimal col causing issue
        StructField("amount", DecimalType(11,2), True), # Decimal col causing issue
        StructField("created_at", TimestampType(), False),
        StructField("updated_at", TimestampType(), False)
    ])

    fake_data = [(i,
             fake.first_name(),
             fake.last_name(),
             fake.random_int(min=18, max=80),
             fake.address(),
             fake.city(),
             fake.pydecimal(left_digits=9, right_digits=2, positive=True),
             fake.pydecimal(left_digits=9, right_digits=2, positive=True),
             fake.date_time_between('-1y'),
             fake.date_time_this_year(before_now=True)
            )
        for i in range(max_id + 1, max_id + num_rows + 1)]

    print(fake_data[:5])

    fake_df = spark.createDataFrame(fake_data, schema)

    fake_df.printSchema()
    fake_df.show(5)

    return fake_df

tableName = "hudi_cow_decimal_issue"
basePath = f"s3a://data-lake-local/{tableName}/"

hudi_options = {
    'hoodie.table.name': tableName,
    "hoodie.datasource.write.table.type": 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'updated_at',

    'hoodie.metadata.enable':'true',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',

    'hoodie.clustering.inline': 'true'
}

# Create table with some rows

fake_df = get_fake_df(100) # rows count 100

hudi_options['hoodie.datasource.write.operation']='bulk_insert'
fake_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)

# Insert additional 20 rows, first run -- NO ISSUES here as clustering runs for the first time.

from pyspark.sql.functions import max

df_read = spark.read.format("hudi").options(**hudi_options).load(basePath)
max_id = df_read.agg(max("id")).collect()[0][0]

fake_df = get_fake_df(20, max_id)

hudi_options['hoodie.datasource.write.operation']='upsert'
fake_df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)

# Insert additional 20 rows second run -- FAILS

df_read = spark.read.format("hudi").options(**hudi_options).load(basePath)
max_id = df_read.agg(max("id")).collect()[0][0]

fake_df = get_fake_df(20, max_id)

hudi_options['hoodie.datasource.write.operation']='upsert'
fake_df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)

ERROR SimpleExecutor: Failed consuming records
org.apache.avro.AvroTypeException: Cannot encode decimal with precision 13 as max precision 11
    at org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
    at org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1082)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:949)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
    at org.apache.hudi.common.model.HoodieRecord.rewriteRecordWithNewSchema(HoodieRecord.java:403)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.lambda$runMerge$0(HoodieMergeHelper.java:136)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:68)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:387)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:369)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:257)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/09/04 19:25:12 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :0
danny0405 commented 2 months ago

The issue is caused by the new parquet file written by the replace commit. We noticed that the new parquet file had a different data type for these columns.

It's weird that the decimal switched to long type. Did you do any schema evolution? I'm not sure whether this is a Spark behavior, because for clustering we have the Spark native writer.

suresh-hl commented 2 months ago

The issue is caused by the new parquet file written by the replace commit. We noticed that the new parquet file had a different data type for these columns.

It's weird that the decimal switched to long type. Did you do any schema evolution? I'm not sure whether this is a Spark behavior, because for clustering we have the Spark native writer.

No schema evolution was done; I've given the whole code above. The same data type is inserted again, and clustering is causing the change. I noticed this only for decimal type columns; for a table without decimals, I didn't see such issues.

danny0405 commented 2 months ago

Got it, then we may have some issue with clustering; it looks like a data type inconsistency is being introduced here.
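
To narrow down whether the inconsistency is only in the embedded Avro schema or also in the parquet logical type itself, one could read a pre-clustering and a post-clustering base file directly as parquet and compare what Spark infers for the decimal column (a sketch with illustrative paths, not a verified diagnosis):

# Illustrative paths; substitute real base file names from the table directory.
pre_path = "s3a://data-lake-local/hudi_cow_decimal_issue/pre_clustering_file.parquet"
post_path = "s3a://data-lake-local/hudi_cow_decimal_issue/post_clustering_file.parquet"

pre_df = spark.read.parquet(pre_path)
post_df = spark.read.parquet(post_path)
print(dict(pre_df.dtypes).get("amount"))   # expected decimal(11,2), matching the old footer schema above
print(dict(post_df.dtypes).get("amount"))  # the footer Avro schema above reports plain long after clustering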