palantir / spark

Palantir Distribution of Apache Spark
Apache License 2.0

[SPARK-32753][SQL] Only copy tags to node with no tags #732

Closed rshkv closed 3 years ago

rshkv commented 3 years ago

What changes were proposed in this pull request?

This is a cherry-pick of SPARK-32753 / https://github.com/apache/spark/pull/29593.

It fixes a correctness bug that causes duplicate rows or, as we've seen internally, empty rows. It reproduces on apache/spark 3.0.1 when aggregating on a column and then repartitioning on the same column, e.g. with the query below. More context is on the internal ticket. (There is no upstream release containing this yet: the fix was merged into branch-3.0 on Sep 8, while release 3.0.1 was cut on Aug 28.)

scala> df.show()
+----+-----+
| tag| data|
+----+-----+
|tag1|data1|
|tag2|data2|
+----+-----+

scala> val repartitioned = df
    .groupBy("tag")
    .agg(collect_set("data"))
    .repartition(col("tag"))

The written data is:

scala> repartitioned.write.json("json_out")

scala> spark.read.json("json_out").show()
+-----------------+----+
|collect_set(data)| tag|
+-----------------+----+
|               []|tag2|
|               []|tag1|
+-----------------+----+

Reason for the bug is here.

How was this patch tested?

Upstream introduced a unit test that checks the shape of the adaptive plan. I also ran my repro against this change; without it, the repro produces empty rows. (Upstream only reported duplicate rows, so it seems we're lucky that the fix covers our case as well.)

rshkv commented 3 years ago

Yeah, that's how I understand it as well. TreeNodes are replaced when transformed by rules, either by a copy (with rules applied to the children) or by a different node. Sometimes that replacement is effectively a removal, where the rule returns the child node.

Tag copying only happens from the node the rule is being applied to onto its replacement node. So previously, as you say, if the replacement was a child that already had tags, the removed node's tags were copied over the child's. I suppose AQE is one of these removal situations.

From looking at the values in the debugger, the tags seem to contain logical plan information. So a removed node would pass on its logical plan information, incl. attribute references, to one that shouldn't have it.
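
To make the removal case concrete, here is a minimal toy sketch of the two behaviours. It is not Spark's TreeNode code: the `Node` class, the `removeParent` rule, and the `"logicalPlan"` tag key are all hypothetical names chosen for illustration. It only models the idea in the PR title, that tags are copied to a replacement node only if that node has no tags of its own.

```scala
import scala.collection.mutable

// Toy stand-in for a plan node (hypothetical, not Spark's TreeNode).
final class Node(val name: String, val child: Option[Node] = None) {
  val tags: mutable.Map[String, String] = mutable.Map.empty

  // Old behaviour as described above: always copy, overwriting existing tags.
  def copyTagsAlways(from: Node): Unit = tags ++= from.tags

  // Behaviour after the fix, per the PR title: copy only to a node with no tags.
  def copyTagsIfEmpty(from: Node): Unit = if (tags.isEmpty) tags ++= from.tags
}

object TagCopyDemo {
  // A "removal" rule: the node is replaced by its child, and tags are then
  // copied from the removed node onto the replacement.
  def removeParent(parent: Node, copy: (Node, Node) => Unit): Node = {
    val replacement = parent.child.get
    copy(replacement, parent)
    replacement
  }

  // Parent and child each carry their own (hypothetical) logical plan tag.
  def build(): Node = {
    val child = new Node("aggregate")
    child.tags("logicalPlan") = "Aggregate"
    val parent = new Node("repartition", Some(child))
    parent.tags("logicalPlan") = "Repartition"
    parent
  }

  // Child ends up with the removed parent's tag.
  val before: Node = removeParent(build(), (to, from) => to.copyTagsAlways(from))
  // Child keeps its own tag.
  val after: Node = removeParent(build(), (to, from) => to.copyTagsIfEmpty(from))
}
```

In this sketch `TagCopyDemo.before` carries the removed parent's tag, while `TagCopyDemo.after` keeps the child's own tag, which mirrors the situation where a removed node would otherwise pass its logical plan information to a node that shouldn't have it.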