apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[BUG] Spark will read invalid timestamp(3) data when record in log is older than the same in parquet. #10012

Open seekforshell opened 8 months ago

seekforshell commented 8 months ago

Describe the problem you faced

Spark reads invalid timestamp(3) data when the record in the log file is older than the same record in the parquet base file.

To Reproduce

  1. Create a MOR table with a timestamp(3) column, e.g.:

    CREATE EXTERNAL TABLE xxx.bucket_mor_t2(
    _hoodie_commit_time string COMMENT '',
    _hoodie_commit_seqno string COMMENT '',
    _hoodie_record_key string COMMENT '',
    _hoodie_partition_path string COMMENT '',
    _hoodie_file_name string COMMENT '',
    source_from int COMMENT '',
    id bigint COMMENT '',
    name string COMMENT '',
    create_time timestamp COMMENT '',
    price decimal(14,2) COMMENT '',
    extend string COMMENT '',
    count bigint COMMENT '',
    create_date date COMMENT '',
    ext_dt timestamp COMMENT '',
    precombine_field string COMMENT '',
    sync_deleted int COMMENT '',
    sync_time timestamp COMMENT '',
    __binlog_file string COMMENT '',
    __pos int COMMENT '',
    source_sys int COMMENT '')
    PARTITIONED BY (
    __partition_field int COMMENT '')
    ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    WITH SERDEPROPERTIES (
    'hoodie.query.as.ro.table'='false',
    'path'='hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2')
    STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION
    'hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2' TBLPROPERTIES (
    'connector'='hudi',
    'hoodie.datasource.write.recordkey.field'='source_from,id',
    'last_commit_time_sync'='20231106172508127',
    'path'='hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2',
    'spark.sql.sources.provider'='hudi',
    'spark.sql.sources.schema.numPartCols'='1',
    'spark.sql.sources.schema.numParts'='1',
    'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"source_from","type":"integer","nullable":true,"metadata":{}},{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"create_time","type":"timestamp","nullable":true,"metadata":{}},{"name":"price","type":"decimal(14,2)","nullable":true,"metadata":{}},{"name":"extend","type":"string","nullable":true,"metadata":{}},{"name":"count","type":"long","nullable":true,"metadata":{}},{"name":"create_date","type":"date","nullable":true,"metadata":{}},{"name":"ext_dt","type":"timestamp","nullable":true,"metadata":{}},{"name":"precombine_field","type":"string","nullable":true,"metadata":{}},{"name":"sync_deleted","type":"integer","nullable":true,"metadata":{}},{"name":"sync_time","type":"timestamp","nullable":true,"metadata":{}},{"name":"binlog_file","type":"string","nullable":true,"metadata":{}},{"name":"__pos","type":"integer","nullable":true,"metadata":{}},{"name":"source_sys","type":"integer","nullable":true,"metadata":{}},{"name":"partition_field","type":"integer","nullable":true,"metadata":{}}]}',
    'spark.sql.sources.schema.partCol.0'='__partition_field',
    'table.type'='MERGE_ON_READ',
    'transient_lastDdlTime'='1692251328')

  2. Insert new data into the parquet base file with the Flink engine, e.g. insert a record (id=1) with precombine value = 000001308800000028038927500000.

  3. Mock a binlog event for the same record as in step 2 with precombine value = 1 (smaller than before), commit it, but do not run compaction.

Finally, read the record (id=1) in snapshot mode with Spark SQL (see the sketch below); invalid data shows up:

[screenshot: query result with the corrupted create_time value]
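For clarity, here is a minimal Scala sketch of the step-3 read (an assumption of how the query was issued, with the table path taken from the DDL above; it needs a Spark session with the Hudi bundle on the classpath):

```scala
import org.apache.spark.sql.SparkSession

object SnapshotReadRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-timestamp-repro")
      // Hudi requires Kryo serialization for Spark jobs.
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Snapshot query: parquet base file merged with the uncompacted log file.
    val df = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "snapshot")
      .load("hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2")

    // create_time comes back corrupted for the record updated in step 3.
    df.filter("id = 1")
      .select("id", "create_time", "precombine_field")
      .show(false)
  }
}
```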

Expected behavior

Reading a timestamp(3) field should return the original value; instead, invalid data shows up.

Environment Description

Additional context

Here is some debug info: create_time in the parquet base file is stored as timestamp-micros, while in the avro log it is stored as timestamp-millis. [debugger screenshots]
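To illustrate why mixing the two precisions corrupts the value, here is a minimal sketch (not Hudi code, and the epoch value is made up for illustration): decoding the same long with the wrong logical type lands on a completely different instant.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object PrecisionMismatch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stored long, written as timestamp-millis.
    val raw = 1699411557224L

    // Decoded with the correct logical type (millis since epoch).
    val asMillis = Instant.ofEpochMilli(raw)                  // 2023-11-08T02:45:57.224Z
    // Decoded as timestamp-micros instead: off by a factor of 1000.
    val asMicros = Instant.EPOCH.plus(raw, ChronoUnit.MICROS) // 1970-01-20T16:03:31.557224Z

    println(s"as millis: $asMillis")
    println(s"as micros: $asMicros")
  }
}
```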

Stacktrace

danny0405 commented 8 months ago

Did you check the table creation schema persisted in hoodie.properties, i.e. how the timestamp precision is represented in the Avro schema?

seekforshell commented 8 months ago

> Did you check the table creation schema persisted in hoodie.properties, i.e. how the timestamp precision is represented in the Avro schema?

Yes, here it is:

    #Properties saved on 2023-11-06T09:16:47.982Z
    #Mon Nov 06 17:16:47 CST 2023
    hoodie.table.precombine.field=precombine_field
    hoodie.datasource.write.drop.partition.columns=false
    hoodie.table.partition.fields=partition_field
    hoodie.table.type=MERGE_ON_READ
    hoodie.archivelog.folder=archived
    hoodie.compaction.payload.class=org.apache.hudi.common.model.EventTimeAvroPayload
    hoodie.timeline.layout.version=1
    hoodie.table.version=5
    hoodie.table.recordkey.fields=source_from,id
    hoodie.datasource.write.partitionpath.urlencode=false
    hoodie.table.name=air08_airflow_bucket_mor_t2
    hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
    hoodie.datasource.write.hive_style_partitioning=false
    hoodie.table.create.schema={"type"\:"record","name"\:"record","fields"\:[{"name"\:"source_from","type"\:["null","int"],"default"\:null},{"name"\:"id","type"\:["null","long"],"default"\:null},{"name"\:"name","type"\:["null","string"],"default"\:null},{"name"\:"create_time","type"\:["null",{"type"\:"long","logicalType"\:"timestamp-millis"}],"default"\:null},{"name"\:"price","type"\:["null",{"type"\:"fixed","name"\:"fixed","namespace"\:"record.price","size"\:6,"logicalType"\:"decimal","precision"\:14,"scale"\:2}],"default"\:null},{"name"\:"extend","type"\:["null","string"],"default"\:null},{"name"\:"count","type"\:["null","long"],"default"\:null},{"name"\:"create_date","type"\:["null",{"type"\:"int","logicalType"\:"date"}],"default"\:null},{"name"\:"ext_dt","type"\:["null",{"type"\:"long","logicalType"\:"timestamp-millis"}],"default"\:null},{"name"\:"precombine_field","type"\:["null","string"],"default"\:null},{"name"\:"sync_deleted","type"\:["null","int"],"default"\:null},{"name"\:"sync_time","type"\:["null",{"type"\:"long","logicalType"\:"timestamp-millis"}],"default"\:null},{"name"\:"__binlog_file","type"\:["null","string"],"default"\:null},{"name"\:"pos","type"\:["null","int"],"default"\:null},{"name"\:"source_sys","type"\:["null","int"],"default"\:null},{"name"\:"__partition_field","type"\:["null","int"],"default"\:null}]}
    hoodie.table.checksum=3920591838
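For reference, a minimal sketch (not part of the original report; the field list is trimmed to create_time) of how the logical type in that persisted schema can be checked with the Avro API:

```scala
import org.apache.avro.Schema

object InspectCreateSchema {
  def main(args: Array[String]): Unit = {
    // Reduced copy of hoodie.table.create.schema (escaped colons removed),
    // keeping only the create_time field.
    val json =
      """{"type":"record","name":"record","fields":[
        |  {"name":"create_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}
        |]}""".stripMargin

    val union = new Schema.Parser().parse(json).getField("create_time").schema()
    // The union is ["null", long/timestamp-millis]; index 1 is the value branch.
    val valueBranch = union.getTypes.get(1)
    println(valueBranch.getLogicalType.getName) // prints: timestamp-millis
  }
}
```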

danny0405 commented 8 months ago

Looks like the Avro schema is using timestamp-millis as the logical data type, so in EventTimeAvroPayload, did you debug a little bit to see where the timestamp-micros comes from?

seekforshell commented 8 months ago

> Looks like the Avro schema is using timestamp-millis as the logical data type, so in EventTimeAvroPayload, did you debug a little bit to see where the timestamp-micros comes from?

It comes from the serializer, and the serializer uses baseFileReaderAvroSchema to create the converter. [debugger screenshot]

And in this code, SchemaConverters translates Spark's TimestampType to Avro's timestamp-micros by default. [code screenshot]
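As a quick check of that default mapping, here is a minimal sketch (assuming the org.apache.spark:spark-avro module is on the classpath): Spark's SchemaConverters maps Catalyst TimestampType to Avro's timestamp-micros logical type, regardless of the timestamp-millis recorded in hoodie.properties.

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

object TimestampMappingCheck {
  def main(args: Array[String]): Unit = {
    val sparkSchema = StructType(Seq(StructField("create_time", TimestampType, nullable = true)))

    // Convert the Catalyst schema to an Avro schema the same way Spark does.
    val avroSchema = SchemaConverters.toAvroType(sparkSchema, nullable = false, recordName = "record")

    // The create_time field comes out as {"type":"long","logicalType":"timestamp-micros"}.
    println(avroSchema.toString(true))
  }
}
```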

danny0405 commented 8 months ago

Interesting. The question is why baseFileReaderAvroSchema has the wrong schema set up; I guess it actually comes from the write schema.