apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Disable only Async Compaction in hudi deltastreamer is not working #11745

Closed: abhisheksahani91 closed this issue 3 months ago.

abhisheksahani91 commented 3 months ago


Describe the problem you faced

We want to disable async compaction in the Deltastreamer and allow it only to schedule compactions. For compaction execution, we have a standalone process that uses the HoodieCompactor utility from hudi-utilities.

What we observe in the logs is that the AsyncCompactService is still running inside the Deltastreamer/ingestion process, and it competes with the standalone compaction process for a compaction instant on the timeline that has already reached the completed stage.

To Reproduce

Steps to reproduce the behavior:

  1. In the Deltastreamer, add the following configs to disable compaction execution while still scheduling compaction instants (see the sketch after this list):
     a. hoodie.compact.inline=false
     b. hoodie.compact.schedule.inline=true
  2. Use the HoodieCompactor utility for offline compaction.
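
For reference, a minimal sketch of the intended two-process setup, assuming a typical spark-submit layout (the class names are the standard hudi-utilities entry points, but the bucket, table names, and elided arguments are placeholders):

    # Deltastreamer: ingest and schedule compaction plans, but do not execute them
    spark-submit \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      hudi-utilities-bundle.jar \
      --table-type MERGE_ON_READ \
      --target-base-path s3://<bucket>/<table-path> \
      --target-table <table_name> \
      --hoodie-conf hoodie.compact.inline=false \
      --hoodie-conf hoodie.compact.schedule.inline=true \
      ...   # source, schema provider, and other job-specific args elided

    # Standalone compactor: execute the compaction plans scheduled above
    spark-submit \
      --class org.apache.hudi.utilities.HoodieCompactor \
      hudi-utilities-bundle.jar \
      --base-path s3://<bucket>/<table-path> \
      --table-name <table_name> \
      --mode execute \
      --parallelism 10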

Expected behavior

The Deltastreamer/ingestion process should only schedule the compaction, not execute it.


Additional context


We observe that async compaction still runs alongside the Deltastreamer, and it fails because the offline compaction process has already completed the compaction.

Stacktrace

Logs showing that the AsyncCompactService is still running and failed because the compaction instant was already completed by the offline compactor utility:

24/08/08 16:42:46 ERROR AsyncCompactService: Compactor executor failed
org.apache.hudi.exception.HoodieIOException: Failed to create file s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.commit
  at org.apache.hudi.common.fs.HoodieWrapperFileSystem.createImmutableFileInPath(HoodieWrapperFileSystem.java:1035) ~[hudi-aws-bundle-0.12.1.jar:0.12.1]
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:582) ~[hudi-aws-bundle-0.12.1.jar:0.12.1]
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:558) ~[hudi-aws-bundle-0.12.1.jar:0.12.1]
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionCompactionInflightToComplete(HoodieActiveTimeline.java:435) ~[hudi-aws-bundle-0.12.1.jar:0.12.1]
  at org.apache.hudi.table.action.compact.CompactHelpers.completeInflightCompaction(CompactHelpers.java:78) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
  at org.apache.hudi.client.SparkRDDWriteClient.completeCompaction(SparkRDDWriteClient.java:316) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
  at org.apache.hudi.client.SparkRDDWriteClient.commitCompaction(SparkRDDWriteClient.java:300) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
  at org.apache.hudi.client.HoodieSparkCompactor.compact(HoodieSparkCompactor.java:62) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
  at org.apache.hudi.async.AsyncCompactService.lambda$null$0(AsyncCompactService.java:84) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_412]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.commit

Logs showing the AsyncCompactService executing alongside the Deltastreamer process:

24/08/08 16:04:50 INFO BaseHoodieWriteClient: Scheduling compaction at instant time :20240808160446342
24/08/08 16:04:50 INFO HoodieCompactor: Compacting s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3 with commit 20240808160446342
24/08/08 16:24:01 INFO AsyncCompactService: Starting Compaction for instant [==>20240808160446342__compaction__REQUESTED]
24/08/08 16:24:02 INFO MergeOnReadRollbackActionExecutor: Rolling back instant [==>20240808160446342__compaction__INFLIGHT]
24/08/08 16:24:02 INFO MergeOnReadRollbackActionExecutor: Unpublished [==>20240808160446342__compaction__INFLIGHT]
24/08/08 16:24:02 INFO BaseRollbackActionExecutor: Rolled back inflight instant 20240808160446342
24/08/08 16:24:04 WARN BaseRollbackActionExecutor: Rollback finished without deleting inflight instant file. Instant=[==>20240808160446342__compaction__INFLIGHT]
24/08/08 16:24:05 INFO BaseRollbackActionExecutor: Rollback of Commits [20240808160446342] is complete
24/08/08 16:24:05 INFO HoodieActiveTimeline: Deleting instant [==>20240808160446342__compaction__INFLIGHT]
24/08/08 16:24:05 INFO HoodieActiveTimeline: Removed instant [==>20240808160446342__compaction__INFLIGHT]

The AsyncCompactService then fails with the same HoodieIOException / FileAlreadyExistsException stack trace shown above.

Logs for the standalone compaction process:

24/08/08 16:11:01 INFO HoodieCompactor: Found the earliest scheduled compaction instant which will be executed: 20240808160446342
24/08/08 16:11:01 INFO S3NativeFileSystem: Opening 's3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.aux/20240808160446342.compaction.requested' for reading
24/08/08 16:11:02 INFO HoodieActiveTimeline: Checking for file exists ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.compaction.requested
24/08/08 16:11:02 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.compaction.inflight
24/08/08 16:11:02 INFO HoodieActiveTimeline: Create new file for toInstant ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.compaction.inflight
24/08/08 16:12:26 INFO MarkerHandler: Request: create marker s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.temp/20240808160446342 b8a93e67-d11e-4f7f-96c7-de471acc8e23-0_0-0-0_20240808160446342.parquet.marker.MERGE
24/08/08 16:12:26 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.temp/20240808160446342/MARKERS.type
24/08/08 16:12:26 INFO MultipartUploadOutputStream: close closed:true s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.temp/20240808160446342/MARKERS.type
24/08/08 16:12:26 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.temp/20240808160446342/MARKERS0
24/08/08 16:12:26 INFO MultipartUploadOutputStream: close closed:true s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.temp/20240808160446342/MARKERS0
24/08/08 16:30:41 INFO TimelineServerBasedWriteMarkers: Sending request : (http://ip:45239/v1/hoodie/marker/dir/exists?markerdirpath=s3%3A%2F%2Fg24x7-eds-datalake%2Fhudi_data_lake_prod10%2Fprod_eds_datalake_ups_latest_v3%2F.hoodie%2F.temp%2F20240808160446342)
24/08/08 16:30:41 INFO TimelineServerBasedWriteMarkers: Sending request : (http://ip:45239/v1/hoodie/marker/create-and-merge?markerdirpath=s3%3A%2F%2Fg24x7-eds-datalake%2Fhudi_data_lake_prod10%2Fprod_eds_datalake_ups_latest_v3%2F.hoodie%2F.temp%2F20240808160446342)
24/08/08 16:30:42 INFO HoodieTableMetadataUtil: Updating at 20240808160446342 from Commit/COMPACT. #partitions_updated=2
24/08/08 16:30:43 INFO BaseHoodieWriteClient: Generate a new instant time: 20240808160446342 action: deltacommit
24/08/08 16:30:43 INFO HoodieHeartbeatClient: Received request to start heartbeat for instant time 20240808160446342
24/08/08 16:30:43 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/.heartbeat/20240808160446342
24/08/08 16:30:43 INFO HoodieActiveTimeline: Creating a new instant [==>20240808160446342__deltacommit__REQUESTED]
24/08/08 16:30:43 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.requested
24/08/08 16:30:47 INFO HoodieActiveTimeline: Checking for file exists ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.requested
24/08/08 16:30:47 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.inflight
24/08/08 16:30:47 INFO FileIOUtils: Created a new file in meta path: s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.inflight
24/08/08 16:30:47 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.inflight
24/08/08 16:30:47 INFO HoodieActiveTimeline: Create new file for toInstant ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.inflight
24/08/08 16:30:47 INFO BaseCommitActionExecutor: Auto commit enabled: Committing 20240808160446342
24/08/08 16:30:50 INFO BaseSparkCommitActionExecutor: Committing 20240808160446342, action Type deltacommit, operation Type UPSERT_PREPPED
24/08/08 16:30:52 INFO HoodieActiveTimeline: Marking instant complete [==>20240808160446342__deltacommit__INFLIGHT]
24/08/08 16:30:52 INFO HoodieActiveTimeline: Checking for file exists ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit.inflight
24/08/08 16:30:52 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit
24/08/08 16:30:52 INFO HoodieActiveTimeline: Create new file for toInstant ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/20240808160446342.deltacommit
24/08/08 16:30:52 INFO HoodieActiveTimeline: Completed [==>20240808160446342__deltacommit__INFLIGHT]
24/08/08 16:30:52 INFO BaseSparkCommitActionExecutor: Committed 20240808160446342
24/08/08 16:30:52 INFO FSUtils: Removed directory at s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/metadata/.hoodie/.temp/20240808160446342
24/08/08 16:30:52 INFO HoodieHeartbeatClient: Stopping heartbeat for instant 20240808160446342
24/08/08 16:30:52 INFO HoodieHeartbeatClient: Stopped heartbeat for instant 20240808160446342
24/08/08 16:30:52 INFO HeartbeatUtils: Deleted the heartbeat for instant 20240808160446342
24/08/08 16:30:52 INFO HoodieHeartbeatClient: Deleted heartbeat file for instant 20240808160446342
24/08/08 16:30:52 INFO HoodieActiveTimeline: Checking for file exists ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.compaction.inflight
24/08/08 16:30:52 INFO MultipartUploadOutputStream: close closed:false s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.commit
24/08/08 16:30:52 INFO HoodieActiveTimeline: Create new file for toInstant ?s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/20240808160446342.commit
24/08/08 16:30:52 INFO TimelineServerBasedWriteMarkers: Sending request : (http://ip:45239/v1/hoodie/marker/dir/delete?markerdirpath=s3%3A%2F%2Fg24x7-eds-datalake%2Fhudi_data_lake_prod10%2Fprod_eds_datalake_ups_latest_v3%2F.hoodie%2F.temp%2F20240808160446342)
24/08/08 16:30:53 INFO FSUtils: Removed directory at s3://g24x7-eds-datalake/hudi_data_lake_prod10/prod_eds_datalake_ups_latest_v3/.hoodie/.temp/20240808160446342
24/08/08 16:30:53 INFO SparkRDDWriteClient: Compacted successfully on commit 20240808160446342
24/08/08 16:30:53 INFO UtilHelpers: Finish job with 20240808160446342 instant time.

danny0405 commented 3 months ago

Did you try the option hoodie.datasource.write.streaming.disable.compaction?
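
For reference, this option applies to Spark structured streaming writes into Hudi; a minimal Java sketch, assuming Hudi 0.14.0 or later (where the option exists) and placeholder table and path names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Minimal sketch: `df` is assumed to be an existing streaming Dataset<Row>;
// the table name, checkpoint location, and target path are placeholders.
StreamingQuery query = df.writeStream()
    .format("hudi")
    .option("hoodie.table.name", "my_table")
    // true = do not run async compaction inside the streaming writer
    .option("hoodie.datasource.write.streaming.disable.compaction", "true")
    .option("checkpointLocation", "s3://<bucket>/checkpoints/my_table")
    .outputMode("append")
    .start("s3://<bucket>/tables/my_table");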

abhisheksahani91 commented 3 months ago

@danny0405 Thanks for answering

hoodie.datasource.write.streaming.disable.compaction was introduced in 0.14.0; we are using Hudi 0.12.1.

For now, I have added the following two configs set to false; I will test and update (passed as shown below):

hoodie.kafka.compaction.async.enable
hoodie.datasource.compaction.async.enable
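
For reference, these are passed to the Deltastreamer in the usual --hoodie-conf form:

    --hoodie-conf hoodie.kafka.compaction.async.enable=false
    --hoodie-conf hoodie.datasource.compaction.async.enable=false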

abhisheksahani91 commented 3 months ago

After adding the above-mentioned configs, there are still logs about async compaction printed on the driver.

Log:

24/08/09 06:01:56 INFO AsyncCompactService: Starting Compaction for instant [==>20240808163855558__compaction__REQUESTED]

Source code that prints the above log:

@Override
  protected Pair<CompletableFuture, ExecutorService> startService() {
    ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
        new CustomizedThreadFactory("async_compact_thread", isRunInDaemonMode()));
    return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
      try {
        // Set Compactor Pool Name for allowing users to prioritize compaction
        LOG.info("Setting pool name for compaction to " + COMPACT_POOL_NAME);
        context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME);

        while (!isShutdownRequested()) {
          final HoodieInstant instant = fetchNextAsyncServiceInstant();

          if (null != instant) {
            LOG.info("Starting Compaction for instant " + instant);
            compactor.compact(instant);
            LOG.info("Finished Compaction for instant " + instant);
          }
        }
        LOG.info("Compactor shutting down properly!!");
      } catch (InterruptedException ie) {
        hasError = true;
        LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
      } catch (IOException e) {
        hasError = true;
        LOG.error("Compactor executor failed due to IOException", e);
        throw new HoodieIOException(e.getMessage(), e);
      } catch (Exception e) {
        hasError = true;
        LOG.error("Compactor executor failed", e);
        throw e;
      }
      return true;
    }, executor)).toArray(CompletableFuture[]::new)), executor);
  }

abhisheksahani91 commented 3 months ago

@danny0405


abhisheksahani91 commented 3 months ago

@danny0405 any update? I tried --disable-compaction, but it also disables scheduling the compaction.

I need to schedule the compaction from the Deltastreamer job and execute it in a separate process that runs only the compaction execution logic.

Let me summarize it for you. I am using the following configs:
a. hoodie.compact.inline=false
b. hoodie.compact.schedule.inline=true
c. hoodie.kafka.compaction.async.enable=false
d. hoodie.datasource.compaction.async.enable=false

With the above config, I was still able to see logs about compaction execution in the Deltastreamer job:

24/08/08 16:11:01 INFO HoodieCompactor: Found the earliest scheduled compaction instant which will be executed: 20240808160446342

After this I added --disable-compaction. Now both scheduling and execution have stopped.

What I need is to disable compaction execution in the Deltastreamer while keeping compaction scheduling running.
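
For what it's worth, a possible workaround sketch on 0.12.x (an assumption on my side, not something verified in this thread): run the Deltastreamer with --disable-compaction and move the scheduling into the standalone job as well, if your HoodieCompactor build supports a schedule-and-execute mode:

    # Hypothetical workaround: schedule AND execute from the standalone job,
    # so the Deltastreamer can run with --disable-compaction.
    spark-submit \
      --class org.apache.hudi.utilities.HoodieCompactor \
      hudi-utilities-bundle.jar \
      --base-path s3://<bucket>/<table-path> \
      --table-name <table_name> \
      --mode scheduleandexecute \
      --parallelism 10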

danny0405 commented 3 months ago

I'm afraid we cannot do that right now; both the scheduling and the execution are controlled by the same check, cfg.isAsyncCompactionEnabled(): https://github.com/apache/hudi/blob/35c00daaf871a6c1b87d6a440832d60f9b26ee14/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java#L898

https://github.com/apache/hudi/blob/35c00daaf871a6c1b87d6a440832d60f9b26ee14/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java#L898
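
Paraphrasing the effect of that check (an illustrative sketch, not the verbatim Hudi source):

// Illustrative paraphrase: in 0.12.x a single flag gates BOTH the scheduling
// and the execution of async compaction in the deltastreamer, so the two
// cannot be split by configuration.
if (cfg.isAsyncCompactionEnabled()) {
  // creates the async compact service, which picks up pending compaction
  // instants and also spins up the executor threads that run them
  asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(context, writeClient));
  asyncCompactService.get().start(error -> true);
}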

ad1happy2go commented 3 months ago

@abhisheksahani91 As discussed, closing this, since the feature is available in a newer Hudi release.