MrPowers / mack

Delta Lake helper methods in PySpark
https://mrpowers.github.io/mack/
MIT License
294 stars 42 forks source link

update function type_2_scd_generic_upsert #2

Open Amrit-Hub opened 1 year ago

Amrit-Hub commented 1 year ago

Hey, In the upsert query instead of

baseTable.alias("base")
        .merge(
            source=stagedUpdates.alias("staged_updates"),
            condition=pyspark.sql.functions.expr(
                f"base.{primaryKey} = mergeKey AND base.{isCurrentColName} = true AND ({stagedUpdatesAttrs})"
            ),
        )
        .whenMatchedUpdate(
            set={
                isCurrentColName: "false",
                endTimeColName: f"staged_updates.{effectiveTimeColName}",
            }
        )
        .whenNotMatchedInsert(values=res_thing)
        .execute()

you can make it as:

baseTable.alias("base")
      .merge(
          source=stagedUpdates.alias("staged_updates"),
          condition=pyspark.sql.functions.expr(
              f"base.{primaryKey} = mergeKey" 
          ),
      )
      .whenMatchedUpdate(
          condition = f"base.{isCurrentColName} = true AND ({stagedUpdatesAttrs})",
          set={
              isCurrentColName: "false",
              endTimeColName: f"staged_updates.{startTimeColName}",
          }
      )
      .whenNotMatchedInsert(values=insert_thing)
      .execute()

This will prevent duplicates if I run the same function again and again with no change in base and updates table.

MrPowers commented 1 year ago

@Amrit-Hub - thanks for commenting. Can you please open a PR with your code? Can you also add tests to demonstrate why this implementation is better? Thank you!

MrPowers commented 1 year ago

@Amrit-Hub - just following up. Thank you.

dgcaron commented 1 year ago

is the commit refenced on apr 16 something that is considered to be merged as i run into the situation that is described there. in my situation i get a full data dump each day and not all records in the dump are updates / have actual changes, with the current version this will result in duplicate rows

edit:

i have been trying the proposed change and it seems the upsert doesn't handle undefined values very well either. if you would consider the change applicable i would like to create a PR for it that also handles the undefined values too.

robertkossendey commented 1 year ago

@dgcaron it would be greatly appreaciated if you could contribute with a PR and tests to cover the scenario! :)