apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.34k stars 2.42k forks source link

[SUPPORT] Additional records in dataset after clustering #10172

Closed noahtaite closed 2 weeks ago

noahtaite commented 10 months ago

Describe the problem you faced

We generated a medium-sized MoR table using bulk_insert with the following dimensions:

Since we have many small files due to bulk_insert not automatically handling file sizing, we need to run clustering on the table to improve downstream read performance.

After running clustering and counting the data for a single partition, the count has grown from 177,822,668 to 177,828,417 (a count difference of ~6k records). When I run an except() between the clustered and control dataset, it outputs 3,127,201 records.

I am trying to understand why there is a difference in count after running clustering and why there are 3M supposedly different records even though I have not changed the following default configuration:

hoodie.clustering.preserve.commit.metadata
When rewriting data, preserves existing hoodie_commit_time
Default Value: true (Optional)
Config Param: PRESERVE_COMMIT_METADATA
Since Version: 0.9.0

To Reproduce

Steps to reproduce the behavior:

  1. Generate MoR table and update many times without running compaction.
  2. Run a snapshot query against the table to get the count.
  3. Run clustering on the table.
  4. Run another snapshot query.
  5. Observe the count has changed

Expected behavior

I expected the count to remain the same. I did see new files created from the clustering process.

Environment Description

Additional context

Stacktrace

N/A

noahtaite commented 10 months ago

Wondering if it is related to this comment from @nsivabalan https://github.com/apache/hudi/issues/5777#issuecomment-1524667071

"If you are using any of the global indices (global_bloom or global_simple) and if records migrated from one partition to another, until compaction kicks in, you may see duplicates w/ Read optimized query. but once compaction completes for the older file group of interest, you may not see duplicates."

"...Also, if the index is global one, if records migrate from 1 partition to another and again to another before compaction can kick in, you may see duplicates even w/ RT query. this may not go away even w/ compaction."

Since clustering is just reading the base parquet files, is it possible to see duplicates if records have moved from one partition to another (in our case, updated records that come in as deleted from our source system, which moves them to the __HIVE_DEFAULT_PARTITION__)?

This may explain the post-compaction results shown below being much closer than the original issue presented?

noahtaite commented 10 months ago

I ran compaction on this table, following up with clustering. The results have changed as followed: Pre-compaction, pre-clustering: 177,822,668 Post-compaction, pre-clustering: 177,822,668 Post-compaction, post-clustering: 177,822,812 (144 more records)

noahtaite commented 10 months ago

I have verified that the "additional" records are those that have moved to __HIVE_DEFAULT_PARTITION__ after we applied a soft delete from our incoming DMS records.

After clustering, the snapshot shows these records as 'live' and I can see duplicates records output in my snapshot query:

## CONTROL QUERY

val ctrl = spark.read.format("hudi").load("s3://bucket/path/control_table.all_hudi/")

ctrl.filter(col("datasource").equalTo("datasource1")).filter(col("uuid").equalTo(ID_VALUE)).
select("_hoodie_commit_time","uuid","Op","CaptureDate","totalvalue").
show(10,false)

1 record:
_hoodie_commit_time - 20231019024142816 
uuid -  <UUID value>
Op - D
CaptureDate -  2023-10-17 16:08:35.512961
totalvalue - null 

-> Lives in the "datasource=datasource1/year=__HIVE_DEFAULT_PARTITION_/month=__HIVE_DEFAULT_PARTITION_" partition.

## TEST QUERY

val test = spark.read.format("hudi").load("s3://bucket/path/test_table.all_hudi/")

test.filter(col("datasource").equalTo("datasource1")).filter(col("uuid").equalTo(ID_VALUE)).
select("_hoodie_commit_time","uuid","Op","CaptureDate","totalvalue").
show(10,false)

2 records:
_hoodie_commit_time - 20231015024207511 
uuid - <UUID value>
Op - I 
CaptureDate - 2023-10-14 13:43:56.588896 <-- We can see that this record is invalid and a snapshot query should NOT return this result!
totalvalue - -629.53 
-> Lives in the "datasource=datasource1/year=2023/month=10" partition.

_hoodie_commit_time - 20231019024142816
uuid - <UUID value>
Op - D
CaptureDate - 2023-10-17 16:08:35.512961
totalvalue - null
-> Lives in the "datasource=datasource1/year=__HIVE_DEFAULT_PARTITION_/month=__HIVE_DEFAULT_PARTITION_" partition.

I hope I have defined my experiments and the behaviour quite well.

From what I can gather, if records have moved partitions when using a global index, those records NEED to be compacted before clustering can be run correctly. Otherwise there is a data integrity issue at hand. Is there a strategy I can use to enforce all pending log files get compacted in a single run?

@nsivabalan - I hate to ping you on issues but I have seen you document the intricacies of compaction on Hudi's website as well as on Medium. I hope you can shed some light onto this potential pitfall with how clustering + compaction play together?

noahtaite commented 10 months ago

Bump... I think data inconsistency after clustering should be treated as a critical priority investigation

ad1happy2go commented 9 months ago

@noahtaite Sorry for the delay here. We will look into it soon. Did you tried except() after dropping the Hoodie Meta columns?

ad1happy2go commented 9 months ago

@noahtaite Can you also post complete table and writer configs.

noahtaite commented 9 months ago

Hi @ad1happy2go , apologies for the delayed response due to holidays. I will update this post with complete configs shortly.

nsivabalan commented 5 months ago

hey @noahtaite : any follow ups on this.

noahtaite commented 4 months ago

Apologies again for the delay, we shelved clustering after this experiment and re-generated our lake with proper file sizing.

Since this issue could affect others, I'll share my configs that got us there: Upsert config:

{
  "hoodie.datasource.hive_sync.database": "db",
  "hoodie.global.simple.index.parallelism": "1920",
  "hoodie.datasource.hive_sync.mode": "hms",
  "hoodie.datasource.hive_sync.support_timestamp": "true",
  "hoodie.schema.on.read.enable": "false",
  "path": "s3://bucket/table.all_hudi",
  "hoodie.datasource.write.precombine.field": "CaptureDate",
  "hoodie.datasource.hive_sync.partition_fields": "datasource,year,month",
  "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
  "hoodie.datasource.hive_sync.use_jdbc": "false",
  "hoodie.meta.sync.metadata_file_listing": "true",
  "hoodie.cleaner.parallelism": "1920",
  "hoodie.datasource.meta.sync.enable": "true",
  "hoodie.datasource.hive_sync.skip_ro_suffix": "true",
  "hoodie.metadata.enable": "true",
  "hoodie.datasource.hive_sync.table": "table_all",
  "hoodie.datasource.meta_sync.condition.sync": "true",
  "hoodie.index.type": "GLOBAL_BLOOM",
  "hoodie.clean.automatic": "true",
  "hoodie.datasource.write.operation": "upsert",
  "hoodie.datasource.hive_sync.enable": "true",
  "hoodie.datasource.write.recordkey.field": "uuid",
  "hoodie.table.name": "table_all",
  "hoodie.write.lock.dynamodb.billing_mode": "PAY_PER_REQUEST",
  "hoodie.datasource.write.table.type": "MERGE_ON_READ",
  "hoodie.datasource.write.hive_style_partitioning": "true",
  "hoodie.write.lock.dynamodb.endpoint_url": "*********(redacted)",
  "hoodie.write.lock.dynamodb.partition_key": "table_all",
  "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
  "hoodie.write.concurrency.early.conflict.detection.enable": "true",
  "hoodie.compact.inline": "true",
  "hoodie.datasource.write.reconcile.schema": "true",
  "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
  "hoodie.cleaner.policy.failed.writes": "LAZY",
  "hoodie.keep.max.commits": "110",
  "hoodie.upsert.shuffle.parallelism": "1920",
  "hoodie.meta.sync.client.tool.class": "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool",
  "hoodie.cleaner.commits.retained": "90",
  "hoodie.write.lock.dynamodb.table": "hudi-lock-provider",
  "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "hoodie.keep.min.commits": "100",
  "hoodie.datasource.write.partitionpath.field": "datasource,year,month",
  "hoodie.write.concurrency.mode": "OPTIMISTIC_CONCURRENCY_CONTROL",
  "hoodie.write.lock.dynamodb.region": "us-east-1"
}

Clustering properties:

hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=1
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.target.file.max.bytes=524288000
hoodie.clustering.plan.strategy.small.file.limit=10485760
hoodie.clustering.preserve.commit.metadata=true

Clustering job: spark-submit --class org.apache.hudi.utilities.HoodieClusteringJob /usr/lib/hudi/hudi-utilities-bundle.jar --props s3://bucket/properties/nt.clustering.properties --mode scheduleAndExecute --base-path s3://bucket/table.all_hudi/ --table-name table_all --spark-memory 90g --parallelism 1000

From what I could gather, it appears that applying soft deletes moves records to __HIVE_DEFAULT_PARTITION__, and when using a global index the old version of those records could still be visible in a snapshot query until compaction is ran. I observed this in Hudi 0.12.1 (AWS EMR 6.9.0). I don't currently have the bandwidth to experiment with this in our latest stable Hudi 0.13.1 (AWS EMR 6.12.0) job.

Thanks again for all your help.

nsivabalan commented 2 weeks ago

got it. yeah. for soft deletes, you could probably just nullify all cols except partition path. thanks for updating.