apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi offline compaction ignores old data #10863

Open ennox108 opened 3 months ago

ennox108 commented 3 months ago

I am trying to run a Flink job to get data from SQL Server to S3.

I am doing offline compaction, but whenever it is triggered I end up with fewer records than before the compaction. Based on the commits, it looks like the compaction is ignoring data in the old parquet files.

The compaction is triggered using:

/bin/flink run-application -t yarn-application \
  -Dyarn.application.name=CompactionCas \
  -Dyarn.application.queue=casualty \
  -Djobmanager.memory.process.size=16384m \
  -Dtaskmanager.memory.process.size=16384m \
  -Dtaskmanager.memory.managed.fraction=0.05 \
  -Dtaskmanager.memory.task.off-heap.size=512m \
  -Dtaskmanager.memory.framework.off-heap.size=512m \
  -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  /lib/hudi-flink1.17-bundle-0.13.1-amzn-0.jar \
  --path s3:///data/casualty/raw/table-name \
  --compaction-max-memory 2048

Here are the configs I use

'connector' = 'hudi',"
'write.tasks' = '" + loadTasks + "',"
'path' = '" + sinkLocation + "',"
'hoodie.fs.atomic_creation.support' = 's3',"
'table.type' = 'MERGE_ON_READ',"
'write.rate.limit' = '0',"
'precombine.field' = 'lsn',"
'metadata.enabled' = 'true',"
'index.type' = 'BUCKET',"
'hoodie.bucket.index.hash.field' = '" + indexField + "',"
'hoodie.bucket.index.num.buckets' = '" + indexBucketNum + "',"
'hoodie.database.name' = '" + dbName + "',"
'hoodie.table.name' = '" + tableName + "',"
'hoodie.datasource.write.hive_style_partitioning' = 'false',"
'hive_sync.support_timestamp' = 'true',"
'hive_sync.enabled' = 'true',"
'hive_sync.mode' = 'hms',"
'hive_sync.metastore.uris' = '" + hiveMetaURI + "',"
'hive_sync.db' = '" + dbName + "',"
'hive_sync.table' = '" + tableName + "',"
'hoodie.embed.timeline.server' = 'false',"
'compaction.schedule.enabled' = 'true',"
'compaction.async.enabled' = 'false',"
'compaction.trigger.strategy' = 'num_commits',"
'compaction.delta_commits' = '1',"
'clean.retain_commits' = '5',"
'archive.max_commits' = '15',"
'archive.min_commits' = '10')"

Versions:

Spark - 3.4.0
Flink - 1.17.0
Hive - 3.1.3
EMR - 6.12.0

danny0405 commented 3 months ago

Did you use upsert as the operation name, or just insert?

nrlm1 commented 3 months ago

Thank you for looking into this issue. We are not specifying an operation; we expect the default, "upsert", to be used.

danny0405 commented 3 months ago

Are you using streaming ingestion or batch?

ennox108 commented 3 months ago

It's streaming ingestion.

danny0405 commented 3 months ago

So you are using offline compaction because online async compaction is disabled ('compaction.async.enabled' = 'false'). Did you check the compaction plan to see whether it includes the files you expect?
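As a starting point for checking the plan, the pending compaction instants can be read off the file names in the table's .hoodie directory. The sketch below is a minimal illustration, not Hudi tooling: it assumes the 0.x timeline layout, where a plan is written as <instant>.compaction.requested and a finished compaction on a MERGE_ON_READ table appears as <instant>.commit with the same instant time. The function name pending_compactions and the sample file names are hypothetical.

```python
import re

# Assumed 0.x timeline naming: "<instant>.<action>[.<state>]".
# A name with no state suffix (e.g. "20240301101600000.commit") is treated
# as a completed instant.
_TIMELINE_FILE = re.compile(
    r"^(?P<instant>\d+)\.(?P<action>[a-z]+)(?:\.(?P<state>requested|inflight))?$"
)


def pending_compactions(file_names):
    """Return sorted instants that have a compaction plan but no completed commit."""
    planned = set()
    completed = set()
    for name in file_names:
        match = _TIMELINE_FILE.match(name)
        if match is None:
            # Not a timeline instant file (e.g. hoodie.properties); skip it.
            continue
        instant = match.group("instant")
        action = match.group("action")
        state = match.group("state")
        if action == "compaction" and state == "requested":
            planned.add(instant)
        elif action == "commit" and state is None:
            completed.add(instant)
    return sorted(planned - completed)


if __name__ == "__main__":
    # Hypothetical listing of a .hoodie directory.
    names = [
        "hoodie.properties",
        "20240301101500000.deltacommit",
        "20240301101600000.compaction.requested",
        "20240301101600000.compaction.inflight",
        "20240301101600000.commit",
        "20240301102100000.compaction.requested",
    ]
    for instant in pending_compactions(names):
        print("pending compaction plan:", instant)
```

This only identifies which plans exist and whether they completed. The list of file slices inside each plan is serialized in the .compaction.requested file, so inspecting it needs Hudi's own tooling.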

BTW, you schedule a compaction for every commit ('compaction.delta_commits' = '1'), so why not use a COW table instead?
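To see why a threshold of 1 behaves much like copy-on-write, here is a deliberately simplified model of the num_commits trigger. It assumes a compaction is scheduled once the number of delta commits since the last compaction reaches the threshold, and it ignores everything else Hudi considers. The function name compaction_schedule is hypothetical.

```python
def compaction_schedule(num_delta_commits, delta_commits_threshold):
    """Return the 1-based delta commit numbers after which a compaction is scheduled.

    Simplified model of 'compaction.trigger.strategy' = 'num_commits':
    schedule a compaction once `delta_commits_threshold` delta commits have
    accumulated since the previous compaction.
    """
    scheduled_after = []
    since_last_compaction = 0
    for commit_no in range(1, num_delta_commits + 1):
        since_last_compaction += 1
        if since_last_compaction >= delta_commits_threshold:
            scheduled_after.append(commit_no)
            since_last_compaction = 0
    return scheduled_after


if __name__ == "__main__":
    # Threshold 1: every delta commit is followed by a compaction.
    print(compaction_schedule(5, 1))
    # Threshold 5: base files are rewritten only after every fifth delta commit.
    print(compaction_schedule(10, 5))
```

With a threshold of 1, the base files are rewritten after each commit, which is the write pattern a COW table already has without the extra log files and compaction job.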