NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0
787 stars 228 forks source link

[BUG] natural join on string key results in a data frame with spurious NULLs #1885

Closed willb closed 3 years ago

willb commented 3 years ago

Describe the bug

I have a data frame SRC that is backed by a Parquet file, and two data frames AGG1 and AGG2 that have the same set of values for a key column but different columns for the remaining columns (these column values come from performing different aggregates on SRC). When I perform a natural join on AGG1 and AGG2, generating a new data frame JOINED, the result appears as I would expect it to: one record for each key value, with the union of the columns of AGG1 and AGG2. Furthermore, filtering JOINED with isnull or isnan on each column reveals no null or NaN values in any column. However, when running on GPU (but not CPU):

  1. JOINED.dropna().count() is approximately 0.05% of JOINED.count()
  2. using JOINED on the left side of another join results in NULLs for the columns in JOINED in the output frame

Relevant query plans for the input relations are here and an example REPL session exhibiting this behavior is here.

Steps/Code to reproduce bug Please provide a list of steps or a code sample to reproduce the issue. Avoid posting private or sensitive data.

  1. run spark-submit --driver-memory 16G generate.py from this repo
  2. submit this reproducer script to Spark with and without the GPU plugin loaded (or run the linked notebook).

Expected behavior

The output should show that customer_charges had no records with NULL values whether running on CPU or GPU.

Environment details (please complete the following information)

This requires no special Spark configuration. I've reproduced it with a local master running under Docker with at least the following configurations:

Additional context Add any other context about the problem here.

jlowe commented 3 years ago

Thanks for the report and repro case, @willb!

I was able to reproduce the issue. I haven't root-caused it yet, but I did find it interesting that removing the customers.cache() line allows the repro case to produce the same output as the CPU. So the problem may be related to how data is being loaded from cached dataframes.

jlowe commented 3 years ago

I believe it is indeed keyed on the fact that cache() is being used in the plan. Here's what I think is happening:

In the plan, it ends up creating a cache of a dataframe that is hashpartition(customerID, 200). Later when it builds the customer_charges dataframe by joining on customerID, it normally would need to shuffle both sides of the join on the customerID key. However in this case the cached data is already partitioned that way, so it avoids performing the shuffle for that side of the join input.

The problem is that the plan that created the data that was cached was executed completely on the GPU. Therefore it was partitioned on the GPU. The GPU and CPU currently do not hash partition keys the same way, so when it loads a GPU-hashed partition and tries to join it with a CPU-hashed partition, most of the keys don't line up. That explains why we don't see any missing data feeding into the join yet so few rows survive it. The hash-join was messed up due to mixing of incompatible partitioning schemes.

This explains why I can get the correct results by removing the cache() directive, which allows the plugin to see the entire plan back to the file reads and avoid mixing incompatible CPU/GPU join inputs. The cached input essentially blinds the logic for detecting mismatched joins since it doesn't see a shuffle on both sides of the join to compare. Another way to show that this is a hash join issue is by forcing the shuffle partition count to 1 so only one task does the join. That also fixes the problem even with the cache() directive, since there's only 1 partition holding all the keys. As soon as the shuffle partition count goes to 2 or higher, the join results start dropping proportionally.

revans2 commented 3 years ago

I have a patch that lets this pass, but I am trying to clean up the code and add in more tests. Hopefully I will have a patch posted this afternoon.

revans2 commented 3 years ago

The old patched worked great, except for cases with AQE enabled. In those cases AQE had inserted operations into the plan that changed what the reported partitioning was. This caused us to insert new shuffles to to deal with the different partitioning, which then could go totally counter to what AQE actually wanted, and end up producing a plan that wold not work. Ironically with AQE on we don't have enough information to actually insert in the shuffles correctly. That info might be available, but it is burred really deep and after talking with @jlowe and @andygrove I think we are going to try a different approach and hopefully use the Spark compatible murmur3 work in cudf to just make all of our shuffles compatible with Sparks.

revans2 commented 3 years ago

@willb #1910 has been merged in I am going to close this because from our testing this should be fixed. If you run into the issue again in 0.4 or above please reopen this or file a new one.