dask / dask-expr

Column projection on cross-joined frames can result in duplicate columns #1018

Closed charlesbluca closed 5 months ago

charlesbluca commented 5 months ago

Describe the issue: Column projection on cross-joined frames can sometimes produce computed results with duplicate columns, even though the lazy collection itself reports a single column.

Minimal Complete Verifiable Example:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {
        "a": [1, 2, 3] * 5,
        "b": [1, 2, 3] * 5,
        "c": ["A"] * 15,
    },
)
ddf = dd.from_pandas(df, npartitions=1)

merged_ddf = dd.merge(ddf, ddf, on="a")
res = merged_ddf[merged_ddf["c_x"] == "A"]["c_y"]

print(res.columns)  # ['c_y']
print(res.compute().columns)  # Index(['c_y', 'c_y'], dtype='object')

Worth noting that this seems pretty finicky: removing column b from the example above resolves the issue, while setting npartitions higher results in more duplicate columns:

...
ddf = dd.from_pandas(df, npartitions=2)
...
print(res.compute().columns)  # Index(['c_y', 'c_y', 'c_y', 'c_y'], dtype='object')
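
For comparison, here is a rough pandas-only sketch of what the result would presumably be expected to look like, plus a small check for repeated column labels. It does not reproduce the bug or touch dask at all. `duplicated_column_names` is a hypothetical helper, and `reported` is built by hand to mimic the output shown above rather than being produced by dask.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "a": [1, 2, 3] * 5,
        "b": [1, 2, 3] * 5,
        "c": ["A"] * 15,
    },
)

# pandas baseline: the same merge, filter, and projection yield one Series.
merged = pd.merge(df, df, on="a")
expected = merged[merged["c_x"] == "A"]["c_y"]


def duplicated_column_names(frame):
    """Return each column label that occurs more than once (hypothetical helper)."""
    return list(frame.columns[frame.columns.duplicated()].unique())


# Hand-built stand-in for the reported computed result; NOT produced by dask.
reported = pd.DataFrame([["A", "A"]], columns=["c_y", "c_y"])

dupes = duplicated_column_names(reported)

# Keep only the first occurrence of each column label.
deduped = reported.loc[:, ~reported.columns.duplicated()]
```

A check like this could be run on `res.compute()` to confirm whether a given environment is affected; dropping the repeated labels afterwards only masks the symptom and is not a fix for the projection itself.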

Environment: