fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[QUESTION] How to use a CoTransformer on data frames with shared non-key columns #358

Open jstammers opened 2 years ago

jstammers commented 2 years ago

I have a function that aims to implement an SCD2 merge on two dataframes.

In my example, I am attempting to merge two dataframes using a single column as the key. The transformation should modify rows with a matching key and then insert all rows from the second dataframe.

When I execute this code, the zip method by default performs an inner join using the columns that are duplicated across the dataframes. This drops rows with missing values in the shared columns, so the inputs to scd2_merge are not what I expect.

import pandas as pd
from fugue import DataFrames, FugueWorkflow

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})

# schema: a:int,b:float,c:int
def scd2_merge(dfs: DataFrames) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    df1 = dfs[0].as_pandas()
    df2 = dfs[1].as_pandas()
    df1.loc[df1[ix].isin(df2[ix]), "c"] = 0
    df2["c"] = 1
    return pd.concat([df1, df2])

with FugueWorkflow(engine='pandas') as dag:
    df1 = dag.df(df1, "a:int,b:float,c:int")
    df2 = dag.df(df2, "a:int,b:float")
    dag.zip(df1, df2).transform(scd2_merge).show()

Is there a correct way to implement this type of transformation?

goodwanghan commented 2 years ago

You can specify the partition key explicitly, so zip aligns the dataframes on a alone instead of joining on all of the shared columns:

df1.partition_by("a").zip(df2).transform(scd2_merge).show()

or

dag.zip(df1, df2, partition={"by":"a"}).transform(scd2_merge).show()

goodwanghan commented 2 years ago

I also modified your code a little bit to follow good practices:

import pandas as pd
from fugue import FugueWorkflow

_df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
_df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})

# schema: a:int,b:float,c:int
def scd2_merge(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    # the booleans are coerced to int by the output schema:
    # matching keys become 0, non-matching keys stay 1
    df1 = df1.assign(c=~df1[ix].isin(df2[ix]))
    df2 = df2.assign(c=1)
    return pd.concat([df1, df2])

dag = FugueWorkflow()
df1 = dag.df(_df1)
df2 = dag.df(_df2)
df1.partition_by("a").zip(df2).transform(scd2_merge).show()

dag.run()
  1. Reduced the dependency on Fugue: the inputs to scd2_merge can all be native types.
  2. Removed the with statement and the engine predefined on it; defining the DAG doesn't require a context manager, and dag.run() should be a separate step.
  3. Removed the unnecessary schemas when calling dag.df.
  4. Used assign to avoid mutating the dataframes in place (pandas good practice).
  5. Renamed the outer df* variables to _df* so they don't shadow the function parameters (Python good practice).
jstammers commented 2 years ago

Hi @goodwanghan, thanks for the suggestions - I've been able to modify my code as you suggested to get it working with the partition_by.zip syntax. For my particular use case, I needed to use

.zip(df2, how="full_outer")

to ensure that keys present in only one of the dataframes were kept in the output.
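
For reference, here is a minimal sketch of that adjustment (assuming the same scd2_merge, _df1, and _df2 as above; with a full outer zip a partition can receive an empty dataframe for one side, which this scd2_merge already handles):

dag = FugueWorkflow()
df1 = dag.df(_df1)
df2 = dag.df(_df2)
# full_outer keeps keys that appear in only one dataframe; the missing side
# arrives as an empty dataframe with its original schema
df1.partition_by("a").zip(df2, how="full_outer").transform(scd2_merge).show()
dag.run()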

Also, I intend to run this in production using the SparkExecutionEngine. Is there anything about this workflow I should be aware of that could affect performance?

goodwanghan commented 2 years ago

Wonderful! When you run this on Spark, you don't need to construct a SparkExecutionEngine; just pass the Spark session:

dag.run(spark_session)

spark.sql.shuffle.partitions should be set appropriately (this applies to Spark execution in general).
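
For example, a minimal sketch (the shuffle-partitions value below is a placeholder to tune for your data size and cluster):

from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", 200)  # placeholder value, tune for your workload
    .getOrCreate()
)
dag.run(spark_session)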

You can also enable fugue.spark.use_pandas_udf to see if it is faster, though for your case I don't think it will have much effect.
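
If you want to try it, one way is to pass the config as a dict when running the DAG (a sketch; setting it on the Spark session conf should also work):

dag.run(spark_session, {"fugue.spark.use_pandas_udf": True})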