The job starts and runs normally, but after a period of execution the following error is reported (each sync reports it for more than 10 partitions).
Exception trace during upsert:
org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor [] - Error upsetting bucketType UPDATE for partition :20240119
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AbortedException:
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:61) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.markSupported(SdkFilterInputStream.java:125) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.makeResettable(AmazonHttpClient.java:999) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.beforeRequest(AmazonHttpClient.java:966) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:807) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5453) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5400) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:421) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6528) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1861) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1821) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:35) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:10) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.AbstractUploadingS3Call.perform(AbstractUploadingS3Call.java:87) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:111) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:138) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:186) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.putObject(AmazonS3LiteClient.java:107) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultSinglePartUploadDispatcher.create(DefaultSinglePartUploadDispatcher.java:39) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.uploadSingleCompleteFile(S3FSOutputStream.java:386) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.doClose(S3FSOutputStream.java:225) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:201) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74) ~[hadoop-common-2.10.1-amzn-4.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108) ~[hadoop-common-2.10.1-amzn-4.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74) ~[hadoop-common-2.10.1-amzn-4.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108) ~[hadoop-common-2.10.1-amzn-4.jar:?]
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:75) ~[hudi.jar:0.13.1]
at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi.jar:0.13.1]
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi.jar:0.13.1]
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi.jar:0.13.1]
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi.jar:0.13.1]
at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:84) ~[hudi.jar:0.13.1]
at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:415) ~[hudi.jar:0.13.1]
at org.apache.hudi.io.FlinkMergeHandle.close(FlinkMergeHandle.java:172) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpdateInternal(BaseFlinkCommitActionExecutor.java:227) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpdate(BaseFlinkCommitActionExecutor.java:218) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:189) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:107) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:69) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:77) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor.execute(FlinkUpsertCommitActionExecutor.java:51) ~[hudi.jar:0.13.1]
at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:111) ~[hudi.jar:0.13.1]
at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:147) ~[hudi.jar:0.13.1]
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:189) ~[hudi.jar:0.13.1]
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:472) ~[hudi.jar:0.13.1]
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:1.8.0_372]
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:464) ~[hudi.jar:0.13.1]
at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:136) ~[hudi.jar:0.13.1]
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167) ~[hudi.jar:0.13.1]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:60) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:782) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372],
Hudi version: 0.13.1
Flink version: 1.13
Hudi Flink config:
  'connector' = 'hudi',
  'path' = 's3://bnb-datalake-hudi/**',
  'table.type' = 'COPY_ON_WRITE',
  'write.batch.size' = '512',
  'write.tasks' = '4',
  'write.bucket_assign.tasks' = '4',
  'write.operation' = 'upsert',
  'write.task.max.size' = '4096',
  'write.merge.max_memory' = '3072',
  'write.precombine' = 'true',
  'precombine.field' = 'update_time',
  'hive_sync.enable' = 'true',
  'hive_sync.db' = '---',
  'hive_sync.table' = '---',
  'hive_sync.mode' = 'GLUE',
  'hive_sync.partition_fields' = 'date_key',
  'write.rate.limit' = '15000',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
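For context, a minimal sketch of how a sink table with these options could be declared in Flink SQL is shown below. The table name and column list are assumptions made for illustration (only the precombine field update_time and the partition field date_key appear in the report); the WITH options are copied verbatim from the config above.

  -- Hypothetical DDL sketch; column names other than update_time and date_key are assumptions.
  CREATE TABLE hudi_sink (
    record_key  STRING,        -- hypothetical primary-key column
    update_time TIMESTAMP(3),  -- precombine field from the config
    date_key    STRING,        -- partition field from the config
    PRIMARY KEY (record_key) NOT ENFORCED
  ) PARTITIONED BY (date_key)
  WITH (
    'connector' = 'hudi',
    'path' = 's3://bnb-datalake-hudi/**',
    'table.type' = 'COPY_ON_WRITE',
    'write.operation' = 'upsert',
    'write.tasks' = '4',
    'write.bucket_assign.tasks' = '4',
    'write.batch.size' = '512',
    'write.task.max.size' = '4096',
    'write.merge.max_memory' = '3072',
    'write.precombine' = 'true',
    'precombine.field' = 'update_time',
    'write.rate.limit' = '15000',
    'hive_sync.enable' = 'true',
    'hive_sync.db' = '---',
    'hive_sync.table' = '---',
    'hive_sync.mode' = 'GLUE',
    'hive_sync.partition_fields' = 'date_key',
    'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
  );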
Log timestamp: 2024-02-06T20:38:41.222Z