apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.23k stars 2.39k forks source link

[SUPPORT] Archival not working for hudi & corresponding hudi metadata table #9478

Closed vedantKhandelwalDP closed 3 months ago

vedantKhandelwalDP commented 11 months ago

We were using emr version emr-6.11.0 , hudi version 0.13.0-amzn-0 , spark version = 3.3.2 , hive version = 3.1.3

We recently migrated from hudi 0.12.2 to 0.13.0. The main reason for migration was too many files in /metadata/.hoodie/ folder i.e. archival for metadata table was not triggering. Reason for that was https://github.com/apache/hudi/issues/7472 , as stated this was fixed in hudi 0.13.0. After migrating to hudi 0.13.0, we observed archival is not working for both /.hoodie folder and /.hoodie/metadata/.hoodie/. Table type is COW.logs:- for our COW table, log says, [2023-08-16 06:26:33,848] INFO No Instants to archive (org.apache.hudi.client.HoodieTimelineArchiver)for the corresponding meta data, log says [2023-08-16 06:40:14,400] INFO Not archiving as there is no compaction yet on the metadata table (org.apache.hudi.client.HoodieTimelineArchiver) [2023-08-16 06:40:14,400] INFO No Instants to archive (org.apache.hudi.client.HoodieTimelineArchiver)

Following is the complete list of hudi params[copied from logs]:- DEBUG Passed in properties: hive_sync.support_timestamp=true hoodie.archive.async=true hoodie.archive.automatic=true hoodie.archivelog.folder=archived hoodie.bulkinsert.shuffle.parallelism=200 hoodie.clean.async=true hoodie.clean.automatic=true hoodie.cleaner.commits.retained=2 hoodie.cleaner.policy.failed.writes=EAGER hoodie.clustering.async.enabled=false hoodie.clustering.inline=false hoodie.datasource.compaction.async.enable=true hoodie.datasource.hive_sync.base_file_format=PARQUET hoodie.datasource.hive_sync.create_managed_table=false hoodie.datasource.hive_sync.database= hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.jdbcurl= hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.partition_fields=dt hoodie.datasource.hive_sync.password=hive hoodie.datasource.hive_sync.schema_string_length_thresh=4000 hoodie.datasource.hive_sync.support_timestamp=true hoodie.datasource.hive_sync.sync_as_datasource=true hoodie.datasource.hive_sync.table=table_name hoodie.datasource.hive_sync.use_jdbc=true hoodie.datasource.hivesync.username=hive hoodie.datasource.meta.sync.base.path= hoodie.datasource.meta.sync.enable=true hoodie.datasource.write.commitmeta.key.prefix= hoodie.datasource.write.drop.partition.columns=false hoodie.datasource.write.hive_style_partitioning=true hoodie.datasource.write.insert.drop.duplicates=false hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=false hoodie.datasource.write.operation=upsert hoodie.datasource.write.partitionpath.field=dt hoodie.datasource.write.partitionpath.urlencode=false hoodie.datasource.write.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload hoodie.datasource.write.precombine.field=ingestedat hoodie.datasource.write.reconcile.schema=false hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.row.writer.enable=true hoodie.datasource.write.streaming.ignore.failed.batch=false hoodie.datasource.write.streaming.retry.count=3 hoodie.datasource.write.streaming.retry.interval.ms=2000 hoodie.datasource.write.table.type=COPY_ON_WRITE hoodie.fail.on.timeline.archiving=false hoodie.finalize.write.parallelism=200 hoodie.insert.shuffle.parallelism=200 hoodie.keep.max.commits=4 hoodie.keep.min.commits=3 hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool hoodie.meta.sync.metadata_file_listing=true hoodie.meta_sync.spark.version=3.3.2-amzn-0 hoodie.metadata.clean.async=true hoodie.metadata.cleaner.commits.retained=4 hoodie.metadata.enable=true hoodie.metadata.keep.max.commits=7 hoodie.metadata.keep.min.commits=5 hoodie.metrics.pushgateway.host= hoodie.metrics.pushgateway.port=9091 hoodie.parquet.max.file.size=128000000 hoodie.parquet.small.file.limit=100000000 hoodie.payload.ordering.field=ingestedat hoodie.table.base.file.format=PARQUET hoodie.table.checksum=1229177767 hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator hoodie.table.metadata.partitions=files hoodie.table.name=table_name hoodie.table.partition.fields=dt hoodie.table.precombine.field=ingestedat hoodie.table.recordkey.fields=id hoodie.table.type=COPY_ON_WRITE hoodie.table.version=5 hoodie.timeline.layout.version=1 hoodie.upsert.shuffle.parallelism=200 hoodie.write.concurrency.mode=single_writer hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.zookeeper.base_path=/hudi hoodie.write.lock.zookeeper.port= hoodie.write.lock.zookeeper.url=

ad1happy2go commented 11 months ago

@vedantKhandelwalDP Which writer you are using? Is it a spark structured streaming writer OR datasource writer OR deltastreamer?

PankajKaushal commented 11 months ago

@ad1happy2go We are using spark structured streaming writer.

ad1happy2go commented 11 months ago

@PankajKaushal The configuration looks okay.

The metadata table doesn't looks to be archiving as no compaction is happening for the metadata table. Can you check and confirm the same in timeline.

The main table should have been archived, just wanted to check if you have enough commits to trigger archiving i.e. 4 in your case. Can you share the timeline and code what you are using to write.

PankajKaushal commented 11 months ago

@ad1happy2go Please find the timeline attached for both table and metadata table. Yes, you are right, for metadata table there is no commit with replacecommit as action. We want to understand that also, as the number of commits are huge why compaction is not triggering.

timeline.txt metadata_timeline.txt

public static void writeToTable( Dataset data, Map<String, String> hudiOptions, String path, Pipeline pipeline) {

if (log.isDebugEnabled()) {
  writeToConsole(data);
}
int attempt = 1;
while (attempt <= 3) {
  log.error(
      "writing data for "
          + pipeline.getOutputDbName()
          + "."
          + pipeline.getOutputTableName()
          + " attempt: "
          + attempt);
  try {
    data.write().format(ORG_APACHE_HUDI).options(hudiOptions).mode(SaveMode.Append).save(path);
    break;
  } catch (org.apache.hudi.exception.HoodieRemoteException hoodieRemoteException) {
    log.error(
        "hoodieRemoteException for "
            + pipeline.getOutputDbName()
            + "."
            + pipeline.getOutputTableName()
            + " attempt: "
            + attempt);
    log.error(hoodieRemoteException.getMessage());
    log.error(hoodieRemoteException.toString());
    attempt += 1;
  } catch (Exception e) {
    SlackAlert.postChatAlert(
        "Exception for "
            + pipeline.getOutputDbName()
            + "."
            + pipeline.getOutputTableName()
            + " : "
            + e.getMessage());
    log.error(
        "Exception for "
            + pipeline.getOutputDbName()
            + "."
            + pipeline.getOutputTableName()
            + " attempt: "
            + attempt);
    log.error(e.getMessage());
    log.error(e.toString());
    attempt += 1;
  }
}
if (attempt > 3) {
  log.error("terminating application tried max attempts");
  System.exit(1);
}

}

PankajKaushal commented 11 months ago

@ad1happy2go @danny0405 @codope Can you please advise us on this.

danny0405 commented 10 months ago

I can tell you the current behavior:

  1. the MDT can not compact if there is any pending table services(compaction, clustering) on the DT timeline;
  2. the DT can not archive beyond the instant of the latest MDT compaction instant
  3. the MDT can not archive beyond the DT timeline

Finally the result is, if there is any pending table sercive instant on DT timeline, both of the timelines can not archive very well.

danny0405 commented 10 months ago

cc @nsivabalan , we are trying to tackle this, but with no clear time schedule.

PankajKaushal commented 10 months ago

@danny0405 Thanks for the information. In such case where archival on DT or MDT is not working and active timeline for both tables is huge. Is there any thing, that can be done(running any table service manually etc.) to reduce the active time line and hence listing cost as well as batch duration.

danny0405 commented 10 months ago

Try to finish(or rollback) the pending table services on DT timeline first, then the MDT timeline could be archived.

PankajKaushal commented 10 months ago

@danny0405 Thanks much for the information.

ad1happy2go commented 8 months ago

@PankajKaushal Do you need any other help on this? Feel free to close this issue if all good. Thanks.

ad1happy2go commented 3 months ago

@PankajKaushal Closing this out. Please reopen or create a new one in case of any more issues.