apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink with the CONSISTENT_HASHING bucket index raises NullPointerException during checkpointing after clustering #11536

Closed: joker1007 closed this issue 4 months ago

joker1007 commented 4 months ago

Describe the problem you faced

We used Flink with the CONSISTENT_HASHING bucket index to write records.

Set clustering.schedule.enabled=true.

Once the writing process stopped, a scheduled clustering job was executed using spark-submit. The process completed successfully, and a replacecommit file was created.

We then resumed the process in Flink, and encountered a NullPointerException during the next checkpoint process.

The Flink SQL definition is as follows.

CREATE TABLE IF NOT EXISTS hudi_experiment_flink(
  key STRING NOT NULL PRIMARY KEY NOT ENFORCED,

  insight_id INT NOT NULL,
  user_id BIGINT NOT NULL,
  profile_key STRING NOT NULL,
  profile_type STRING,
  profile_value STRING NOT NULL,
  updated_at BIGINT NOT NULL
)
PARTITIONED BY (`insight_id`)
WITH (
  'connector' = 'hudi',
  'path' = 's3://repro-batch-store/production/hudi_experiment_flink',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.database.name' = 'default',
  'hoodie.table.name' = 'hudi_experiment_flink',

  'hoodie.embed.timeline.server' = 'false',

  'hoodie.datasource.write.hive_style_partitioning' = 'true',
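  -- consistent hashing bucket index settings (the index engine involved in this issue)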
  'index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'hoodie.bucket.index.hash.field' = 'key',
  'hoodie.bucket.index.num.buckets' = '2',
  'hoodie.bucket.index.min.num.buckets' = '1',
  'hoodie.bucket.index.max.num.buckets' = '128',
  'clean.retain_commits' = '5',
  'compaction.async.enabled' = 'true',
  'clustering.schedule.enabled' = 'true',

  'hoodie.parquet.compression.codec' = 'zstd',

  'write.precombine' = 'true',
  'precombine.field' = 'updated_at',
  'write.parquet.max.file.size' = '128',

  'write.tasks' = '160',
  'write.bucket_assign.tasks' = '120',

  'write.task.max.size' = '4096',
  'write.sort.memory' = '2048',
  'write.merge.max_memory' = '2048',
  'compaction.max_memory' = '2048'
);

To Reproduce

Steps to reproduce the behavior:

  1. Write records with the CONSISTENT_HASHING index via Flink
  2. Stop after scheduling the clustering job
  3. Run clustering job by spark-submit
  4. Re-run Flink and wait for checkpointing

Expected behavior

Checkpointing completes successfully.

Environment Description

I use Amazon EMR 7.1.0.

Stacktrace

2024-06-28 09:43:37,091 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 1 by task 618dfe7c8a0e6eebc84755c5461dbcca_cbc357ccb763df2852fee8c4fc7d55f2_37_0 of job a8228bc3e728c87a1e3664bb33ba3aec at container_1719488516196_0002_01_000024 @ ip-172-31-82-76.ap-northeast-1.compute.internal032repro.work (dataPort=39943).
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: updated_user_profile_per_keys_kafka[1] -> Calc[2] -> ConstraintEnforcer[3] -> row_data_to_hoodie_record -> consistent_bucket_assigner: default_database.user_profile_per_keys_hudi_flink (38/120)#0 Failure reason: Task has failed.
    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1396) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1339) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1159) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not perform checkpoint 1 for operator Source: updated_user_profile_per_keys_kafka[1] -> Calc[2] -> ConstraintEnforcer[3] -> row_data_to_hoodie_record -> consistent_bucket_assigner: default_database.user_profile_per_keys_hudi_flink (38/120)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1203) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    ... 13 more
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: updated_user_profile_per_keys_kafka[1] -> Calc[2] -> ConstraintEnforcer[3] -> row_data_to_hoodie_record -> consistent_bucket_assigner: default_database.user_profile_per_keys_hudi_flink (38/120)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:718) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:351) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1191) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    ... 13 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NullPointerException: Cannot read field "value" because "anotherString" is null
    at java.lang.String.compareTo(String.java:2017) ~[?:?]
    at org.apache.hudi.common.table.timeline.HoodieTimeline.lambda$static$2(HoodieTimeline.java:410) ~[?:?]
    at org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps(HoodieTimeline.java:415) ~[?:?]
    at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.lambda$findInstantsAfter$22(HoodieDefaultTimeline.java:244) ~[?:?]
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178) ~[?:?]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) ~[?:?]
    at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.<init>(HoodieDefaultTimeline.java:66) ~[?:?]
    at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.findInstantsAfter(HoodieDefaultTimeline.java:244) ~[?:?]
    at org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction.snapshotState(ConsistentBucketAssignFunction.java:138) ~[?:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:718) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:351) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1191) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150) ~[flink-dist-1.18.1-amzn-0.jar:1.18.1-amzn-0]
    ... 13 more

I checked the source code, and it looks like the lastRefreshInstant passed to findInstantsAfter in ConsistentBucketAssignFunction#snapshotState is null for some reason.

Is there a case where lastRefreshInstant can be null?

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
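    // The NPE in the stack trace above originates in the next call: findInstantsAfter(lastRefreshInstant)
    // ends up comparing each instant timestamp to lastRefreshInstant via String.compareTo, which throws when it is null.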
    HoodieTimeline timeline = writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(lastRefreshInstant);
    if (!timeline.empty()) {
      for (HoodieInstant instant : timeline.getInstants()) {
        HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
            timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
        Set<String> affectedPartitions = commitMetadata.getPartitionToReplaceFileIds().keySet();
        LOG.info("Clear up cached hashing metadata because find a new replace commit.\n Instant: {}.\n Effected Partitions: {}.",  lastRefreshInstant, affectedPartitions);
        affectedPartitions.forEach(this.partitionToIdentifier::remove);
      }
      this.lastRefreshInstant = timeline.lastInstant().get().getTimestamp();
    }
  }
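
To illustrate the failure mode, here is a minimal self-contained Java sketch. The class name, the instant values, and the plain List are illustrative assumptions standing in for the real timeline, not Hudi API; only the String.compareTo call mirrors the stack trace above.

import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for filtering completed replace instants that come after
// lastRefreshInstant, roughly what findInstantsAfter does via HoodieTimeline.compareTimestamps.
// Running this main method intentionally reproduces the NPE from the stack trace.
public class NullInstantCompareDemo {
  public static void main(String[] args) {
    List<String> completedReplaceInstants = Arrays.asList("20240628094000123", "20240628095500456");
    String lastRefreshInstant = null; // the suspected state after the Flink job is resumed

    completedReplaceInstants.stream()
        // String.compareTo throws NullPointerException when its argument is null,
        // producing exactly: Cannot read field "value" because "anotherString" is null
        .filter(ts -> ts.compareTo(lastRefreshInstant) > 0)
        .forEach(System.out::println);
  }
}

If lastRefreshInstant can indeed be null here, either skipping the timeline lookup when it is null or initializing the field to an earliest-possible sentinel timestamp would avoid the comparison.
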
danny0405 commented 4 months ago

Thanks for the feedback, cc @beyond1920 can you take a look?

danny0405 commented 4 months ago

Thanks for the feedback, it should be fixed in: https://github.com/apache/hudi/pull/11550

beyond1920 commented 4 months ago

@joker1007 Thanks for reporting this bug. The bugfix PR #11550 has been merged.