apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] deltacommit for client 172.16.116.102 already exists #2448

Closed: peng-xin closed this issue 3 years ago

peng-xin commented 3 years ago

Environment Description

1. When I write data to Hudi, I get the following error:

(screenshots attached)

```
Logical Plan:
RepartitionByExpression [dbName#23, tblName#24], 6
+- Project [row#21.dbName AS dbName#23, row#21.tblName AS tblName#24, row#21.opr AS opr#25, row#21.datalakeLogicalDeletion AS datalakeLogicalDeletion#26, row#21.etlTime AS etlTime#27L, row#21.jsonData AS jsonData#28]
   +- Project [jsontostructs(StructField(dbName,StringType,true), StructField(tblName,StringType,true), StructField(opr,StringType,true), StructField(datalakeLogicalDeletion,IntegerType,true), StructField(etlTime,LongType,true), StructField(jsonData,StringType,true), cast(value#8 as string), Some(PRC)) AS row#21]
      +- StreamingExecutionRelation KafkaV2[Subscribe[datalake_advertise]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to create file /user/datalake/hudi/hbase/f_mid_business_card/.hoodie/20210114202219.deltacommit
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:449)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:333)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:308)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:143)
  at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:124)
  at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:99)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:397)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:205)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
```

2. My hoodieWriteConfig is:

hoodie.filesystem.view.incr.timeline.sync.enable -> false, hoodie.bulkinsert.sort.mode -> GLOBAL_SORT, hoodie.avro.schema.externalTransformation -> false, hoodie.bootstrap.parallelism -> 1500, hoodie.delete.shuffle.parallelism -> 1500, hoodie.simple.index.use.caching -> true, hoodie.bloom.index.filter.type -> DYNAMIC_V0, hoodie.filesystem.view.remote.port -> 26754, hoodie.datasource.write.operation -> upsert, hoodie.cleaner.parallelism -> 200, hoodie.global.simple.index.parallelism -> 100, hoodie.bootstrap.mode.selector.regex -> .*, hoodie.parquet.page.size -> 1048576, hoodie.datasource.write.table.type -> MERGE_ON_READ, hoodie.datasource.hive_sync.table -> f_mid_business_card, hoodie.compaction.daybased.target.partitions -> 10, hoodie.metrics.reporter.class -> , hoodie.parquet.block.size -> 125829120, hoodie.cleaner.delete.bootstrap.base.file -> false, hoodie.consistency.check.max_interval_ms -> 300000, hoodie.insert.shuffle.parallelism -> 100, hoodie.upsert.shuffle.parallelism -> 100, hoodie.bulkinsert.shuffle.parallelism -> 1500, hoodie.write.commit.callback.on -> false, hoodie.cleaner.fileversions.retained -> 3, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor, hoodie.parquet.compression.codec -> gzip, hoodie.datasource.write.hive_style_partitioning -> true, hoodie.copyonwrite.insert.split.size -> 500000, hoodie.optimistic.consistency.guard.sleep_time_ms -> 500, hoodie.datasource.hive_sync.use_jdbc -> true, hoodie.metrics.reporter.type -> GRAPHITE, hoodie.bootstrap.index.class -> org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex, hoodie.filesystem.remote.backup.view.enable -> true, hoodie.logfile.to.parquet.compression.ratio -> 0.35, hoodie.filesystem.view.spillable.mem -> 104857600, hoodie.write.status.storage.level -> MEMORY_AND_DISK_SER, hoodie.write.commit.callback.http.timeout.seconds -> 3, hoodie.copyonwrite.insert.auto.split -> true, hoodie.logfile.data.block.max.size -> 268435456, hoodie.index.type -> BLOOM, hoodie.keep.min.commits -> 6, hoodie.memory.spillable.map.path -> /tmp/, hoodie.filesystem.view.rocksdb.base.path -> /tmp/hoodie_timeline_rocksdb, hoodie.compact.inline -> false, hoodie.clean.async -> true, hoodie.record.size.estimation.threshold -> 1.0, hoodie.metrics.graphite.host -> localhost, hoodie.simple.index.update.partition.path -> false, hoodie.bloom.index.filter.dynamic.max.entries -> 100000, hoodie.compaction.reverse.log.read -> false, hoodie.metrics.jmx.port -> 9889, hoodie.writestatus.class -> org.apache.hudi.client.WriteStatus, hoodie.datasource.hive_sync.enable -> true, hoodie.finalize.write.parallelism -> 1500, hoodie.rollback.parallelism -> 100, hoodie.index.bloom.num_entries -> 60000, hoodie.memory.merge.max.size -> 131072, hoodie.bootstrap.mode.selector.regex.mode -> METADATA_ONLY, hoodie.rollback.using.markers -> false, hoodie.copyonwrite.record.size.estimate -> 1024, hoodie.bloom.index.input.storage.level -> MEMORY_AND_DISKSER, hoodie.simple.index.parallelism -> 50, hoodie.consistency.check.enabled -> false, hoodie.bloom.index.use.caching -> true, hoodie.metrics.on -> false, hoodie.memory.compaction.max.size -> 1048576, hoodie.parquet.small.file.limit -> 104857600, hoodie.combine.before.insert -> false, hoodie.cleaner.commits.retained -> 2, hoodie.embed.timeline.server -> true, hoodie.bootstrap.mode.selector -> org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector, hoodie.filesystem.view.secondary.type -> MEMORY, .hoodie.allow.multi.write.on.same.instant -> false, 
hoodie.datasource.write.partitionpath.field -> , _hoodie.optimistic.consistency.guard.enable -> true, hoodie.datasource.hive_sync.database -> hbase, hoodie.bloom.index.update.partition.path -> true, hoodie.fail.on.timeline.archiving -> true, hoodie.markers.delete.parallelism -> 100, hoodie.filesystem.view.type -> MEMORY, hoodie.parquet.max.file.size -> 125829120, hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator, hoodie.bootstrap.partitionpath.translator.class -> org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator, hoodie.bloom.index.prune.by.ranges -> true, hoodie.base.path -> /user/datalake/hudi/hbase/f_mid_business_card, hoodie.index.class -> , hoodie.clean.automatic -> true, hoodie.filesystem.view.remote.host -> localhost, hoodie.compaction.lazy.block.read -> false, hoodie.memory.writestatus.failure.fraction -> 0.1, hoodie.metrics.graphite.port -> 4756, hoodie.cleaner.policy -> KEEP_LATEST_COMMITS, hoodie.logfile.max.size -> 1073741824, hoodie.filesystem.view.spillable.compaction.mem.fraction -> 0.01, hoodie.datasource.write.recordkey.field -> datalake_rowkey, hoodie.avro.schema.validate -> false, hoodie.simple.index.input.storage.level -> MEMORY_AND_DISK_SER, hoodie.timeline.layout.version -> 1, hoodie.consistency.check.max_checks -> 7, hoodie.consistency.check.initial_interval_ms -> 2000, hoodie.keep.max.commits -> 8, hoodie.compact.inline.max.delta.commits -> 5, hoodie.parquet.compression.ratio -> 0.1, hoodie.memory.dfs.buffer.max.size -> 16777216, hoodie.auto.commit -> true, hoodie.write.commit.callback.http.api.key -> hudi_write_commit_http_callback, hoodie.assume.date.partitioning -> false, hoodie.filesystem.view.spillable.dir -> /tmp/view_map/, hoodie.compaction.strategy -> org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy, hoodie.combine.before.upsert -> true, hoodie.bloom.index.keys.per.bucket -> 10000000, hoodie.write.commit.callback.class -> org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback, hoodie.bloom.index.parallelism -> 0, hoodie.cleaner.incremental.mode -> true, hoodie.commits.archival.batch -> 5, hoodie.datasource.hive_sync.partition_fields -> , hoodie.compaction.target.io -> 512000, hoodie.table.name -> f_mid_business_card, hoodie.bloom.index.bucketized.checking -> true, hoodie.compaction.payload.class -> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, hoodie.combine.before.delete -> true, hoodie.datasource.write.precombine.field -> ts, hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction -> 0.05, hoodie.metrics.jmx.host -> localhost, hoodie.index.bloom.fpp -> 0.000000001, hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://172.16.116.102:10000, hoodie.bloom.index.use.treebased.filter -> true

To Reproduce: the key code is below.

```scala
val hudiDF = buildHudiDF(tableConfig, partitionDF, dbNameSource, tableNameSource)
val hoodieWriteConfigMap = buildHudiWriteConfig(tableConfig)
hudiDF.write.format("hudi")
  .options(hoodieWriteConfigMap)
  .mode(SaveMode.Append)
  .save(hoodieWriteConfigMap.getOrElse(BASE_PATH_PROP, "/tmp"))
```

```scala
private def buildHudiDF(tableConfig: TableConfig, sourceDataFrame: DataFrame, dbNameKafka: String, tblNameKafka: String): DataFrame = {
  val dataLakeColumns = tableConfig.getDataLakeColumns.asScala.toArray
  val commonSchema = new StructType(dataLakeColumns.map(column => StructField(column.getColumnName, StringType)))
  val castArray = dataLakeColumns.map(column => col(column.getColumnName).cast(column.getColumnType).toString())
  // parse the Kafka JSON payload into the configured columns and keep only rows for this table
  var targetDataFrame = sourceDataFrame
    .select(from_json(col("jsonData"), commonSchema).alias("row"), col("datalakeLogicalDeletion"))
    .select(col("row.*"), col("datalakeLogicalDeletion"))
    .selectExpr(castArray :+ (col("datalakeLogicalDeletion").alias("datalake_logicaldeletion").toString()): _*)
    .filter(col("dbName").isNull.or(col("dbName").equalTo(dbNameKafka)).and(col("tblName").equalTo(tblNameKafka)))
  val precombineField = tableConfig.getPrecombineFieldOptKey
  if (StringUtils.isEmpty(precombineField) || !commonSchema.contains(StructField(precombineField, StringType))) {
    // no usable precombine field configured, so add a ts column
    targetDataFrame = targetDataFrame.withColumn("ts", current_timestamp())
  }
  val complexRecordKey = tableConfig.getRecordkeyFieldOptKey
  if (StringUtils.isNotEmpty(complexRecordKey)) {
    // drop rows where all record-key fields are null or empty
    val keyArray = complexRecordKey.split(StringUtils.COMMA_SYMBOL)
    var conditionChain = col(keyArray(0)).isNotNull.and(col(keyArray(0)).notEqual(StringUtils.EMPTY))
    keyArray.tail.foreach(key => conditionChain = conditionChain.or(col(key).isNotNull.and(col(key).notEqual(StringUtils.EMPTY))))
    targetDataFrame = targetDataFrame.filter(conditionChain)
  }
  targetDataFrame
}
```

```scala
private def buildHudiWriteConfig(tableConfig: TableConfig): mutable.Map[String, String] = {
  val options = new mutable.HashMap[String, String]
  val dbName = tableConfig.getDbName
  val tblName = tableConfig.getTableName
  val partition_field = tableConfig.getPartitionpathFieldOptKey
  // datasource write + hive sync options
  options += (TABLE_TYPE_OPT_KEY -> tableConfig.getTableTypeOptKey)
  options += (OPERATION_OPT_KEY -> tableConfig.getOperationOptKey)
  options += (RECORDKEY_FIELD_OPT_KEY -> tableConfig.getRecordkeyFieldOptKey)
  options += (PRECOMBINE_FIELD_OPT_KEY -> tableConfig.getPrecombineFieldOptKey)
  options += (HIVE_SYNC_ENABLED_OPT_KEY -> "true")
  options += (INDEX_TYPE_PROP -> tableConfig.getIndexTypeProp)
  options += (BLOOM_INDEX_FILTER_TYPE -> BloomFilterTypeCode.DYNAMIC_V0.name())
  options += (KEYGENERATOR_CLASS_OPT_KEY -> tableConfig.getKeygeneratorClassOptKey)
  options += (HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> tableConfig.getHivePartitionExtractorClassOptKey)
  options += (HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
  options += (PARTITIONPATH_FIELD_OPT_KEY -> partition_field)
  options += (HIVE_PARTITION_FIELDS_OPT_KEY -> partition_field.split(",").map(pair => pair.split(":")(0)).mkString(","))
  options += (HIVE_DATABASE_OPT_KEY -> dbName)
  options += (HIVE_TABLE_OPT_KEY -> tblName)
  options += (HIVE_USE_JDBC_OPT_KEY -> tableConfig.getHiveUseJdbcOptKey)
  options += (HIVE_URL_OPT_KEY -> tableConfig.getHiveUrlOptKey)
  val hoodieCompactionConfig = HoodieCompactionConfig.newBuilder()
    .retainCommits(2)
    .withCommitsArchivalBatchSize(5)
    .archiveCommitsWith(6, 8)
    .withAsyncClean(true)
    .build()
  val hoodieIndexConfig = HoodieIndexConfig.newBuilder()
    .withBloomIndexUpdatePartitionPath(true)
    .build()
  val hoodieMemoryConfig = HoodieMemoryConfig.newBuilder()
    .withMaxMemoryMaxSize(128 * 1024, 1024 * 1024)
    .build()
  val hoodieWriteConfig = HoodieWriteConfig.newBuilder()
    .withParallelism(tableConfig.getShuffleParallelism.toInt, tableConfig.getShuffleParallelism.toInt)
    .withIndexConfig(hoodieIndexConfig)
    .withCompactionConfig(hoodieCompactionConfig)
    .withMemoryConfig(hoodieMemoryConfig)
    .withProps(options.asJava)
    .forTable(tblName)
    .withPath(StringUtils.concat(tableConfig.getBasePath, File.separator, tableConfig.getDbName, File.separator, tableConfig.getTableName))
    .build()
  logger.info(s"hoodieWriteConfig -> {${hoodieWriteConfig.getProps.asScala.toString()}}")
  hoodieWriteConfig.getProps.asScala
}
```

However, when the hoodieWriteConfig is like the one below, it works. But the log files grow too big (2 GB+), there are too many small data files (< 1 MB), and this causes an OOM error every hour.

hoodie.datasource.hive_sync.table->f_mid_order_details, hoodie.bloom.index.update.partition.path->true, hoodie.bloom.index.filter.type->DYNAMIC_V0, hoodie.datasource.write.keygenerator.class->org.apache.hudi.keygen.SimpleKeyGenerator, hoodie.datasource.hive_sync.database->hudi, hoodie.datasource.write.table.type->MERGE_ON_READ, hoodie.datasource.write.partitionpath.field->payment_date, hoodie.datasource.hive_sync.partition_fields->payment_date, hoodie.datasource.hive_sync.partition_extractor_class->org.apache.hudi.hive.MultiPartKeysValueExtractor, hoodie.datasource.write.recordkey.field->key, hoodie.datasource.hive_sync.enable->true, hoodie.upsert.shuffle.parallelism->100, hoodie.index.type->GLOBAL_BLOOM, hoodie.datasource.hive_sync.jdbcurl->jdbc:hive2://172.16.117.73:10000, hoodie.compact.inline->true, hoodie.datasource.write.precombine.field->ts, hoodie.table.name->f_mid_order_details, hoodie.datasource.write.hive_style_partitioning->true, hoodie.datasource.write.operation->upsert

bvaradar commented 3 years ago

Can you provide the full dump of the logs and the .hoodie/ folder?

peng-xin commented 3 years ago

> Can you provide the full dump of the logs and the .hoodie/ folder?

The logs have been cleared, but I still have some screenshots: (screenshots attached)

When I set hoodie.auto.commit = false, the error is gone. But how do I limit the log file size? My log files are very big (3 GB+), and the log file version is always 1. (screenshot attached)

When I change hoodie.cleaner.policy = KEEP_LATEST_FILE_VERSIONS and hoodie.cleaner.fileversions.retained = 1, the old data files can be cleaned, but how do I clean the old log files (or clean the old log-file commits)? Setting hoodie.cleaner.policy = KEEP_LATEST_COMMITS and hoodie.cleaner.commits.retained = 1 is useless. Now my config is:

hoodie.filesystem.view.incr.timeline.sync.enable -> false, hoodie.bulkinsert.sort.mode -> GLOBAL_SORT, hoodie.avro.schema.externalTransformation -> false, hoodie.bootstrap.parallelism -> 1500, hoodie.delete.shuffle.parallelism -> 1500, hoodie.simple.index.use.caching -> true, hoodie.bloom.index.filter.type -> DYNAMIC_V0, hoodie.filesystem.view.remote.port -> 26754, hoodie.datasource.write.operation -> upsert, hoodie.cleaner.parallelism -> 200, hoodie.global.simple.index.parallelism -> 100, hoodie.bootstrap.mode.selector.regex -> .*, hoodie.parquet.page.size -> 1048576, hoodie.datasource.write.table.type -> MERGE_ON_READ, hoodie.datasource.hive_sync.table -> f_mid_business_card, hoodie.compaction.daybased.target.partitions -> 10, hoodie.metrics.reporter.class -> , hoodie.parquet.block.size -> 125829120, hoodie.cleaner.delete.bootstrap.base.file -> false, hoodie.consistency.check.max_interval_ms -> 300000, hoodie.insert.shuffle.parallelism -> 100, hoodie.upsert.shuffle.parallelism -> 100, hoodie.bulkinsert.shuffle.parallelism -> 1500, hoodie.write.commit.callback.on -> false, hoodie.cleaner.fileversions.retained -> 1, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor, hoodie.parquet.compression.codec -> gzip, hoodie.datasource.write.hive_style_partitioning -> true, hoodie.copyonwrite.insert.split.size -> 500000, hoodie.optimistic.consistency.guard.sleep_time_ms -> 500, hoodie.datasource.hive_sync.use_jdbc -> true, hoodie.metrics.reporter.type -> GRAPHITE, hoodie.bootstrap.index.class -> org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex, hoodie.filesystem.remote.backup.view.enable -> true, hoodie.logfile.to.parquet.compression.ratio -> 0.35, hoodie.filesystem.view.spillable.mem -> 104857600, hoodie.write.status.storage.level -> MEMORY_AND_DISK_SER, hoodie.write.commit.callback.http.timeout.seconds -> 3, hoodie.copyonwrite.insert.auto.split -> true, hoodie.logfile.data.block.max.size -> 268435456, hoodie.index.type -> BLOOM, hoodie.keep.min.commits -> 6, hoodie.memory.spillable.map.path -> /tmp/, hoodie.filesystem.view.rocksdb.base.path -> /tmp/hoodie_timeline_rocksdb, hoodie.compact.inline -> false, hoodie.clean.async -> true, hoodie.record.size.estimation.threshold -> 1.0, hoodie.metrics.graphite.host -> localhost, hoodie.simple.index.update.partition.path -> false, hoodie.bloom.index.filter.dynamic.max.entries -> 100000, hoodie.compaction.reverse.log.read -> false, hoodie.metrics.jmx.port -> 9889, hoodie.writestatus.class -> org.apache.hudi.client.WriteStatus, hoodie.datasource.hive_sync.enable -> true, hoodie.finalize.write.parallelism -> 1500, hoodie.rollback.parallelism -> 100, hoodie.index.bloom.num_entries -> 60000, hoodie.memory.merge.max.size -> 134217728, hoodie.bootstrap.mode.selector.regex.mode -> METADATA_ONLY, hoodie.rollback.using.markers -> false, hoodie.copyonwrite.record.size.estimate -> 1024, hoodie.bloom.index.input.storage.level -> MEMORY_AND_DISKSER, hoodie.simple.index.parallelism -> 50, hoodie.consistency.check.enabled -> false, hoodie.bloom.index.use.caching -> true, hoodie.metrics.on -> false, hoodie.memory.compaction.max.size -> 1073741824, hoodie.parquet.small.file.limit -> 104857600, hoodie.combine.before.insert -> false, hoodie.cleaner.commits.retained -> 1, hoodie.embed.timeline.server -> true, hoodie.bootstrap.mode.selector -> org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector, hoodie.filesystem.view.secondary.type -> MEMORY, .hoodie.allow.multi.write.on.same.instant -> false, 
hoodie.datasource.write.partitionpath.field -> , _hoodie.optimistic.consistency.guard.enable -> true, hoodie.datasource.hive_sync.database -> hbase, hoodie.bloom.index.update.partition.path -> true, hoodie.fail.on.timeline.archiving -> true, hoodie.markers.delete.parallelism -> 100, hoodie.filesystem.view.type -> MEMORY, hoodie.parquet.max.file.size -> 125829120, hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator, hoodie.bootstrap.partitionpath.translator.class -> org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator, hoodie.bloom.index.prune.by.ranges -> true, hoodie.base.path -> /user/datalake/hudi/hbase/f_mid_business_card, hoodie.index.class -> , hoodie.clean.automatic -> true, hoodie.filesystem.view.remote.host -> localhost, hoodie.compaction.lazy.block.read -> false, hoodie.memory.writestatus.failure.fraction -> 0.1, hoodie.metrics.graphite.port -> 4756, hoodie.cleaner.policy -> KEEP_LATEST_FILE_VERSIONS, hoodie.logfile.max.size -> 1073741824, hoodie.filesystem.view.spillable.compaction.mem.fraction -> 0.01, hoodie.datasource.write.recordkey.field -> datalake_rowkey, hoodie.avro.schema.validate -> false, hoodie.simple.index.input.storage.level -> MEMORY_AND_DISK_SER, hoodie.timeline.layout.version -> 1, hoodie.consistency.check.max_checks -> 7, hoodie.consistency.check.initial_interval_ms -> 2000, hoodie.keep.max.commits -> 8, hoodie.compact.inline.max.delta.commits -> 5, hoodie.parquet.compression.ratio -> 0.1, hoodie.memory.dfs.buffer.max.size -> 16777216, hoodie.auto.commit -> false, hoodie.write.commit.callback.http.api.key -> hudi_write_commit_http_callback, hoodie.assume.date.partitioning -> false, hoodie.filesystem.view.spillable.dir -> /tmp/view_map/, hoodie.compaction.strategy -> org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy, hoodie.combine.before.upsert -> true, hoodie.bloom.index.keys.per.bucket -> 10000000, hoodie.write.commit.callback.class -> org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback, hoodie.bloom.index.parallelism -> 0, hoodie.cleaner.incremental.mode -> true, hoodie.commits.archival.batch -> 5, hoodie.datasource.hive_sync.partition_fields -> , hoodie.compaction.target.io -> 512000, hoodie.table.name -> f_mid_business_card, hoodie.bloom.index.bucketized.checking -> true, hoodie.compaction.payload.class -> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, hoodie.combine.before.delete -> true, hoodie.datasource.write.precombine.field -> ts, hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction -> 0.05, hoodie.metrics.jmx.host -> localhost, hoodie.index.bloom.fpp -> 0.000000001, hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://172.16.116.102:10000, hoodie.bloom.index.use.treebased.filter -> true

bvaradar commented 3 years ago

@peng-xin : Can you attach the contents of the hoodie.properties file here? This is most likely due to setting the timeline layout version wrongly.

Can you start from a clean slate (new base path), not pass "hoodie.timeline.layout.version" in the configs, try again, and let us know?

peng-xin commented 3 years ago

> @peng-xin : Can you attach the contents of the hoodie.properties file here? This is most likely due to setting the timeline layout version wrongly.
>
> Can you start from a clean slate (new base path), not pass "hoodie.timeline.layout.version" in the configs, try again, and let us know?

The old hoodie.properties is here: (screenshot attached)

I dropped the old data and removed the hoodie.timeline.layout.version config, but it happened again. (screenshots attached)

Now the config is:

hoodie.filesystem.view.incr.timeline.sync.enable -> false, hoodie.bulkinsert.sort.mode -> GLOBAL_SORT, hoodie.bootstrap.parallelism -> 1500, hoodie.avro.schema.externalTransformation -> false, hoodie.delete.shuffle.parallelism -> 1500, hoodie.simple.index.use.caching -> true, hoodie.bloom.index.filter.type -> DYNAMIC_V0, hoodie.filesystem.view.remote.port -> 26754, hoodie.datasource.write.operation -> upsert, hoodie.cleaner.parallelism -> 200, hoodie.global.simple.index.parallelism -> 100, hoodie.bootstrap.mode.selector.regex -> .*, hoodie.parquet.page.size -> 1048576, hoodie.datasource.write.table.type -> MERGE_ON_READ, hoodie.datasource.hive_sync.table -> f_mid_business_card, hoodie.compaction.daybased.target.partitions -> 10, hoodie.metrics.reporter.class -> , hoodie.parquet.block.size -> 125829120, hoodie.cleaner.delete.bootstrap.base.file -> false, hoodie.consistency.check.max_interval_ms -> 300000, hoodie.insert.shuffle.parallelism -> 100, hoodie.upsert.shuffle.parallelism -> 100, hoodie.bulkinsert.shuffle.parallelism -> 1500, hoodie.write.commit.callback.on -> false, hoodie.cleaner.fileversions.retained -> 1, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor, hoodie.parquet.compression.codec -> gzip, hoodie.datasource.write.hive_style_partitioning -> true, hoodie.copyonwrite.insert.split.size -> 500000, hoodie.optimistic.consistency.guard.sleep_time_ms -> 500, hoodie.datasource.hive_sync.use_jdbc -> true, hoodie.metrics.reporter.type -> GRAPHITE, hoodie.bootstrap.index.class -> org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex, hoodie.logfile.to.parquet.compression.ratio -> 0.35, hoodie.filesystem.remote.backup.view.enable -> true, hoodie.filesystem.view.spillable.mem -> 104857600, hoodie.write.status.storage.level -> MEMORY_AND_DISK_SER, hoodie.write.commit.callback.http.timeout.seconds -> 3, hoodie.copyonwrite.insert.auto.split -> true, hoodie.logfile.data.block.max.size -> 268435456, hoodie.index.type -> BLOOM, hoodie.keep.min.commits -> 6, hoodie.memory.spillable.map.path -> /tmp/, hoodie.filesystem.view.rocksdb.base.path -> /tmp/hoodie_timeline_rocksdb, hoodie.compact.inline -> false, hoodie.clean.async -> true, hoodie.record.size.estimation.threshold -> 1.0, hoodie.simple.index.update.partition.path -> false, hoodie.bloom.index.filter.dynamic.max.entries -> 100000, hoodie.metrics.graphite.host -> localhost, hoodie.compaction.reverse.log.read -> false, hoodie.metrics.jmx.port -> 9889, hoodie.datasource.hive_sync.enable -> true, hoodie.writestatus.class -> org.apache.hudi.client.WriteStatus, hoodie.finalize.write.parallelism -> 1500, hoodie.rollback.parallelism -> 100, hoodie.index.bloom.num_entries -> 60000, hoodie.memory.merge.max.size -> 134217728, hoodie.bootstrap.mode.selector.regex.mode -> METADATA_ONLY, hoodie.rollback.using.markers -> false, hoodie.copyonwrite.record.size.estimate -> 1024, hoodie.bloom.index.input.storage.level -> MEMORY_AND_DISKSER, hoodie.simple.index.parallelism -> 50, hoodie.consistency.check.enabled -> false, hoodie.bloom.index.use.caching -> true, hoodie.memory.compaction.max.size -> 1073741824, hoodie.metrics.on -> false, hoodie.parquet.small.file.limit -> 104857600, hoodie.combine.before.insert -> false, hoodie.cleaner.commits.retained -> 1, hoodie.embed.timeline.server -> true, hoodie.bootstrap.mode.selector -> org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector, hoodie.datasource.write.partitionpath.field -> , .hoodie.allow.multi.write.on.same.instant -> false, 
hoodie.filesystem.view.secondary.type -> MEMORY, _hoodie.optimistic.consistency.guard.enable -> true, hoodie.datasource.hive_sync.database -> hbase, hoodie.bloom.index.update.partition.path -> true, hoodie.fail.on.timeline.archiving -> true, hoodie.markers.delete.parallelism -> 100, hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator, hoodie.parquet.max.file.size -> 125829120, hoodie.filesystem.view.type -> MEMORY, hoodie.bootstrap.partitionpath.translator.class -> org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator, hoodie.bloom.index.prune.by.ranges -> true, hoodie.base.path -> /user/datalake/hudi/hbase/f_mid_business_card, hoodie.clean.automatic -> true, hoodie.index.class -> , hoodie.compaction.lazy.block.read -> false, hoodie.filesystem.view.remote.host -> localhost, hoodie.memory.writestatus.failure.fraction -> 0.1, hoodie.metrics.graphite.port -> 4756, hoodie.cleaner.policy -> KEEP_LATEST_FILE_VERSIONS, hoodie.logfile.max.size -> 1073741824, hoodie.filesystem.view.spillable.compaction.mem.fraction -> 0.01, hoodie.datasource.write.recordkey.field -> datalake_rowkey, hoodie.simple.index.input.storage.level -> MEMORY_AND_DISK_SER, hoodie.avro.schema.validate -> false, hoodie.consistency.check.max_checks -> 7, hoodie.keep.max.commits -> 8, hoodie.consistency.check.initial_interval_ms -> 2000, hoodie.compact.inline.max.delta.commits -> 5, hoodie.parquet.compression.ratio -> 0.1, hoodie.memory.dfs.buffer.max.size -> 16777216, hoodie.auto.commit -> false, hoodie.write.commit.callback.http.api.key -> hudi_write_commit_http_callback, hoodie.assume.date.partitioning -> false, hoodie.filesystem.view.spillable.dir -> /tmp/view_map/, hoodie.compaction.strategy -> org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy, hoodie.bloom.index.keys.per.bucket -> 10000000, hoodie.combine.before.upsert -> true, hoodie.cleaner.incremental.mode -> true, hoodie.bloom.index.parallelism -> 0, hoodie.write.commit.callback.class -> org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback, hoodie.commits.archival.batch -> 5, hoodie.compaction.target.io -> 512000, hoodie.datasource.hive_sync.partition_fields -> , hoodie.table.name -> f_mid_business_card, hoodie.bloom.index.bucketized.checking -> true, hoodie.compaction.payload.class -> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, hoodie.datasource.write.precombine.field -> ts, hoodie.combine.before.delete -> true, hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction -> 0.05, hoodie.metrics.jmx.host -> localhost, hoodie.index.bloom.fpp -> 0.000000001, hoodie.bloom.index.use.treebased.filter -> true, hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://172.16.116.102:10000

bvaradar commented 3 years ago

@peng-xin : Can you enable hoodie.compact.inline -> true and hoodie.auto.commit -> true. The log files are growing because they need to be compacted and if you set the first config, it will periodically run compactions. Cleaner will eventually remove old log files and parquet files after that.
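For reference, both suggestions are plain writer options; a minimal sketch of applying them on top of the options map from buildHudiWriteConfig above (names such as hoodieWriteConfigMap and hudiDF are reused from the earlier code in this thread; this is not a confirmed fix):

```scala
// Sketch only: override the two flags suggested above on the existing options map.
// Key names are the same ones that appear in the config dumps in this thread.
val compactionOptions = hoodieWriteConfigMap ++ Map(
  "hoodie.compact.inline" -> "true", // run compaction inline after delta commits, keeping log files bounded
  "hoodie.auto.commit"    -> "true"  // let the write client commit automatically
)

hudiDF.write.format("hudi")
  .options(compactionOptions)
  .mode(SaveMode.Append)
  .save(hoodieWriteConfigMap.getOrElse(BASE_PATH_PROP, "/tmp"))
```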

peng-xin commented 3 years ago

> @peng-xin : Can you enable hoodie.compact.inline -> true and hoodie.auto.commit -> true. The log files are growing because they need to be compacted and if you set the first config, it will periodically run compactions. Cleaner will eventually remove old log files and parquet files after that.

Thank you so much. When I set hoodie.compact.inline -> true, the size of the log files is limited.

But hoodie.auto.commit -> true causes the same error: (screenshot attached)

n3nash commented 3 years ago

@peng-xin Are you able to proceed with hoodie.compact.inline -> true and hoodie.auto.commit -> false ?

nsivabalan commented 3 years ago

Guess you might have to fix the max file sizes. I see you currently set them to very high values. Was that intentional? hoodie.parquet.max.file.size -> 125829120, hoodie.logfile.max.size -> 1073741824

Can you try setting the values as follows: parquet max file size: 120 MB.

@vinothchandar : any recommendation for log file max size?
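For reference, the size limits discussed here are plain writer options; a minimal sketch with illustrative values (the 120 MB parquet cap from the suggestion above, the other two values simply echo what is already in the config dumps in this thread, not a confirmed recommendation):

```scala
// Sketch only: file-sizing options as they could be passed to the writer.
// 120 * 1024 * 1024 = 125829120 bytes, i.e. the 120 MB parquet cap mentioned above.
val fileSizingOptions = Map(
  "hoodie.parquet.max.file.size"    -> (120L * 1024 * 1024).toString,  // max base (parquet) file size
  "hoodie.parquet.small.file.limit" -> (100L * 1024 * 1024).toString,  // files below this are treated as small files for bin-packing
  "hoodie.logfile.max.size"         -> (1024L * 1024 * 1024).toString  // a log file rolls over once it reaches this size
)
```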

nsivabalan commented 3 years ago

@peng-xin: a few quick questions as we triage the issue.

root18039532923 commented 3 years ago

If you set "hoodie.compact.inline -> true", does this mean compaction runs inline rather than async?

nsivabalan commented 3 years ago

Nope, inline means sync; if not, it's async.

root18039532923 commented 3 years ago

But I need async; if I set "hoodie.compact.inline -> true", that is not async.

bvaradar commented 3 years ago

@root18039532923 : Please look at https://hudi.apache.org/blog/async-compaction-deployment-model/ for running async compactions
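For structured streaming writers, async compaction is driven by the datasource option referenced below (ASYNC_COMPACT_ENABLE_OPT_KEY, assumed here to map to hoodie.datasource.compaction.async.enable); a minimal sketch, where the streaming DataFrame and checkpoint path are placeholders rather than values from this issue:

```scala
// Sketch only: a streaming write with async compaction left enabled.
// streamingHudiDF and the checkpoint location are placeholders, not from the issue;
// hoodieWriteConfigMap and BASE_PATH_PROP are reused from the earlier code.
streamingHudiDF.writeStream
  .format("hudi")
  .options(hoodieWriteConfigMap)
  .option("hoodie.datasource.compaction.async.enable", "true") // ASYNC_COMPACT_ENABLE_OPT_KEY
  .option("checkpointLocation", "/tmp/hudi_streaming_checkpoint")
  .outputMode("append")
  .start(hoodieWriteConfigMap.getOrElse(BASE_PATH_PROP, "/tmp"))
```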

root18039532923 commented 3 years ago

Isn't INLINE_COMPACT_NUM_DELTA_COMMITS_PROP a configuration for sync compaction? Why does async need it as well? I know that the default value of ASYNC_COMPACT_ENABLE_OPT_KEY is true. (screenshot attached)

nsivabalan commented 3 years ago

@root18039532923 : sorry for the late reply. Those configs are shared across both sync and async compaction. Maybe we should have renamed them. My hunch is that inline compaction was introduced first and the configs were named for that; later, async compaction was introduced and the developers did not want to introduce new configs or change the existing config names.

Let us know if you need any more help. Once you respond, can you please remove the "awaiting-user-response" label from the issue and, if possible, add the "awaiting-community-help" label.

n3nash commented 3 years ago

@peng-xin Since we haven't heard from you in a while and this issue has not been reported by anyone else, I'm assuming this to be a transient issue with some of your settings. Let me know if you need further help.

@root18039532923 Please feel free to re-open if you are still confused about how to use async compaction.