apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

CDC data_before_after mode does not convert Spark DecimalType correctly #8616

Status: Open · BCriswell opened this issue 1 year ago

BCriswell commented 1 year ago

I've noticed an issue with the data_before_after CDC mode not converting Spark DecimalType correctly. When the CDC data is saved, decimal values are written as byte arrays in the before and after JSON strings. Trying to convert those strings back to a Row with F.from_json() and the original schema then yields null values, because Spark can't cast the array to a valid DecimalType. Example:

Querying the hudi table normally: gljeln=Decimal('208.000000000000000000')

Querying using the cdc format + incremental options: Row(op='i', ts_ms='20230425193451991', before='null', after='{"gljeln": [0, 0, 0, 0, 0, 0, 0, 11, 70, -108, 113, -8, 1, 64, 0, 0]...

Steps to reproduce the behavior:

  1. Create a Spark dataframe that contains a decimal column (precision=38, scale=18) with a valid decimal value.
  2. Write the dataframe to a new Hudi table using write options that enable CDC.
  3. Create a second dataframe that contains a decimal column (precision=38, scale=18) with a valid decimal value.
  4. Write that dataframe to the same Hudi table using the same write options, which will create the first .cdc file.
  5. Read the CDC data with the cdc format in an incremental query starting at the first commit time (which will read only the newly created CDC data).
  6. The result is a single insert entry, and the decimal value in the "after" JSON string is an array of numbers instead of the string representation of the decimal.

Expected behavior

The decimal value should be serialized to an appropriate type (probably a string) that can be deserialized without corrupting the data.
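
For illustration, a minimal sketch of that round trip, assuming an active `spark` session and that F.from_json() accepts string-encoded decimals (which recent Spark releases do):

from decimal import Decimal
from pyspark.sql import functions as F, types as T

after_schema = T.StructType([T.StructField('amount', T.DecimalType(38, 18))])
df = spark.createDataFrame([('{"amount": "2319.000000000000000000"}',)], ['after'])
row = df.select(F.from_json('after', after_schema).alias('r')).select('r.amount').first()
assert row.amount == Decimal('2319.000000000000000000')  # deserializes cleanly instead of returning null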

Environment Description

Additional context

Example script to reproduce, and results:

from datetime import datetime
from decimal import Decimal
from pyspark.sql import SparkSession, types as T

HUDI_TARGET = 's3://some-bucket'

def decimal_test():
    spark = (
        SparkSession
        .builder
        .appName('decimal_test')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.sql.hive.convertMetastoreParquet', 'false')
        .config('spark.hadoop.mapreduce.input.pathFilter.class', 'org.apache.hudi.hadoop.HoodieROTablePathFilter')
        .config('spark.sql.parquet.mergeSchema', 'true')
        .config('spark.sql.files.ignoreMissingFiles', 'true')
        .config('spark.sql.adaptive.enabled', 'true')
        .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
        .config('spark.sql.sources.partitionColumnTypeInference.enabled', 'false')
        .getOrCreate()
    )
    write_options = {
        'hoodie.bloom.index.bucketized.checking': False,
        'hoodie.bloom.index.input.storage.level': 'MEMORY_AND_DISK',
        'hoodie.bloom.index.prune.by.ranges': False,
        'hoodie.bulkinsert.shuffle.parallelism': 50,
        'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
        'hoodie.combine.before.insert': False,
        'hoodie.combine.before.upsert': False,
        'hoodie.datasource.write.insert.drop.duplicates': False,
        'hoodie.datasource.write.operation': 'UPSERT',
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.row.writer.enable': True,
        'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
        'hoodie.finalize.write.parallelism': 50,
        'hoodie.index.type': 'SIMPLE',
        'hoodie.insert.shuffle.parallelism': 50,
        'hoodie.metadata.enable': True,
        'hoodie.payload.ordering.field': 'ts',
        'hoodie.simple.index.input.storage.level': 'MEMORY_AND_DISK',
        'hoodie.table.name': 'some_table',
        'hoodie.upsert.shuffle.parallelism': 50,
        'hoodie.write.status.storage.level': 'MEMORY_AND_DISK',
        # CDC and non-partitioned table settings
        'hoodie.table.cdc.enabled': True,
        'hoodie.datasource.query.incremental.format': 'data_before_after',
        'hoodie.datasource.write.partitionpath.field': '',
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
    }
    schema = T.StructType([
        T.StructField('id', T.IntegerType()),
        T.StructField('amount', T.DecimalType(38, 18)),
        T.StructField('ts', T.TimestampType())
    ])
    # The first write creates the table; the second write (inserting a new key) produces the first .cdc file
    df1 = spark.createDataFrame([(1, Decimal('42.000000000000000000'), datetime.now())], schema)
    df2 = spark.createDataFrame([(2, Decimal('2319.000000000000000000'), datetime.now())], schema)
    df1.write.format('hudi').options(**write_options).save(HUDI_TARGET, mode='append')
    df1 = spark.read.format('hudi').load(HUDI_TARGET)
    assert df1.first().amount == Decimal('42.000000000000000000')
    df2.write.format('hudi').options(**write_options).save(HUDI_TARGET, mode='append')

if __name__ == '__main__':
    decimal_test()

Incremental CDC query:

# commit_time is the table's first commit instant, so only the newly created CDC data is read
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': commit_time,
    'hoodie.datasource.query.incremental.format': 'cdc'
}

latest = spark.read.format('hudi').options(**incremental_read_options).load(s3_path)

And the result:

Row(op='i', ts_ms='20230428175640557', before='null', after='{"id": 2, "amount": [0, 0, 0, 0, 0, 0, 0, 125, -74, -105, 5, 105, 82, -36, 0, 0], "ts": 1682704564405033}')
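
For completeness, a minimal sketch of the failed round trip described at the top of the issue, using the `latest` DataFrame from the query above (the parse schema here is illustrative, containing just the decimal column):

from pyspark.sql import functions as F, types as T

after_schema = T.StructType([T.StructField('amount', T.DecimalType(38, 18))])
parsed = latest.select(F.from_json('after', after_schema).alias('r')).select('r.amount')
parsed.show()  # amount comes back as null because Spark can't cast the byte array to DecimalType
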
danny0405 commented 1 year ago

My guess is that it is translated into a byte array with fixed precision and scale by Avro: https://avro.apache.org/docs/1.10.2/spec.html#schema_complex (see the fixed type part).
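
For what it's worth, decoding the bytes from the example above as a big-endian two's-complement unscaled value with scale 18 (how Avro's decimal logical type is encoded on a fixed field) does recover the original number, which supports that theory. A quick sketch:

from decimal import Decimal

# Signed bytes from the "after" payload in the example above (amount = 2319)
raw = [0, 0, 0, 0, 0, 0, 0, 125, -74, -105, 5, 105, 82, -36, 0, 0]
unscaled = int.from_bytes(bytes(b & 0xFF for b in raw), byteorder='big', signed=True)
print(Decimal(unscaled).scaleb(-18))  # 2319.000000000000000000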

phamvinh1712 commented 3 months ago

Hi @danny0405, is there any news on this issue or any plan to solve it?

We're planning to use the CDC format to handle some complex incremental processing use cases like the ones presented in this blog: https://www.onehouse.ai/blog/getting-started-incrementally-process-data-with-apache-hudi. However, with decimal values not being returned correctly, we can't make use of the CDC format.

danny0405 commented 3 months ago

Sure, @phamvinh1712, would you mind firing a fix? It might be a minor fix for the Avro and JSON type conversion, I guess.

phamvinh1712 commented 3 months ago

@danny0405 Yep, let me take this up some time next week. I just found where the issue is.

danny0405 commented 3 months ago

@phamvinh1712 Thanks so much, I would be glad to review the PR.