apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.36k stars 2.42k forks source link

Found duplicate column(s) in the data schema when tries to read COW table in incremental mode with Replacecommit action in timeline #10258

Open eg-kazakov opened 10 months ago

eg-kazakov commented 10 months ago

Describe the problem you faced

When spark tries to load a hoodie dataframe from datasource with next hoodie timeline setup:

./.hoodie:
total 15392
drwxr-xr-x  9 ekazakov  staff      288 Dec  6 20:35 .
drwxr-xr-x  5 ekazakov  staff      160 Dec  6 23:00 ..
-rw-r--r--  1 ekazakov  staff   257944 Dec  6 13:50 20231206064834739.replacecommit
-rw-r--r--  1 ekazakov  staff      123 Dec  6 13:50 20231206064834739.replacecommit.inflight
-rw-r--r--  1 ekazakov  staff     2522 Dec  6 13:48 20231206064834739.replacecommit.requested
-rw-r--r--  1 ekazakov  staff  3350228 Dec  6 14:06 20231206065013087.clean
-rw-r--r--  1 ekazakov  staff  2128543 Dec  6 14:01 20231206065013087.clean.inflight
-rw-r--r--  1 ekazakov  staff  2128543 Dec  6 14:01 20231206065013087.clean.requested
-rw-r--r--  1 ekazakov  staff      885 Nov 23 10:44 hoodie.properties

I am getting: org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key

and it fails to load existed data

To Reproduce

Steps to reproduce the behavior:

  1. I've attached my hoodie table structure here: google drive
  2. Here is code for reproducing:
        df = sparkSession.sqlContext.read
          // infer trip schema
          .format("hudi")
          .option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
          .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/movement_type=*/{year=2023/month=12/day=05,year=2023/month=12/day=04}/provider=*/qk=*/*.parquet")
          .option(BEGIN_INSTANTTIME.key, "20231205063117")
          .load("hudi-output-duplicated-columns/")

Expected behavior

I am expecting loaded data from hudi table without failures

Environment Description

Additional context During debug I've noticed that columns got duplicated:

Снимок экрана 2023-12-06 в 22 08 03
danny0405 commented 10 months ago

You are right, spark incremenal query does not filter out these replace commits, while Flink support it, in Flink we has sql option: read.streaming.skip_clustering.

eg-kazakov commented 10 months ago

@danny0405 Is there any workaround for this? Right now it is blocking from loading hoodie metadata. Should we do some clean up prior to filter out replace commits?

eg-kazakov commented 10 months ago

I think that replacecommits when we perform hard partitions delete by writing empty datasets in place of existed partitions.

deletePartitionDF.write.format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key, HUDI_TABLE_NAME)
      .option(OPERATION.key, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key, partitionUrls)
      .option(TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(RECORDKEY_FIELD.key, HUDI_RECORD_KEY)
      .option("hoodie.cleaner.policy", HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name())
      .option("hoodie.cleaner.fileversions.retained","1")
      .option(HIVE_STYLE_PARTITIONING.key, "true")
      .mode(SaveMode.Append)
      .save(destUrl)
danny0405 commented 10 months ago

We are planning to migrate all our incremental queries to base on compleiton time instead of the instant time(start time), and would support the skipping for clustering and compaction for spark at the same time, hople we can complete that in release 1.0.0.

danny0405 commented 10 months ago

cc @beyond1920 for visibility.

beyond1920 commented 10 months ago

@eg-kazakov Hi, is the replacement commit generated by clustering or insert overwrite? we could skip it if it's generated by clustering, but we could not skip it if it's generated by insert overwrite.

eg-kazakov commented 10 months ago

@eg-kazakov Hi, is the replacement commit generated by clustering or insert overwrite? we could skip it if it's generated by clustering, but we could not skip it if it's generated by insert overwrite.

I believe it is insert overwrite. In my case I've switched partition deletion (by empty dataset overwrite) and regular saving operation, so my timeline looks like:

2023-12-08 13:48:12     256486 20231208064747732.replacecommit
2023-12-08 13:47:59        123 20231208064747732.replacecommit.inflight
2023-12-08 13:47:51       2522 20231208064747732.replacecommit.requested
2023-12-08 13:56:59    2497730 20231208064811611.clean
2023-12-08 13:52:18    2531952 20231208064811611.clean.inflight
2023-12-08 13:52:18    2531952 20231208064811611.clean.requested

That does the trick for loading phase, since I do not have situation where replace commit is last operation prior loading.

VitoMakarevich commented 3 months ago

i believe guys it's because in Some hudi versions between 0.12. and 0.14. - Hudi started to write schema in the replacecommit and this schema for some reason includes service fields, while normal commit does not include it. So when Hudi tries to get the schema - it tries to load the latest commit, if there is no schema(like prior 0.14 replacecommit have an empty string in schema field) - tries to load schema from some datafile which is being correct.

VitoMakarevich commented 3 months ago

@codope it looks to be the same as https://github.com/apache/hudi/issues/10533 the problem seems to be that Hud is inconsistently writing schema in commits - in some cases it's with metadata fields, in some not. E.g. replacecommit seems to be creating schema with metadata fields(after delete_partitions operation at least), while normal write not

VitoMakarevich commented 3 months ago

I think it's introduced here https://github.com/apache/hudi/pull/5610/files so there are 2 differencies:

  1. replacecommit started to have a schema(0.12.1 has it empty field)
  2. the schema include internal fiels, while e.g. for normal delete - this code is used HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient) - skipping internal fields
VitoMakarevich commented 3 months ago

I see it's been changed for normal delete here https://github.com/apache/hudi/pull/9743/files, but not for delete_partitions

danny0405 commented 3 months ago

Nice findings ~ Would you mind to fire a fix.

codope commented 3 months ago

@VitoMakarevich Thanks for debugging and putting up a fix. I will review the patch.