rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[BUG] Dask_cudf merge function returns too few rows #11189

Open ChrisJar opened 2 years ago

ChrisJar commented 2 years ago

Describe the bug
The dask_cudf merge function returns too few rows when the dtype of the column being merged on is mismatched (e.g., int64 on the left and int32 on the right) and npartitions > 1.

Steps/Code to reproduce bug
Here's a reproducer showing that, when the dtypes are mismatched, the number of rows returned depends on the number of partitions in the dataframes being merged:

import cupy as cp
import cudf
import dask_cudf

dfa = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "b":cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "c":cp.random.normal(size=100000)})

dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")

ddfa = dask_cudf.from_cudf(dfa, npartitions=4)
ddfb = dask_cudf.from_cudf(dfb, npartitions=4)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

ddfa = ddfa.repartition(npartitions=3)
ddfb = ddfb.repartition(npartitions=3)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

ddfa = ddfa.repartition(npartitions=2)
ddfb = ddfb.repartition(npartitions=2)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

ddfa = ddfa.repartition(npartitions=1)
ddfb = ddfb.repartition(npartitions=1)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

This returns:

npartitions:
left: 4
right: 4
Number of rows in merge result:
16083716
******************************
npartitions:
left: 3
right: 3
Number of rows in merge result:
35342145
******************************
npartitions:
left: 2
right: 2
Number of rows in merge result:
46959990
******************************
npartitions:
left: 1
right: 1
Number of rows in merge result:
100006687
******************************

Expected behavior
If we perform the same operation with cudf alone, we see the expected result:

import cupy as cp
import cudf

dfa = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "b":cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "c":cp.random.normal(size=100000)})

dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")

print(len(dfa.merge(dfb, how="inner", on="a")))

which returns:

100006687

which is the same as the dask_cudf result when npartitions=1.
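
As a workaround until this is addressed, explicitly casting the join key to a common dtype before merging sidesteps the mismatch. A minimal sketch based on the reproducer above (not part of the original report):

import cupy as cp
import cudf
import dask_cudf

# Same frames as the reproducer above, with mismatched key dtypes.
dfa = cudf.DataFrame({"a": cp.random.randint(0, 100, 100000), "b": cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a": cp.random.randint(0, 100, 100000), "c": cp.random.normal(size=100000)})
dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")

ddfa = dask_cudf.from_cudf(dfa, npartitions=4)
ddfb = dask_cudf.from_cudf(dfb, npartitions=4)

# Workaround: cast the left key to match the right key (or vice versa) so the
# merge sees a single key dtype regardless of how the data is partitioned.
ddfa["a"] = ddfa["a"].astype("int64")

print(len(ddfa.merge(ddfb, how="inner", on="a")))  # count no longer depends on npartitions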

Environment details

cudf                      22.08.00a220629 cuda_11_py38_gff63c0a745_173    rapidsai-nightly
dask-cudf                 22.08.00a220629 cuda_11_py38_gff63c0a745_173    rapidsai-nightly
libcudf                   22.08.00a220629 cuda11_gff63c0a745_173    rapidsai-nightly
dask                      2022.6.1           pyhd8ed1ab_0    conda-forge
dask-core                 2022.6.1           pyhd8ed1ab_0    conda-forge
dask-cuda                 22.08.00a220630         py38_21    rapidsai-nightly
distributed               2022.6.1           pyhd8ed1ab_0    conda-forge

Note: the same thing occurs when how is "left", "right", or "outer".

VibhuJawa commented 2 years ago

Thanks for triaging and raising this @ChrisJar. I can confirm that I can reproduce this on my end too.

Here's a smaller minimal example for anyone interested:

import numpy as np
import cudf
import dask_cudf

df_a = cudf.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [0] * 5})
df_a['a'] = df_a['a'].astype(np.int32)

df_b = cudf.DataFrame({'a': [5, 4, 3, 2, 1], 'c': [1] * 5})
df_b['a'] = df_b['a'].astype(np.int64)
print("cudf\n", df_a.merge(df_b))
print("--" * 10)
ddf_a = dask_cudf.from_cudf(df_a, npartitions=4)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=4)
print("dask_cudf\n", ddf_a.merge(ddf_b).compute())
cudf
    a  b  c
0  1  0  1
1  2  0  1
2  3  0  1
3  4  0  1
4  5  0  1
--------------------
dask_cudf
    a  b  c
0  3  0  1
0  1  0  1

Also note that this works with dask.dataframe on pandas.


import numpy as np
import pandas as pd
import dask.dataframe

df_a = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [0] * 5})
df_a['a'] = df_a['a'].astype(np.int32)

df_b = pd.DataFrame({'a': [5, 4, 3, 2, 1], 'c': [1] * 5})
df_b['a'] = df_b['a'].astype(np.int64)
print("pandas\n", df_a.merge(df_b))
print("--" * 10)
ddf_a = dask.dataframe.from_pandas(df_a, npartitions=4)
ddf_b = dask.dataframe.from_pandas(df_b, npartitions=4)
print("dask.dataframe\n", ddf_a.merge(ddf_b).compute())
pandas
    a  b  c
0  1  0  1
1  2  0  1
2  3  0  1
3  4  0  1
4  5  0  1
--------------------
dask.dataframe
    a  b  c
0  1  0  1
0  3  0  1
1  4  0  1
2  5  0  1
3  2  0  1
VibhuJawa commented 2 years ago

CC: @rjzamora , @galipremsagar

rjzamora commented 2 years ago

My immediate assumption is that "int32" and "int64" columns are hashing to different partitions, but I'll need to investigate a bit to say for sure.

EDIT: I can confirm that pd.util.hash_pandas_object produces the same hash for "int32" and "int64" values, while cudf.DataFrame.hash_values() does not:

import numpy as np
import pandas as pd
import cudf

cudf_a = cudf.DataFrame({"a": np.arange(10, dtype="int32")}).hash_values()
cudf_b = cudf.DataFrame({"a": np.arange(10, dtype="int64")}).hash_values()
assert all(cudf_a.values == cudf_b.values)  # FAILS

pd_a = pd.util.hash_pandas_object(pd.DataFrame({"a": np.arange(10, dtype="int32")}))
pd_b = pd.util.hash_pandas_object(pd.DataFrame({"a": np.arange(10, dtype="int64")}))
assert all(pd_a.values == pd_b.values)  # PASSES
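
As a sanity check on this explanation (a small sketch, not part of the original comment): once both columns are cast to a common dtype, the cuDF hashes should agree as well.

import numpy as np
import cudf

# With matching dtypes, identical key values produce identical hashes, so they
# would be shuffled to the same partition during a dask_cudf merge.
cudf_a = cudf.DataFrame({"a": np.arange(10, dtype="int32").astype("int64")}).hash_values()
cudf_b = cudf.DataFrame({"a": np.arange(10, dtype="int64")}).hash_values()
assert all(cudf_a.values == cudf_b.values)  # passes once the dtypes agree
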
rjzamora commented 2 years ago

@ChrisJar @VibhuJawa - I just noticed that neither of you has mentioned the UserWarning you should get for the reproducers shared above. For example, I get the following message locally:

.../site-packages/dask/dataframe/multi.py:475: UserWarning: Merging dataframes with merge column data type mismatches: 
+---------------+------------+-------------+
| Merge columns | left dtype | right dtype |
+---------------+------------+-------------+
| ('a', 'a')    | int32      | int64       |
+---------------+------------+-------------+
Cast dtypes explicitly to avoid unexpected results.

I do think it makes sense to cast the columns automatically (when possible), but I just wanted to confirm that you are getting this warning as well?
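
For illustration, a rough sketch of what such an automatic cast could look like, promoting the join key to a common dtype on both sides before merging (merge_with_promoted_keys is a hypothetical helper, not an existing dask or dask_cudf API):

import numpy as np
import cudf
import dask_cudf

def merge_with_promoted_keys(left, right, on, **kwargs):
    # Hypothetical helper: promote the join key on both sides to a common
    # dtype so the merge never sees mismatched key columns.
    common = np.promote_types(left[on].dtype, right[on].dtype)
    left = left.assign(**{on: left[on].astype(common)})
    right = right.assign(**{on: right[on].astype(common)})
    return left.merge(right, on=on, **kwargs)

# Usage with the small frames from the earlier reproducer:
ddf_a = dask_cudf.from_cudf(cudf.DataFrame({"a": cudf.Series([1, 2, 3, 4, 5], dtype="int32"), "b": [0] * 5}), npartitions=4)
ddf_b = dask_cudf.from_cudf(cudf.DataFrame({"a": cudf.Series([5, 4, 3, 2, 1], dtype="int64"), "c": [1] * 5}), npartitions=4)
print(len(merge_with_promoted_keys(ddf_a, ddf_b, on="a", how="inner")))  # 5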

galipremsagar commented 2 years ago

Yup, I did get that warning @rjzamora.

ChrisJar commented 2 years ago

Yup, so did I @rjzamora

VibhuJawa commented 2 years ago

@ChrisJar, can you raise an issue on dask-sql too? It will be hard for users to work around this from that world, since the merge can be buried deep inside a complex query.

rjzamora commented 2 years ago

@VibhuJawa - Do you have a sense of what kinds of dtype mismatch we should worry about from dask-sql? It seems like we should be able to handle mismatched integer dtypes in Dask, but I wasn't planning on addressing anything else, since the user should be getting a warning that they're doing something dangerous/wrong (and "general" type-casting rules are pretty vague). However, you make a good point that the user may not have much control if they are working with the dask-sql API.

VibhuJawa commented 2 years ago

@rjzamora, I agree that "general" type-casting rules are pretty vague, so arriving at a generally agreeable behavior will be impossible.

What kinds of dtype mismatch should we worry about from dask-sql?

In my opinion, for dask-sql we should handle merges between columns of similar dtype (say int32/int64 and float32/float64) and FAIL loudly in other cases (like int32/float32 or int32/uint32).
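
A rough illustrative sketch of that policy based on numpy dtype kinds (check_merge_key_dtypes is hypothetical, not an existing dask-sql or dask_cudf API):

import numpy as np

def check_merge_key_dtypes(left_dtype, right_dtype):
    # Hypothetical guardrail: allow promotion within the same dtype kind
    # (e.g. int32/int64 or float32/float64) and fail loudly otherwise
    # (e.g. int32/float32 or int32/uint32), roughly mirroring Spark's behavior.
    left_dtype, right_dtype = np.dtype(left_dtype), np.dtype(right_dtype)
    if left_dtype.kind != right_dtype.kind:
        raise TypeError(
            f"Failed to merge incompatible key dtypes {left_dtype} and {right_dtype}"
        )
    return np.promote_types(left_dtype, right_dtype)

print(check_merge_key_dtypes("int32", "int64"))    # int64
try:
    check_merge_key_dtypes("int32", "float32")
except TypeError as err:
    print(err)  # fails loudly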

I would love for @ayushdg to weigh in on this, as he is deep in the weeds of merge handling on the dask-sql side these days.

Just for additional context, this issue was actually found while we were triaging incorrect results in a GPU-BDB dask-sql query (see issue).

rjzamora commented 2 years ago

say int32/int64 and float32/float64

Is it common to merge on floating-point columns? That sounds inherently dangerous to me, but I suppose it could be an integer column that was accidentally or necessarily up-cast.

VibhuJawa commented 2 years ago

Is it common to merge on floating-point columns?

I don't think it's common, but as you say, users can do it accidentally in workflows. A common example I have seen is when people read CSVs and a column gets interpreted as different dtypes depending on how the data is partitioned.

That said, we should have guardrails around it by failing, especially in dask-sql land (I'll leave the dask_cudf behavior to your judgment), following Spark's behavior. In Spark, this would raise something like:

 Failed to merge incompatible data types IntegerType and DoubleType
ayushdg commented 2 years ago

In my opinion, for dask-sql we should handle merges between columns of similar dtype (say int32/int64 and float32/float64) and FAIL loudly in other cases (like int32/float32 or int32/uint32).

Agreed, upcasting integer columns while joining on integer types should be handled by dask-cudf, since it's something that works in cudf/pandas, and in SQL land integer-based joins are common and the dtypes may vary.

Dask-sql will also show the same UserWarning for dtype mismatches, but if it's a large query, it can be hard to diagnose which part of the query the warning is coming from.

Is it common to merge on floating-point columns? That sounds inherently dangerous to me, but I suppose it could be on an integer column that was accidentally or necessarily up-casted.

Equality joins on float columns are very uncommon and most often happen accidentally. Happy to move that part of the discussion over to dask-sql to improve dtype checking during joins and raise more warnings. This issue can primarily focus on handling integer dtypes.

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.