Open revans2 opened 1 year ago
It should be noted that something similar to this already exists in the tools repo.
https://github.com/NVIDIA/spark-rapids-tools/tree/dev/data_validation
I'm not 100% sure if we want to have two tools or just one. But the code is not that complicated. The data_validation tool is not great. It requires a primary key, which should not be a requirement. It does not handle maps, nulls, duplicate rows, or floating point approximate matches correctly.
The above example can be updated to handle nulls by using the null-safe equals operator '<=>' as the join condition for the anti-join columns. Maps can be replaced with map_entries that are then sorted with array_sort. And for now we are just going to punt on floats/doubles in both cases and not include them in our scale testing suite until this can support doing a fuzzy check for them.
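For the map normalization, a minimal sketch (assuming only top-level map columns; the function name is just illustrative) could look like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array_sort, col, map_entries}
import org.apache.spark.sql.types.MapType

// Hypothetical sketch: replace each top-level map column with a sorted array of
// its entries so it can be grouped on and joined on like any other column.
def normalizeMaps(df: DataFrame): DataFrame =
  df.schema.fields.foldLeft(df) { (d, f) =>
    f.dataType match {
      // map_entries gives an array<struct<key,value>>, and array_sort orders the
      // entries (struct ordering compares the key field first), so two equal maps
      // end up as identical arrays.
      case _: MapType => d.withColumn(f.name, array_sort(map_entries(col(f.name))))
      case _ => d
    }
  }

Maps nested inside structs or arrays would need a recursive version of the same idea.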
@revans2 I'm building pipelines to run preliminary scale tests on our internal cluster, and I got some numbers for this issue.
With scale_factor=100 and complexity=300, I got data of this size:
134.1 G 268.1 G /data/scale-test/SCALE_100_300_parquet_1.0.0_41
GPU output sizes (0 means the query failed):
1.2 G 2.4 G /data/scale-test/output/q10_1
46.2 G 92.4 G /data/scale-test/output/q11_1
0 0 /data/scale-test/output/q12_1
46.2 G 92.4 G /data/scale-test/output/q13_1
17.0 G 34.1 G /data/scale-test/output/q14_1
12.9 G 25.7 G /data/scale-test/output/q15_1
39.3 K 78.7 K /data/scale-test/output/q16_1
9.8 G 19.7 G /data/scale-test/output/q17_1
5.8 G 11.6 G /data/scale-test/output/q18_1
38.5 K 77.1 K /data/scale-test/output/q19_1
645.0 G 1.3 T /data/scale-test/output/q1_1
38.5 K 77.1 K /data/scale-test/output/q20_1
0 0 /data/scale-test/output/q21_1
11.1 G 22.2 G /data/scale-test/output/q22_1
19.5 K 39.1 K /data/scale-test/output/q23_1
16.5 M 33.1 M /data/scale-test/output/q24_1
290.7 K 581.3 K /data/scale-test/output/q25_1
775.5 M 1.5 G /data/scale-test/output/q26_1
1.7 G 3.5 G /data/scale-test/output/q27_1
1004.1 M 2.0 G /data/scale-test/output/q28_1
1.2 M 2.4 M /data/scale-test/output/q29_1
644.9 G 1.3 T /data/scale-test/output/q2_1
1.4 G 2.9 G /data/scale-test/output/q30_1
603.6 M 1.2 G /data/scale-test/output/q31_1
841.9 K 1.6 M /data/scale-test/output/q32_1
174.3 M 348.6 M /data/scale-test/output/q33_1
191.7 M 383.3 M /data/scale-test/output/q34_1
70.6 M 141.3 M /data/scale-test/output/q35_1
26.8 M 53.5 M /data/scale-test/output/q36_1
27.1 G 54.1 G /data/scale-test/output/q37_1
26.5 G 53.1 G /data/scale-test/output/q38_1
4.0 G 7.9 G /data/scale-test/output/q39_1
644.9 G 1.3 T /data/scale-test/output/q3_1
0 0 /data/scale-test/output/q40_1
15.7 M 31.4 M /data/scale-test/output/q41_1
12.1 G 24.2 G /data/scale-test/output/q4_1
12.1 G 24.2 G /data/scale-test/output/q5_1
1.2 G 2.4 G /data/scale-test/output/q6_1
1.2 G 2.4 G /data/scale-test/output/q7_1
1.2 G 2.4 G /data/scale-test/output/q8_1
1.2 G 2.4 G /data/scale-test/output/q9_1
Many of the outputs are quite large, so I suspect the validation will take a very long time. Should we do something like what the NDS queries do? They use LIMIT so that the output of every query stays quite small.
LIMIT has problems: even if there is an ORDER BY before it, there can still be ambiguity in the order of the results. I would much rather do something like the following.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

def compare(left: DataFrame, right: DataFrame): DataFrame = {
  // Group by every column and count, so duplicate rows are collapsed but their counts are still compared.
  val leftCount = left.groupBy(left.columns.map(col(_)): _*).count
  val rightCount = right.groupBy(right.columns.map(col(_)): _*).count
  // Null-safe equality on every column (including the count) as the join condition.
  val joinOn = leftCount.columns.map(c => leftCount(c) <=> rightCount(c)).reduceLeft(_ and _)
  // Anti-joins in both directions find the rows that only show up on one side.
  val onlyRight = rightCount.join(leftCount, joinOn, joinType = "left_anti").withColumn("_in_column", lit("right"))
  val onlyLeft = leftCount.join(rightCount, joinOn, joinType = "left_anti").withColumn("_in_column", lit("left"))
  onlyRight.union(onlyLeft)
}
This should run a query that compares the two dataframes. There is a lot we can do to improve it. It does not support Maps, and floating-point is not going to be compared very well, but that should be good enough for the current scale tests and should be a good starting point.
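For example, a hypothetical invocation over two written-out results might look like this (the CPU output path is made up, and spark is assumed to be an existing SparkSession):

// Hypothetical usage: an empty result means the two outputs contain the same
// rows with the same duplicate counts.
val cpu = spark.read.parquet("/data/scale-test/output-cpu/q10_1") // illustrative path
val gpu = spark.read.parquet("/data/scale-test/output/q10_1")
val diff = compare(cpu, gpu)
if (diff.isEmpty) {
  println("q10_1 matches")
} else {
  diff.show(100, truncate = false) // rows seen on only one side, tagged left/right
}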
It would also be nice to explicitly verify that the schemas match before running the query, to throw an exception if we see a map, and to output a warning if we see floating point.
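A rough sketch of that pre-check (the walk over nested types and the messages are just illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

// Hypothetical sketch: fail fast on mismatched schemas or map columns, and warn
// when a floating point column is present.
def checkSchemas(left: DataFrame, right: DataFrame): Unit = {
  require(left.schema == right.schema,
    s"Schemas do not match:\n${left.schema.treeString}\nvs\n${right.schema.treeString}")

  def visit(dt: DataType, path: String): Unit = dt match {
    case _: MapType =>
      throw new IllegalArgumentException(s"Map column '$path' is not supported yet")
    case FloatType | DoubleType =>
      System.err.println(s"WARNING: column '$path' is floating point and will only be compared exactly")
    case s: StructType => s.fields.foreach(f => visit(f.dataType, s"$path.${f.name}"))
    case a: ArrayType => visit(a.elementType, path)
    case _ => ()
  }
  left.schema.fields.foreach(f => visit(f.dataType, f.name))
}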
Is your feature request related to a problem? Please describe.
https://github.com/NVIDIA/spark-rapids/pull/8706 adds a tool to generate large data sets for scale testing, and possibly some performance testing for very difficult corner cases. But ultimately we also need to verify that the results after running that generated data through a query are correct. The TPC benchmarks provide a validation tool that can be used to check if the results are correct, but for us, our definition of correct is that it matches Spark on the CPU. Warts and all.
The other problem is that the data produced by scale testing can be very, very large (not just in row count, but also in column count and nesting). So we need to come up with a way to validate that the results are correct in a way that is going to scale. This may itself be a really massive scale test, because we are likely to double the number of rows and columns by trying to compare two large data sets.
One way to do this is to group by all of the columns, do a count on them (so that all duplicates are removed), for both the expected data set and the test data set. Then to do two anti-joins on them. A left-anti-join to know if there are rows on the LHS that are not in the right, and a right-anti-join to know if there are rows in the RHS that are not in the left.
This works if all of the values produced will be bit-for-bit identical to those on the CPU and are types that Spark can join on. For floating point types, especially with aggregations, that is not guaranteed. Perhaps that is good enough: we just avoid floats in our scale tests and we are good. But I am not sure we can do that, especially if we want to reproduce issues from customer queries.
We might need to normalize maps to make this work and turn them into a combination of structs and arrays with the data sorted by the key.
For floating point values or anywhere that we would want to do an approximate match, I just don't know of a good way to do it that is distributed and scales well.
If we try to normalize the floating point values by any kind of rounding, there are inflection points where being off by a little means the rounding just exacerbates the difference (for example, 0.4999999 and 0.5000001 are nearly equal, but round to 0 and 1). We could provide a UDF to do a fuzzy match on the floating point values as part of the anti-join, but then we could not do the group-by any more to deal with duplicates, because the values might not even be self-consistent, and aggregations don't support a fuzzy match.
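For reference, the kind of fuzzy-match UDF mentioned above might look like the sketch below (the tolerances are placeholders, not a recommendation); as noted, it can only be used in a join condition, not to deduplicate rows with a group-by.

import org.apache.spark.sql.functions.udf

// Hypothetical fuzzy equality for doubles: relative tolerance with a small
// absolute floor, treating null==null and NaN==NaN as matches.
val approxEqual = udf { (a: java.lang.Double, b: java.lang.Double) =>
  if (a == null || b == null) {
    a == null && b == null
  } else if (a.isNaN || b.isNaN) {
    a.isNaN && b.isNaN
  } else {
    math.abs(a - b) <= math.max(1e-9, math.max(math.abs(a), math.abs(b)) * 1e-6)
  }
}
// e.g. as part of an anti-join condition (other column checks omitted):
//   left.join(right, approxEqual(left("d"), right("d")), "left_anti")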
The only option I can think of is to sort all the data from both data sets and produce a row number. This takes a bit of work with monotonically_increasing_id and partition_id, along with writing out the intermediate data, but it is possible. After that we can join on the row numbers and do a fuzzy match in a UDF for each column that contains floating point values, or maps. Not ideal but it would work.
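A rough sketch of that row-numbering step, assuming the sort columns are passed in and skipping the intermediate write, might look like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical sketch: give every row of a sorted DataFrame a contiguous global
// row number without pulling all of the data into a single partition.
def withRowNumber(df: DataFrame, sortCols: Seq[String]): DataFrame = {
  // A sort range-partitions the data, so partition ids follow the global order.
  val sorted = df.sort(sortCols.map(col): _*)
    .withColumn("_part", spark_partition_id())
    .withColumn("_id", monotonically_increasing_id())

  // One row per partition, small enough to collect: how many rows precede each partition.
  val counts = sorted.groupBy("_part").count().sort("_part").collect()
  val offsets: Map[Int, Long] = counts.map(_.getInt(0))
    .zip(counts.scanLeft(0L)(_ + _.getLong(1)))
    .toMap

  // Row number within each partition, shifted by the rows in earlier partitions.
  val inPart = row_number().over(Window.partitionBy("_part").orderBy("_id"))
  val offsetOf = udf((part: Int) => offsets(part))
  sorted.withColumn("_row_num", offsetOf(col("_part")) + inPart)
    .drop("_part", "_id")
}

Once both sides are numbered, the comparison becomes an ordinary equi-join on _row_num, with the fuzzy UDF applied to the floating point (or map) columns.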