MrPowers / mack

Delta Lake helper methods in PySpark
https://mrpowers.github.io/mack/
MIT License

Brainstorming public interface helper methods for natural keys, surrogate keys, and primary keys #31

Open MrPowers opened 1 year ago

MrPowers commented 1 year ago

I always get the natural key / surrogate key terminology mixed up, but they're important concepts and we should make it easy for Delta Lake users to manage different types of keys.

We'll want functionality that makes it easy to determine if the md5 hash of multiple columns forms a unique identifier for each row in the dataset. I think this is referred to as a natural key.

A surrogate key is not derived from application data and is something like a UUID. I think we should make it easy to add a UUID column to a Delta table.
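To make the distinction concrete, here is a minimal plain-Python sketch (no Spark involved) of the two ideas: a hash derived from the row's own values versus an identifier generated independently of them. The helper names `natural_key` and `with_surrogate_key` are hypothetical and are not part of mack.

```python
import hashlib
import uuid

rows = [
    {"id": 1, "name": "alice"},
    {"id": 2, "name": "bob"},
]

def natural_key(row, cols, sep="||"):
    """Hash derived from the row's own values: same input, same key."""
    joined = sep.join(str(row[c]) for c in cols)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

def with_surrogate_key(row):
    """Copy of the row with a random UUID that does not depend on the data."""
    return {**row, "uuid": str(uuid.uuid4())}

natural = [natural_key(r, ["id", "name"]) for r in rows]
surrogate = [with_surrogate_key(r) for r in rows]

# The hash identifies every row only if all hashes are distinct.
is_unique = len(set(natural)) == len(rows)
```

In PySpark, the built-in SQL function `uuid()` (for example through `F.expr("uuid()")`) is one possible way to generate such a column. It is non-deterministic, so the values would need to be written out once rather than recomputed.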

I'd like to avoid adding public interface methods that use the terms "natural" and "surrogate". I would like to make this functionality easily accessible for the Delta Lake community. Open to thoughts on good interfaces.

souvik-databricks commented 1 year ago

@MrPowers What do you think about adding a quick helper function in mack to identify the set of columns that can identify a unique row in a dataset?

I am thinking something like this

from itertools import combinations

df = spark.table("sample_data.role_data_1")

def find_level(df):
    df = df.pandas_api()
    for n in range(1, len(df.columns)+1):
        for c in combinations(df.columns, n):
            if not df[list(c)].duplicated().any():
                return list(df[list(c)].columns)

find_level(df)
souvik-databricks commented 1 year ago

@MrPowers

Then the md5 function can leverage the result of the find_level() function and generate the md5 value based on those columns using something like this:

import pyspark.sql.functions as f

def get_md5(df,list_of_columns):
    return df.withColumn("md5", f.md5(f.concat_ws("||", *list_of_columns)))

output = get_md5(df,['id', 'name', 'timestamp'])
output.show(5)
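One caveat with hashing a `concat_ws` result: `concat_ws` skips null values, so two rows whose non-null values line up once the nulls are dropped produce the same string and therefore the same md5. The plain-Python sketch below emulates that behavior on tuples to show the collision. `concat_ws_like`, `md5_key`, and the `"<NULL>"` placeholder are hypothetical names used for illustration, not mack or PySpark APIs.

```python
import hashlib

def concat_ws_like(sep, values):
    """Join values with sep, dropping None, which mimics how concat_ws treats nulls."""
    return sep.join(str(v) for v in values if v is not None)

def md5_key(values, sep="||", null_placeholder=None):
    """md5 of the joined values; optionally substitute a placeholder for None first."""
    if null_placeholder is not None:
        values = [null_placeholder if v is None else v for v in values]
    return hashlib.md5(concat_ws_like(sep, values).encode("utf-8")).hexdigest()

row_a = ("a", None, "b")
row_b = ("a", "b", None)

# Both rows join to "a||b" once the None is dropped, so the hashes match.
collides = md5_key(row_a) == md5_key(row_b)

# Substituting a placeholder keeps the position of the missing value.
distinct_with_placeholder = (
    md5_key(row_a, null_placeholder="<NULL>")
    != md5_key(row_b, null_placeholder="<NULL>")
)
```

In PySpark, a comparable guard would be to wrap each column in `coalesce` with a literal placeholder before calling `concat_ws`. Whether that is appropriate depends on whether the placeholder can occur in real data.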
MrPowers commented 1 year ago

@souvik-databricks - yep, looks like you're thinking about this well 😉

I just found these definitions:

Source is here.

Perhaps find_level should be called find_composite_key. We'll need to add an optional argument that lets the user exclude certain columns.

Any chance you can try to figure this out with regular PySpark and not use the Pandas API?

souvik-databricks commented 1 year ago

@MrPowers pandas_api() is part of regular OSS PySpark; see the docs here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.pandas_api.html

MrPowers commented 1 year ago

@souvik-databricks - yep, I'm aware that pandas_api is part of OSS PySpark, but I don't think we should be using it 😉

souvik-databricks commented 1 year ago

Ahh okay, got it. Yeah, that's a small change. I will replace it and implement the df[list(c)].duplicated().any() logic using the regular DataFrame APIs. But why don't we want to leverage pandas_api, @MrPowers?

souvik-databricks commented 1 year ago

@MrPowers here are the functions using only regular Spark DataFrame APIs, without pandas_api:

import pyspark.sql.functions as f
from itertools import combinations

df = spark.table("sample_data.role_data_1")

def find_composite_key(df, exclude_cols=None):
    # Avoid a mutable default argument; treat None as "exclude nothing".
    df_col_excluded = df.drop(*(exclude_cols or []))
    total_row_count = df_col_excluded.distinct().count()
    # Try the smallest column combinations first.
    for n in range(1, len(df_col_excluded.columns)+1):
        for c in combinations(df_col_excluded.columns, n):
            if df_col_excluded.select(*c).distinct().count() == total_row_count:
                return list(c)

composite_keys = find_composite_key(df)
print("Composite Keys of the dataset:: ",composite_keys)

def get_md5(df,list_of_columns):
    return df.withColumn("md5", f.md5(f.concat_ws("||", *list_of_columns)))

output = get_md5(df,composite_keys)
output.show(5)
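As a cross-check of the search logic, here is a minimal plain-Python sketch of the same idea on in-memory rows. It is an illustration, not the code merged into mack, and it makes two assumptions that differ from the snippet above: it compares against the total row count rather than the distinct row count (so fully duplicated rows mean no key exists), and it returns `None` when no combination qualifies. The name `find_composite_key_py` is hypothetical.

```python
from itertools import combinations

def find_composite_key_py(rows, columns, exclude_cols=None):
    """Return the smallest column combination whose values are unique per row, or None."""
    excluded = set(exclude_cols or [])
    candidates = [c for c in columns if c not in excluded]
    total = len(rows)
    # Try single columns first, then pairs, and so on.
    for n in range(1, len(candidates) + 1):
        for combo in combinations(candidates, n):
            seen = {tuple(row[c] for c in combo) for row in rows}
            if len(seen) == total:
                return list(combo)
    return None

rows = [
    {"id": 1, "name": "a", "ts": 10},
    {"id": 1, "name": "b", "ts": 10},
    {"id": 2, "name": "a", "ts": 20},
]

key = find_composite_key_py(rows, ["id", "name", "ts"])
key_without_name = find_composite_key_py(
    rows, ["id", "name", "ts"], exclude_cols=["name"]
)
```

Here no single column is unique, so the search moves on to pairs and stops at the first pair that distinguishes all three rows. Excluding `name` leaves no usable combination, which is why an explicit "no key found" result is useful to callers.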

@MrPowers You can assign the ticket to me; I will add these functions to mack.

MrPowers commented 1 year ago

@souvik-databricks - assigned you, have at it 💪

souvik-databricks commented 1 year ago

@MrPowers We can close this issue. The PR is merged.

MrPowers commented 1 year ago

Let's keep this issue open and brainstorm the other "key helper methods" we should be exposing. I'd like to map out next steps for providing the community guidance on how to maintain a primary key, how to build composite keys, etc. I think this is a big pain point for the Delta Lake community.

souvik-databricks commented 1 year ago

@MrPowers I tried generating every combination of the columns, which gives a more complete view of the column combinations and shows which of them could serve as candidate keys, as in the code snippet below:

import pyspark.sql.functions as F
import pandas as pd
from itertools import combinations

pdf = pd.DataFrame([[1, 1, 1], [2, 1, 1], [3, 2, 1]], columns=['x', 'y', 'z'])

df = spark.createDataFrame(pdf)
df.show()

initCols = df.columns

# Add one concatenated column per combination of two or more columns.
for n in range(2, len(initCols)+1):
    for c in combinations(initCols, n):
        df = df.withColumn(','.join(c), F.concat_ws(',', *c))

finalCols = df.columns

exprs = [F.size(F.collect_set(x)).alias(x) for x in finalCols]

df = df\
.withColumn("column_combos ->", F.lit("row_counts ->"))\
.groupBy("column_combos ->")\
.agg(*exprs)

df.show()

@MrPowers what do you think about this?

MrPowers commented 1 year ago

@souvik-databricks - looks like a cool function 😎

souvik-databricks commented 1 year ago

@MrPowers Should I add the function to mack?

robertkossendey commented 1 year ago

@souvik-databricks that would be great! Maybe we could additionally filter out the combinations whose distinct count is < count(*) on the DF, so that we only display combinations that are valid composite key candidates.
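The filtering suggested here can be sketched in plain Python on the same three-row sample used earlier in the thread: count the distinct value tuples for every column combination, then keep only the combinations whose count equals the number of rows. `combo_counts` and `key_candidates` are hypothetical names, and this is not the code from the PR.

```python
from itertools import combinations

columns = ["x", "y", "z"]
data = [(1, 1, 1), (2, 1, 1), (3, 2, 1)]

def combo_counts(data, columns):
    """Map every non-empty column combination to its number of distinct value tuples."""
    index = {name: i for i, name in enumerate(columns)}
    counts = {}
    for n in range(1, len(columns) + 1):
        for combo in combinations(columns, n):
            distinct = {tuple(row[index[c]] for c in combo) for row in data}
            counts[combo] = len(distinct)
    return counts

counts = combo_counts(data, columns)

# Keep only combinations that distinguish every row.
key_candidates = [combo for combo, n in counts.items() if n == len(data)]
```

On this sample only the combinations that include `x` survive, because `x` is the only column that varies across all three rows.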

souvik-databricks commented 1 year ago

Makes sense to have only the valid candidates, @robertkossendey. All right, with that confirmation, let me raise a PR for this one, as it should be quick to knock out.

robertkossendey commented 1 year ago

@souvik-databricks awesome, thanks!

souvik-databricks commented 1 year ago

@robertkossendey Raised PR #58 for this. Added you as reviewer.