apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] After compacting, there are a large number of logs with size 0, and they can never be cleared. #11007

Closed. MrAladdin closed this issue 3 months ago.

MrAladdin commented 3 months ago

Describe the problem you faced

1. Spark Structured Streaming: upsert into a MOR table (record index enabled).
2. After compaction, there are a large number of log files with size 0, and they are never cleaned.
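
For reference, a minimal shell sketch to enumerate the zero-size log files (the table path is a placeholder; this assumes the table lives on HDFS):

      # List every Hudi log file whose size is 0 bytes.
      # In `hdfs dfs -ls -R` output, column 5 is the file size and column 8 is the path.
      hdfs dfs -ls -R /path/to/hudi/table \
        | awk '$5 == 0 && $8 ~ /\.log\./ {print $8}'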

Please also help me check whether the configuration below is correct and whether there are any conflicting configuration items; these parameters have already overwhelmed me.

Environment Description

Additional context

      .writeStream
      .format("hudi")
      .option("hoodie.table.base.file.format", "PARQUET")
      .option("hoodie.allow.empty.commit", "true")
      .option("hoodie.datasource.write.drop.partition.columns", "false")
      .option("hoodie.table.services.enabled", "true")
      .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-v1-writer-1")
      .option(PRECOMBINE_FIELD.key(), "date_kbiUdate")
      .option(RECORDKEY_FIELD.key(), "records_key")
      .option(PARTITIONPATH_FIELD.key(), "partition_index_date")
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option("hoodie.combine.before.upsert", "true")
      .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")

      //markers
      .option("hoodie.write.markers.type", "DIRECT")

      //timeline server
      .option("hoodie.embed.timeline.server", "true")

      //File System View Storage Configurations
      .option("hoodie.filesystem.view.remote.timeout.secs", "1200")
      .option("hoodie.filesystem.view.remote.retry.enable", "true")
      .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500")
      .option("hoodie.filesystem.view.remote.retry.max_numbers", "15")
      .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000")

      //schema cache
      .option("hoodie.schema.cache.enable", "true")

      //spark write
      .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false")
      .option("hoodie.datasource.write.streaming.retry.count", "6")
      .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000")

      //metadata
      .option("hoodie.metadata.enable", "true")
      .option("hoodie.metadata.index.async", "false")
      .option("hoodie.metadata.index.check.timeout.seconds", "900")
      .option("hoodie.auto.adjust.lock.configs", "true")
      .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true")
      .option("hoodie.metadata.index.column.stats.enable", "false")
      .option("hoodie.metadata.index.column.stats.parallelism", "100")
      .option("hoodie.metadata.index.column.stats.file.group.count", "4")
      .option("hoodie.metadata.index.column.stats.column.list","date_udate,date_publishedAt")
      .option("hoodie.metadata.compact.max.delta.commits", "10")

      //metadata
      .option("hoodie.metadata.record.index.enable", "true")
      .option("hoodie.index.type", "RECORD_INDEX")
      .option("hoodie.metadata.max.init.parallelism", "100000")
      .option("hoodie.metadata.record.index.min.filegroup.count", "10")
      .option("hoodie.metadata.record.index.max.filegroup.count", "10000")
      .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824")
      .option("hoodie.metadata.auto.initialize", "true")
      .option("hoodie.metadata.record.index.growth.factor", "2.0")
      .option("hoodie.metadata.max.logfile.size", "2147483648")
      .option("hoodie.metadata.log.compaction.enable", "false")
      .option("hoodie.metadata.log.compaction.blocks.threshold", "5")
      .option("hoodie.metadata.max.deltacommits.when_pending", "1000")

      //file size
      .option("hoodie.parquet.field_id.write.enabled", "true")
      .option("hoodie.copyonwrite.insert.auto.split", "true")
      .option("hoodie.record.size.estimation.threshold", "1.0")
      .option("hoodie.parquet.block.size", "536870912")
      .option("hoodie.parquet.max.file.size", "536870912")
      .option("hoodie.parquet.small.file.limit", "314572800")
      .option("hoodie.logfile.max.size", "536870912")
      .option("hoodie.logfile.data.block.max.size", "536870912")
      .option("hoodie.logfile.to.parquet.compression.ratio", "0.35")

      //archive
      .option("hoodie.keep.max.commits", "30")
      .option("hoodie.keep.min.commits", "20")
      .option("hoodie.commits.archival.batch", "10")
      .option("hoodie.archive.automatic", "true") 
      .option("hoodie.archive.async", "true")
      .option("hoodie.archive.beyond.savepoint", "true")
      .option("hoodie.fail.on.timeline.archiving", "true")

      //cleaner
      .option("hoodie.clean.allow.multiple", "true")
      .option("hoodie.cleaner.incremental.mode", "true")
      .option("hoodie.clean.async", "true")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.cleaner.delete.bootstrap.base.file", "true")
      .option("hoodie.clean.automatic", "true")
      .option("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS")
      .option("hoodie.cleaner.hours.retained", "6")
      .option("hoodie.clean.trigger.strategy", "NUM_COMMITS")
      .option("hoodie.clean.max.commits", "10")

      //compact
      .option("hoodie.datasource.compaction.async.enable", "true")
      .option("hoodie.compact.inline", "false")
      .option("hoodie.compact.schedule.inline", "false")
      .option("hoodie.compaction.lazy.block.read", "true")
      .option("hoodie.compaction.reverse.log.read", "false")
      .option("hoodie.compaction.target.io", compact_limit)
      .option("hoodie.compaction.strategy", "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy")
      .option("hoodie.compact.inline.trigger.strategy", "NUM_AND_TIME")
      .option("hoodie.compact.inline.max.delta.commits", "5")
      .option("hoodie.compact.inline.max.delta.seconds", "3600")
      .option("hoodie.memory.compaction.fraction", "0.6")

      //schema
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .option("hoodie.avro.schema.external.transformation","true")
      .option("hoodie.avro.schema.validate", "true")

      //lock
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
      .option("hoodie.write.lock.filesystem.expire", "10")
danny0405 commented 3 months ago

You can roll back the compaction with the CLI, and the cleaner will eventually remove these logs. Before 1.0, "cleaning" a log is actually done by appending new log blocks to the corrupt files, which does not really delete the file instantly; these files are finally cleaned up by the specific cleaning strategy.
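
A rough sketch of what that could look like with hudi-cli (the table path is a placeholder, and the command names reflect my understanding of the Hudi CLI; verify them with the CLI's help command in your version):

      # Start the CLI bundled with the Hudi release and connect to the table.
      hudi-cli.sh
      connect --path /path/to/hudi/table

      # List compaction instants and their states to locate the suspect one.
      compactions show all

      # Roll back that compaction commit; the cleaner should then remove the
      # leftover zero-size logs according to the configured cleaning policy.
      commit rollback --commit <compaction instant time>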

MrAladdin commented 3 months ago

rollback the compaction

I'm not sure which compaction to roll back or how to locate it, since the table has been compacted multiple times already. If it is not addressed, will these files be cleaned up automatically later? Is there any documentation on this issue? I'd like to quickly understand the underlying mechanism.

Why would there be corrupt files? I don't understand what "corrupt files" means here. Does it refer to logs that have already been compacted?