apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] The default behavior of spark sql insert into has changed from upsert to insert? #12168

Closed: bithw1 closed this issue 2 weeks ago

bithw1 commented 3 weeks ago

I looked into the source code of Hudi 0.15 and found that the option hoodie.sql.insert.mode is deprecated:

  @Deprecated
  val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
    .key("hoodie.sql.insert.mode")
    .defaultValue("upsert")
    .markAdvanced()
    .deprecatedAfter("0.14.0")
    .withDocumentation("Insert mode when insert data to pk-table. The optional modes are: upsert, strict and non-strict." +
      "For upsert mode, insert statement do the upsert operation for the pk-table which will update the duplicate record." +
      "For strict mode, insert statement will keep the primary key uniqueness constraint which do not allow duplicate record." +
      "While for non-strict mode, hudi just do the insert operation for the pk-table. This config is deprecated as of 0.14.0. Please use " +
      "hoodie.spark.sql.insert.into.operation and hoodie.datasource.insert.dup.policy as you see fit.")

Prior to Hudi 0.14.0, the default behavior of the Spark SQL INSERT INTO statement was upsert, which does not introduce duplicates. The documentation says this option has been replaced by two options: hoodie.spark.sql.insert.into.operation and hoodie.datasource.insert.dup.policy.
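
If I read the quoted documentation right, the old modes map roughly onto the new options as sketched below (session-level SET statements; I have not verified this mapping against the code):

  // Rough, unverified mapping from the deprecated hoodie.sql.insert.mode values
  // to the two replacement options:
  //   upsert     -> hoodie.spark.sql.insert.into.operation = upsert
  //   non-strict -> hoodie.spark.sql.insert.into.operation = insert (dup.policy = none)
  //   strict     -> hoodie.spark.sql.insert.into.operation = insert, dup.policy = fail
  spark.sql("SET hoodie.spark.sql.insert.into.operation = upsert")   // old "upsert" mode
  // spark.sql("SET hoodie.spark.sql.insert.into.operation = insert")
  // spark.sql("SET hoodie.datasource.insert.dup.policy = fail")     // old "strict" mode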

The definition of hoodie.spark.sql.insert.into.operation is as follows; its default value has been changed to insert:

  val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
    .key("hoodie.spark.sql.insert.into.operation")
    .defaultValue(WriteOperationType.INSERT.value())
    .withValidValues(WriteOperationType.BULK_INSERT.value(), WriteOperationType.INSERT.value(), WriteOperationType.UPSERT.value())
    .markAdvanced()
    .sinceVersion("0.14.0")
    .withDocumentation("Sql write operation to use with INSERT_INTO spark sql command. This comes with 3 possible values, bulk_insert, " +
      "insert and upsert. bulk_insert is generally meant for initial loads and is known to be performant compared to insert. But bulk_insert may not " +
      "do small file management. If you prefer hudi to automatically manage small files, then you can go with \"insert\". There is no precombine " +
      "(if there are duplicates within the same batch being ingested, same dups will be ingested) with bulk_insert and insert and there is no index " +
      "look up as well. If you may use INSERT_INTO for mutable dataset, then you may have to set this config value to \"upsert\". With upsert, you will " +
      "get both precombine and updates to existing records on storage is also honored. If not, you may see duplicates. ")

The definition of hoodie.datasource.insert.dup.policy is as follows; its default value is none:

  val INSERT_DUP_POLICY: ConfigProperty[String] = ConfigProperty
    .key("hoodie.datasource.insert.dup.policy")
    .defaultValue(NONE_INSERT_DUP_POLICY)
    .withValidValues(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY)
    .markAdvanced()
    .sinceVersion("0.14.0")
    .withDocumentation("**Note** This is only applicable to Spark SQL writing.<br />When operation type is set to \"insert\", users can optionally enforce a dedup policy. This policy will be employed "
      + " when records being ingested already exists in storage. Default policy is none and no action will be taken. Another option is to choose " +
      " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " +
      "fail the write operation when same records are re-ingested. In other words, a given record as deduced by the key generation policy " +
      "can be ingested only once to the target table of interest.")

With the default values of these two options, does this mean that the default behavior of the Spark SQL INSERT INTO statement has changed from upsert to insert (which may introduce duplicates)?

I am not sure whether I have understood this correctly. If I have, then this is a breaking change: people on older versions use Spark SQL INSERT INTO to do upserts, which do not introduce duplicates, but after upgrading to 0.14.0+ the default behavior is insert, which may introduce duplicates.
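
A small, hypothetical reproduction of what I mean, on a fresh session with the 0.14.0+ defaults (operation=insert, dup.policy=none) and the same made-up table as above:

  // Hypothetical repro: two inserts with the same record key under the defaults.
  spark.sql("INSERT INTO hudi_pk_tbl VALUES (2, 'b1', 10.0, 1000)")
  spark.sql("INSERT INTO hudi_pk_tbl VALUES (2, 'b1_dup', 20.0, 1001)")
  // My understanding: this shows 2 rows on 0.14.0+ defaults (insert), whereas the
  // old default (upsert) would have kept a single row for id = 2.
  spark.sql("SELECT count(*) FROM hudi_pk_tbl WHERE id = 2").show()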

danny0405 commented 3 weeks ago

Yeah, if there is no record key definition it falls back to a pk-less table, in which case it is INSERT.

bithw1 commented 3 weeks ago

Yeah, if there is no record key definition it falls back to a pk-less table, in which case it is INSERT.

Thanks @danny0405 for the reply, but I don't get what you said.

With the two default options hoodie.datasource.insert.dup.policy=none and hoodie.spark.sql.insert.into.operation=insert, what is the behavior if a record key has been defined: insert or upsert?

danny0405 commented 3 weeks ago

The operation config takes the highest priority when it is set explicitly, so even with a record key defined it is still INSERT instead of the old default upsert.

ad1happy2go commented 2 weeks ago

@bithw1 Closing this. Please reopen in case of any more queries.