apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

[SUPPORT] Upsert in table with record level index takes over an hour #12252

Closed: dataproblems closed this issue 1 week ago

dataproblems commented 1 week ago

To Reproduce

Steps to reproduce the behavior:

  1. Create a table with a record-level index using the insert operation
  2. Create a single-row dataset and perform an upsert (sketched below)
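
Step 2 was roughly the following. This is only a sketch, assuming a SparkSession `spark`, a placeholder table path, and the insertOptions map from the "Table creation options" section further down, with the operation switched to upsert:

// Hypothetical single-record upsert. The table path, the dummy values, and any
// columns beyond the configured key/partition/precombine fields are placeholders;
// a real dataset would have to match the full table schema.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode
import spark.implicits._

val tablePath = "s3://some-bucket/some-table"   // placeholder

// Look up one existing record key (and its partition) from the table.
val existing = spark.read.format("hudi").load(tablePath)
  .select("SomeRecordKey", "SomePartitionField")
  .limit(1)
  .collect()
  .head

// Build a single-row dataset for that key with a fresh precombine value.
val singleRow = Seq((existing.getString(0), existing.getString(1), System.currentTimeMillis()))
  .toDF("SomeRecordKey", "SomePartitionField", "SomeTimestampField")

// Upsert it back into the table.
singleRow.write
  .format("hudi")
  .options(insertOptions)   // defined under "Table creation options" below
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(tablePath)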

Expected behavior

The upsert operation should complete within a minute or two.

Environment Description

Additional context

Table creation options

val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "SomePartitionField",
  "hoodie.datasource.write.recordkey.field" -> "SomeRecordKey",
  "hoodie.datasource.write.precombine.field" -> "SomeTimestampField",
  "hoodie.table.name" -> "SomeTableName",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "true",
  "hoodie.metadata.record.index.min.filegroup.count" -> "500", // This was data specific. 
 )
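
For reference, step 1 would apply this map on the initial write roughly as in the sketch below; the source DataFrame and table path are placeholders, not taken from the original report:

// Hypothetical initial insert that creates the table with the options above;
// `sourceDf` and `tablePath` are placeholders.
import org.apache.spark.sql.SaveMode

sourceDf.write
  .format("hudi")
  .options(insertOptions)
  .mode(SaveMode.Overwrite)   // first write bootstraps the table at tablePath
  .save(tablePath)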

Spark UI: Stages

(screenshot: Spark UI stages view)

Stage Detail View

(screenshot: stage detail view)

Completed task metrics

(screenshot: completed task metrics view)

Commit file contents

I looked up a single record key from the table, created a dummy record for it, and then performed the upsert.

{
  "partitionToWriteStats" : {
    "REDACTED" : [ {
      "fileId" : "REDACTED-0",
      "path" : "REDACTED.parquet",
      "cdcStats" : null,
      "prevCommit" : "20241113220830932",
      "numWrites" : 2098048,
      "numDeletes" : 0,
      "numUpdateWrites" : 1,
      "numInserts" : 0,
      "totalWriteBytes" : 241330130,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "REDACTED",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 241330130,
      "minEventTime" : null,
      "maxEventTime" : null,
      "runtimeStats" : {
        "totalScanTime" : 0,
        "totalUpsertTime" : 50020,
        "totalCreateTime" : 0
      }
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "REDACTED"
  },
  "operationType" : "UPSERT"
}

Do you have any idea why this might be happening?

dataproblems commented 1 week ago

Actually, scratch this: setting hoodie.upsert.shuffle.parallelism to 2000 fixed the problem.
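
For anyone landing here later, a minimal sketch of passing that setting on the upsert write; apart from the hoodie.upsert.shuffle.parallelism key and value, the names are placeholders carried over from the sketches above:

// Hypothetical upsert with the shuffle parallelism raised to 2000.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode

singleRow.write
  .format("hudi")
  .options(insertOptions)
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option("hoodie.upsert.shuffle.parallelism", "2000")
  .mode(SaveMode.Append)
  .save(tablePath)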