apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Duplicate fileId 00000002-6791-4a32-8edb-e33c558e0df3 from bucket 2 of partition found during the BucketStreamWriteFunction index bootstrap. #10849

Closed Toroidals closed 4 months ago

Toroidals commented 4 months ago


Describe the problem you faced

1. First, the data is initialized by writing to Hudi with Flink in BULK_INSERT mode.
2. Then CDC incremental data is written with Flink in UPSERT mode, which fails with:

Caused by: java.lang.RuntimeException: Duplicate fileId 00000002-6791-4a32-8edb-e33c558e0df3 from bucket 2 of partition found during the BucketStreamWriteFunction index bootstrap.

To Reproduce

1. First, initialize the data by writing to Hudi with Flink in BULK_INSERT mode:

public class CustomHudiStreamSink {

public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

    //add Field
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }

    //add PrimaryKey
    String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
    builder.pk(hudiPrimaryKeys);

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));

    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");

    //add index
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());

    //WRITE_TASKS 
    options.put(FlinkOptions.WRITE_TASKS.key(), "4");

    //bucket assigner
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), "4");
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "128");
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), "SIMPLE");

    //COMPACTION
    options.put(FlinkOptions.COMPACTION_TASKS.key(), "4");
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_or_time");
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "5");
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "300");

    //hive sync
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    //add partition
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part");
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part");

    //write mode
    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.value());

    builder.options(options);
    return builder;
}

}

2. Then write CDC incremental data with Flink in UPSERT mode; this raises the error above. The UPSERT pipeline is configured the same way, only the write operation differs:

public class CustomHudiStreamSink {

public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

    //add Field
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }

    //add PrimaryKey
    String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
    builder.pk(hudiPrimaryKeys);

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));

    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");

    //add index
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());

    //WRITE_TASKS 
    options.put(FlinkOptions.WRITE_TASKS.key(), "4");

    //bucket assigner
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), "4");
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "128");
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), "SIMPLE");

    //COMPACTION
    options.put(FlinkOptions.COMPACTION_TASKS.key(), "4");
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_or_time");
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "5");
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "300");

    //hive sync
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    //add partition
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part");
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part");

    //write mode
    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());

    builder.options(options);
    return builder;
}

}
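
For reference, the builder returned by getHoodieBuilder is attached to the Flink job roughly as follows. This is a minimal sketch of the wiring, which the report does not include: the checkpoint interval, the job name, and the buildCdcSource helper (standing in for however the Kafka CDC feed is turned into a DataStream<RowData>) are assumptions, not values from the original job.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hudi commits on Flink checkpoints, so checkpointing must be enabled for streaming writes
    env.enableCheckpointing(60_000);

    // Assumed: the CDC feed has already been deserialized and mapped to RowData records
    DataStream<RowData> rowDataStream = buildCdcSource(env); // hypothetical helper

    HoodiePipeline.Builder builder = CustomHudiStreamSink.getHoodieBuilder(infoMap, connectInfo);
    // Attach the Hudi sink; the second argument marks the input as unbounded (streaming).
    // For the one-off BULK_INSERT bootstrap the input would typically be bounded instead.
    builder.sink(rowDataStream, false);

    env.execute("kafka_cdc_to_hudi");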


Expected behavior

The CDC incremental data is written normally.


Toroidals commented 4 months ago

There is one configuration option I am not sure whether to add. When Flink writes to the Hudi table and synchronizes it to Hive, will duplicate data still be merged if only the external tables synchronized to Hive are queried? The option in question is:

PRE_COMBINE = ConfigOptions.key("write.precombine").booleanType().defaultValue(false).withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\nBy default these cases will accept duplicates, to gain extra performance:\n1) insert operation;\n2) upsert for MOR table, the MOR table deduplicate on reading");
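
If pre-write deduplication is desired, the flag would be set on the same options map as the other write options. A minimal sketch, assuming the options map built in getHoodieBuilder above; FlinkOptions.PRE_COMBINE is the constant behind the "write.precombine" key quoted here:

    // Drop duplicates (by the precombine field, ts_ms in this job) before insert/upsert
    options.put(FlinkOptions.PRE_COMBINE.key(), "true");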

Toroidals commented 4 months ago

Complete error message:

Caused by: java.lang.RuntimeException: Duplicate fileId 00000002-6791-4a32-8edb-e33c558e0df3 from bucket 2 of partition found during the BucketStreamWriteFunction index bootstrap.
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:167) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_211]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_211]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_211]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_211]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_211]
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_211]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_211]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_211]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_211]
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:160) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:112) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.2.jar:1.15.2]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_211]

Toroidals commented 4 months ago

Environment Description

  • Hudi version : 0.14.0
  • Hive version : 3.1.3
  • Flink version : 1.15.2
  • Storage (HDFS/S3/GCS..) :
  • Running on Docker? (yes/no) :
