apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi Merge On Read Tables don't write Delta Log Files #9361

Open kepplertreet opened 1 year ago

kepplertreet commented 1 year ago

Hi.

I'm using a Spark Structured Streaming application running on EMR-6.11.0 to write into a Hudi MOR table.

Hudi Version : 0.13.0 Spark Version : 3.3.2

'hoodie.table.name': <table_name>,
'hoodie.datasource.write.recordkey.field': <column_name> ,
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator',
'hoodie.datasource.write.table.type': "MERGE_ON_READ",
'hoodie.datasource.write.partitionpath.field': <year_month>,
'hoodie.datasource.write.table.name': <table_name>,
'hoodie.datasource.write.precombine.field': <commit_time_ms>,
"hoodie.table.version": 5,
"hoodie.datasource.write.commitmeta.key.prefix": "_",
"hoodie.datasource.write.hive_style_partitioning": 'true',
"hoodie.datasource.meta.sync.enable": 'false',
"hoodie.datasource.hive_sync.enable": 'true',
"hoodie.datasource.hive_sync.auto_create_database": 'true',
"hoodie.datasource.hive_sync.skip_ro_suffix": 'true',
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.parquet.small.file.limit": 125217728,
"hoodie.parquet.max.file.size": 134217728,

# Compaction Configs 
"hoodie.compact.inline" : "false", 
"hoodie.compact.schedule.inline" : "false", 
"hoodie.datasource.compaction.async.enable": "true",
"hoodie.compact.inline.trigger.strategy": "NUM_COMMITS",
"hoodie.compact.inline.max.delta.commits": 3,

# --- Cleaner Configs ---- 
"hoodie.clean.automatic": 'true',
"hoodie.clean.async": 'true',
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.clean.trigger.strategy" : "NUM_COMMITS", 
"hoodie.clean.max.commits" : 7, 
"hoodie.cleaner.commits.retained" : 3, 
"hoodie.cleaner.fileversions.retained": 1, 
"hoodie.cleaner.hours.retained": 1, 
"hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",

"hoodie.parquet.compression.codec": "snappy",
"hoodie.embed.timeline.server": 'true',
"hoodie.embed.timeline.server.async": 'false',
"hoodie.write.concurrency.mode": "OPTIMISTIC_CONCURRENCY_CONTROL",
"hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
"hoodie.index.type": "BLOOM",
"hoodie.datasource.write.streaming.checkpoint.identifier" : <streaming_app_identifier>,

# Metadata Configs 
"hoodie.metadata.enable": 'true',
"hoodie.bloom.index.use.metadata": 'true',
"hoodie.metadata.index.async": 'false',
"hoodie.metadata.clean.async": 'true',
"hoodie.metadata.index.bloom.filter.enable": 'true',
"hoodie.metadata.index.column.stats.enable" : 'true', 
"hoodie.metadata.index.bloom.filter.column.list": <record_key_field>, 
"hoodie.metadata.index.column.stats.column.list" : <record_key_field>,
"hoodie.metadata.metrics.enable": 'true', 

"hoodie.keep.max.commits": 50,
"hoodie.archive.async": 'true',
"hoodie.archive.merge.enable": 'false',
"hoodie.archive.beyond.savepoint": 'true',
"hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
"hoodie.cleaner.hours.retained": 1
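
One detail worth checking in the options above: `hoodie.cleaner.policy` and `hoodie.cleaner.hours.retained` each appear twice. If the options are passed as a single Python dict literal (an assumption; the post does not show the surrounding code), the last occurrence silently wins. A minimal plain-Python sketch, with no Hudi API involved:

```python
# Plain-Python illustration: a repeated key in a dict literal keeps the LAST value.
# The option names are copied from the post; nothing here calls Hudi or Spark.
hudi_options = {
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.hours.retained": 1,
    "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",  # overrides the first entry
    "hoodie.cleaner.hours.retained": 1,
}

# Only one value per key survives, so this is the policy that would be passed on.
effective_policy = hudi_options["hoodie.cleaner.policy"]
print(effective_policy)
```

Under that assumption, the cleaner policy actually handed to the writer would be the later `KEEP_LATEST_BY_HOURS` entry, not `KEEP_LATEST_COMMITS`.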

Issue faced: As the configs show, we have OCC and metadata enabled for the table. My only concern for now is that I never see log files being written into the main table, and hence a compaction is never scheduled or triggered for it, i.e. all incoming data is written directly into parquet files. The metadata table's timeline, on the other hand, shows compactions being scheduled and executed, with the corresponding commits reflected in the timeline.

Is this normal, expected behaviour? Is Hudi internally weighing the cost of writing log files and then compacting them against the cost of writing the data directly to parquet, and choosing whichever turns out less expensive? Is there some defined threshold that incoming batches must cross before Hudi writes data into log files?

Thanks

ad1happy2go commented 1 year ago

@kepplertreet Do you only have inserts in your incremental data? When using the Spark writer, for MOR tables inserts go to parquet files and updates go to log files.

kepplertreet commented 1 year ago

@ad1happy2go Hi, no, our tables are quite update-heavy. In fact, for a few tables only 10% of transactions are inserts, and the remaining transactions consist mainly of updates.

ad1happy2go commented 1 year ago

@kepplertreet Can you post your hoodie.properties file and timeline? Ideally it should write a lot of log files in that case, if you don't see any compaction scheduled in the timeline.

kepplertreet commented 1 year ago

@ad1happy2go The configs are mentioned above. I'm passing them inline with the Spark job. I'll attach a few screen grabs of the timeline.

I hope this helps you understand better.

danny0405 commented 1 year ago

With the Bloom filter index, only updates yield Avro logs. That is by design, because only the parquet files record the Bloom filter.

kepplertreet commented 1 year ago

Hey @danny0405, so does that mean our table transactions do not contain updates, or is it that our Hudi configs are not able to propagate the updates to our downstream Hudi table?

One thing I'm sure about is that our tables do get regular update transactions.

danny0405 commented 1 year ago

> One thing I'm sure about is that our tables do get regular update transactions.

If you set up the record key correctly, then this is not how it is supposed to behave. Did you check for duplicate keys in your queries?

kepplertreet commented 1 year ago

Hi @danny0405, I can see the updates happening on the table, but I never see a delta log file being written, while the metadata table for the same table undergoes compactions without any problem.
Also, there are no duplicates for the record key as of now. All updates are correctly reflected on the corresponding record key on the basis of the precombine field.

My problem is just the absence of log files during a delta commit: the delta commit writes an entire parquet file, and hence no compactions are ever observed.
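
For anyone wanting to verify the same symptom, a rough way to count base files versus delta log files in a partition listing is sketched below. It assumes the usual naming convention (base files end in `.parquet`; log files are hidden files whose name contains `.log.`), and the file names in the sample listing are made up for illustration.

```python
from collections import Counter


def classify_hudi_file(name: str) -> str:
    """Classify a file purely by its name (assumed naming convention)."""
    if name.endswith(".parquet"):
        return "base"
    if name.startswith(".") and ".log." in name:
        return "log"
    return "other"


# Hypothetical listing of one partition directory; these names are invented.
listing = [
    "a1b2c3-0_0-21-300_20230801101500000.parquet",
    "a1b2c3-0_0-35-410_20230801102000000.parquet",
    ".hoodie_partition_metadata",
]

counts = Counter(classify_hudi_file(name) for name in listing)
print(counts["base"], counts["log"])
```

A partition that shows only base files after several delta commits matches the behaviour described here.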

Also, how does Hudi handle duplicate precombine field values for the same record key within a write batch?

Here I also attached a few screen grabs

danny0405 commented 1 year ago

I guess it is because Hudi does an in-memory deduplication of each new batch of inputs before actually flushing the data. If your updates only exist within a single in-memory batch, then after deduplication all the records for a key are merged, and the merged record is treated as an INSERT by Hudi.
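
One way to read this explanation is the toy model below. It is a sketch under stated assumptions, not Hudi's actual code: the function names `dedupe_batch` and `tag` are hypothetical, records are plain dicts, and the tie-breaking rule for equal precombine values is an arbitrary choice made here.

```python
from typing import Any, Dict, List, Set, Tuple

Record = Dict[str, Any]


def dedupe_batch(batch: List[Record], key_field: str, precombine_field: str) -> List[Record]:
    """Keep one record per key: the one with the largest precombine value."""
    latest: Dict[Any, Record] = {}
    for rec in batch:
        current = latest.get(rec[key_field])
        # On ties the later record wins; this is an assumption of the sketch.
        if current is None or rec[precombine_field] >= current[precombine_field]:
            latest[rec[key_field]] = rec
    return list(latest.values())


def tag(records: List[Record], key_field: str, existing_keys: Set[Any]) -> List[Tuple[str, Record]]:
    """Tag a record as UPDATE only if its key is already stored in the table."""
    return [("UPDATE" if r[key_field] in existing_keys else "INSERT", r) for r in records]


# Key 1 is created and updated inside the same batch; key 2 already exists.
batch = [
    {"id": 1, "ts": 100, "v": "created"},
    {"id": 1, "ts": 200, "v": "updated"},
    {"id": 2, "ts": 150, "v": "changed"},
]
tagged = tag(dedupe_batch(batch, "id", "ts"), "id", existing_keys={2})
for op, rec in tagged:
    print(op, rec["id"], rec["v"])
```

In this model, key 1 collapses to a single record that counts as an INSERT, because the table has never seen it, while key 2 counts as an UPDATE. Only the latter would be expected to produce a log file under the behaviour described earlier in the thread.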