apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Data loss while ingesting multiple hudi tables via one glue/spark job with clustering and metadata properties #8644

Open d4r3topk opened 1 year ago

d4r3topk commented 1 year ago

Describe the problem you faced

We have moved to Hudi 0.13.0 on Spark 3.3 and use it as an external library JAR to ingest datasets with AWS Glue streaming jobs. We are seeing at least 10-13% data loss in every partition (partitioned by region=x/year=xxxx/month=xx/day=xx/hour=xx).

The job that shows this data loss splits an incoming dynamic frame on a field and then creates/updates a separate Hudi table for each value of that field (at most 3 tables per job).

We have another job that does not do this splitting and just ingests one table with the incoming dynamic frame. This job has no data loss. This job also has metadata and clustering async enabled.

We have jobs running with the following parameters -

init_load_config = {"hoodie.datasource.write.operation": "bulk_insert"}

partition_data_config = {
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.write.partitionpath.field": "region,year,month,day,hour",
"hoodie.datasource.hive_sync.partition_fields": "region,year,month,day,hour",
}

init_common_config = {
"className": "org.apache.hudi",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.write.reconcile.schema": "true",
"write.parquet.max.file.size": 268435456, # 256 MB = 268435456 Bytes
"hoodie.parquet.small.file.limit": 209715200, # 200 MB = 209715200 Bytes
"hoodie.datasource.write.hive_style_partitioning": "true",
}

metadata_config = {
"hoodie.archive.async": "true",
"hoodie.archive.merge.enable": "true",
"hoodie.datasource.hive_sync.support_timestamp": "true",
"clustering.plan.partition.filter.mode": "RECENT_DAYS",
"clustering.plan.strategy.daybased.lookback.partitions": 2
}

plug_config = {
"hoodie.datasource.write.precombine.field": "ApproximateArrivalTimestamp",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.hive_sync.database": "db_name",
"hoodie.table.name": "table_name",
"hoodie.datasource.hive_sync.table": "table_name",
"hoodie.copyonwrite.record.size.estimate": 2000,
"hoodie.clustering.async": "false",
"hoodie.metadata.enable": "false"
}
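Since one job writes to up to 3 tables, the dictionaries above have to be combined into one options map per table, with the table name overridden each time. A minimal sketch of that merge follows; `build_hudi_options` and the names `table_a` and `table_b` are hypothetical, and the issue does not show how the job actually combines its dictionaries.

```python
def build_hudi_options(table_name, database, *config_layers):
    """Merge config dicts left to right, then set the per-table names.

    Hypothetical helper: the issue does not show how the job combines
    its config dictionaries.
    """
    options = {}
    for layer in config_layers:
        options.update(layer)  # later layers override earlier ones
    # Each Hudi table needs its own table name and Hive sync target.
    options["hoodie.table.name"] = table_name
    options["hoodie.datasource.hive_sync.table"] = table_name
    options["hoodie.datasource.hive_sync.database"] = database
    return options


# Trimmed copies of the dictionaries from the issue.
init_load_config = {"hoodie.datasource.write.operation": "bulk_insert"}
plug_config = {
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.clustering.async": "false",
    "hoodie.metadata.enable": "false",
}

# "table_a" and "table_b" are placeholder names.
opts_a = build_hudi_options("table_a", "db_name", init_load_config, plug_config)
opts_b = build_hudi_options("table_b", "db_name", init_load_config, plug_config)
```

Building a fresh dictionary per table avoids one table's name leaking into another table's write through a shared, mutated config object.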

When we first noticed the data loss, clustering and metadata were enabled. However, after reading about the regression in metadata and the timeline server for streaming jobs (with COW), we disabled both and created a completely new job with a new destination and new table names to confirm there was no data loss, but we are still losing data.

More context -

To Reproduce

Steps to reproduce the behavior:

I have not tested the following process more than twice, and only on the same dataset.

  1. Create a glue dynamic frame with 5 fields in a Glue streaming job with Glue 4 reading data from kinesis streams
  2. Split the dynamic frame based on any one field
  3. Iterate through the multiple dynamic frames and ingest into different hudi tables to S3 with different table names
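The steps above can be sketched roughly as follows. This is an illustration under assumptions, not the reporter's code: `split_and_write`, the `targets` mapping, and the S3 paths are hypothetical, the split field is assumed to hold string values, and `batch_df` is assumed to be a Spark DataFrame (for example the result of `DynamicFrame.toDF()`).

```python
def split_and_write(batch_df, split_field, targets, base_options):
    """Write one slice of batch_df per value of split_field.

    targets maps a field value to (table_name, path); both are
    placeholders. batch_df is expected to be a Spark DataFrame.
    """
    for value, (table_name, path) in targets.items():
        # DataFrame.filter accepts a SQL expression string.
        # Assumes the split field holds string values.
        subset = batch_df.filter(f"{split_field} = '{value}'")
        options = dict(base_options)  # copy so tables do not share state
        options["hoodie.table.name"] = table_name
        options["hoodie.datasource.hive_sync.table"] = table_name
        (subset.write.format("hudi")
            .options(**options)
            .mode("append")
            .save(path))
```

Each iteration triggers its own Spark action, so the micro-batch is evaluated once per target table.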

Expected behavior

There should be no data loss and counts should match exactly. In case any data loss occurs, the logs should indicate the data loss and its potential reason.

Environment Description

Additional context

Here are the counts for multiple partitions with different parameters, for Hudi and non-Hudi writes. We have validated that all non-hudi entries below are exact matches with the source. All 3 of these job types run with exactly the same processing and writing logic. The only things we change are the metadata and clustering settings and whether Hudi is used at all.

The source is a kinesis stream

0   type                                    day  hour  pk_count
1   hudi-metadata-and-clustering-enabled    04   00    362
2   hudi-no-metadata-no-clustering-enabled  04   00    366
3   non-hudi                                04   00    366
4   hudi-metadata-and-clustering-enabled    04   01    262
5   hudi-no-metadata-no-clustering-enabled  04   01    367
6   non-hudi                                04   01    367
7   hudi-metadata-and-clustering-enabled    04   02    252
8   hudi-no-metadata-no-clustering-enabled  04   02    353
9   non-hudi                                04   02    365
10  hudi-metadata-and-clustering-enabled    04   03    262
11  hudi-no-metadata-no-clustering-enabled  04   03    360
12  non-hudi                                04   03    366
13  hudi-metadata-and-clustering-enabled    04   04    248
14  hudi-no-metadata-no-clustering-enabled  04   04    354
15  non-hudi                                04   04    366
16  hudi-metadata-and-clustering-enabled    04   05    252
17  hudi-no-metadata-no-clustering-enabled  04   05    366
18  non-hudi                                04   05    366
19  hudi-metadata-and-clustering-enabled    04   06    251
20  hudi-no-metadata-no-clustering-enabled  04   06    365
21  non-hudi                                04   06    365
22  hudi-metadata-and-clustering-enabled    04   07    256
23  hudi-no-metadata-no-clustering-enabled  04   07    368
24  non-hudi                                04   07    368
25  hudi-metadata-and-clustering-enabled    04   08    251
26  hudi-no-metadata-no-clustering-enabled  04   08    360
27  non-hudi                                04   08    366
28  hudi-metadata-and-clustering-enabled    04   09    253
29  hudi-no-metadata-no-clustering-enabled  04   09    359
30  non-hudi                                04   09    365
31  hudi-metadata-and-clustering-enabled    04   10    258
32  hudi-no-metadata-no-clustering-enabled  04   10    366
33  non-hudi                                04   10    366
34  hudi-metadata-and-clustering-enabled    04   11    258
35  hudi-no-metadata-no-clustering-enabled  04   11    366
36  non-hudi                                04   11    366
37  hudi-metadata-and-clustering-enabled    04   12    260
38  hudi-no-metadata-no-clustering-enabled  04   12    360
39  non-hudi                                04   12    366
40  hudi-metadata-and-clustering-enabled    04   13    252
41  hudi-no-metadata-no-clustering-enabled  04   13    364
42  non-hudi                                04   13    364
43  hudi-metadata-and-clustering-enabled    04   14    253
44  hudi-no-metadata-no-clustering-enabled  04   14    366
45  non-hudi                                04   14    366
46  hudi-metadata-and-clustering-enabled    04   15    254
47  hudi-no-metadata-no-clustering-enabled  04   15    361
48  non-hudi                                04   15    367
49  hudi-metadata-and-clustering-enabled    04   16    254
50  hudi-no-metadata-no-clustering-enabled  04   16    365
51  non-hudi                                04   16    365
52  hudi-metadata-and-clustering-enabled    04   17    250
53  hudi-no-metadata-no-clustering-enabled  04   17    314
54  non-hudi                                04   17    366
55  hudi-metadata-and-clustering-enabled    04   18    250
56  hudi-no-metadata-no-clustering-enabled  04   18    360
57  non-hudi                                04   18    367
58  hudi-metadata-and-clustering-enabled    04   19    253
59  hudi-no-metadata-no-clustering-enabled  04   19    318
60  non-hudi                                04   19    365
61  hudi-metadata-and-clustering-enabled    04   20    257
62  hudi-no-metadata-no-clustering-enabled  04   20    364
63  non-hudi                                04   20    364
64  hudi-metadata-and-clustering-enabled    04   21    252
65  hudi-no-metadata-no-clustering-enabled  04   21    364
66  non-hudi                                04   21    364
67  hudi-metadata-and-clustering-enabled    04   22    266
68  hudi-no-metadata-no-clustering-enabled  04   22    364
69  non-hudi                                04   22    371
70  hudi-metadata-and-clustering-enabled    04   23    256
71  hudi-no-metadata-no-clustering-enabled  04   23    368
72  non-hudi                                04   23    368
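To put the counts above in relative terms, the loss per hour can be computed against the non-hudi count, which the reporter validated against the source. A small sketch using three hours copied from the counts; `loss_pct` is a hypothetical helper name.

```python
def loss_pct(source_count, hudi_count):
    """Percentage of source records missing from the Hudi table."""
    return 100.0 * (source_count - hudi_count) / source_count


# (hour, non-hudi count, hudi count without metadata/clustering,
#  hudi count with metadata and clustering), copied from the counts above.
samples = [
    ("01", 367, 367, 262),
    ("17", 366, 314, 250),
    ("19", 365, 318, 253),
]

for hour, source, plain, clustered in samples:
    print(
        f"hour {hour}: "
        f"no-metadata loss {loss_pct(source, plain):.1f}%, "
        f"metadata+clustering loss {loss_pct(source, clustered):.1f}%"
    )
```

For hour 17, for example, this gives about 14.2% loss without metadata and clustering and about 31.7% with them enabled.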

Experiment: Run 3 glue jobs instead of 1 therefore avoiding writes to multiple tables via 1 job. Each job reads from the same kinesis stream and then filters for a field==val1 and creates new DynamicFrame. We then ingest this via hudi with clustering and metadata disabled. Similarly the other 2 jobs will use field==val2 and field==val3. Observation: No data loss. Exact count matches for each table.

Stacktrace

No stacktrace as such.

ad1happy2go commented 1 year ago

@d4r3topk If I have understood correctly, you are only losing data when you insert into 3 Hudi tables using a single job.

Are you using a separate checkpoint directory for each of the 3 output Hudi tables you are populating? Can you post the code snippet where you split the dynamic frame and insert into the Hudi tables?

ad1happy2go commented 1 year ago

@d4r3topk Putting your comment on ticket for tracking -

""" This looks like an AWS Glue issue with Kinesis: every Hudi write in a micro-batch requires pulling the Kinesis records again, and the inconsistency of those records across the writes within a micro-batch causes data loss. Validated this with a custom JAR fix. """
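A general Spark technique for making several writes in a micro-batch operate on the same records is to persist the batch once before any write. The sketch below is an assumption about a possible mitigation, not the custom JAR fix mentioned above, which the thread does not show; `write_batch_consistently` and `write_fns` are hypothetical names, and `batch_df` is assumed to be a Spark DataFrame.

```python
def write_batch_consistently(batch_df, write_fns):
    """Persist the micro-batch once, then run every write against it.

    batch_df is expected to be a Spark DataFrame; write_fns is a list of
    hypothetical callables, each writing one slice to one Hudi table.
    """
    cached = batch_df.persist()  # marks the batch for caching (lazy)
    try:
        cached.count()  # action that materializes the cache before any write
        for write_fn in write_fns:
            write_fn(cached)
    finally:
        cached.unpersist()  # release the cached blocks after all writes
```

Persisting reduces re-reads from the source but is not a hard guarantee: if cached blocks are lost, for example when an executor dies, Spark recomputes them from the source.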