apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] Output metrics are counted multiple times for InputIterator of broadcasted exchange #7079

Open ivoson opened 1 week ago

ivoson commented 1 week ago

Backend

VL (Velox)

Bug description

The screenshot below shows the metrics from TPC-DS q6. The output rows are counted multiple times (as many times as there are tasks in the join stage). From the task execution perspective, the tasks do indeed read the data multiple times. From the perspective of a SQL plan node, however, this is confusing, because the plan node's output row count is not right.

I am wondering whether this makes sense, or whether we should count the rows only once for the InputIterator of a broadcast exchange.

image

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

zhztheplayer commented 1 week ago

@ivoson Would you like to help check how vanilla Spark handles this case? Thanks.

ivoson commented 1 week ago

> @ivoson Would you like to help check how vanilla Spark handles this case? Thanks.

Sure, checking on this.

In vanilla Spark, the join node loads the broadcast hash table directly, without a wrapper node for the broadcast exchange, so there is no such problem. For native Spark, the InputIterator works as a wrapper of the parent node. I think it would be more reasonable for their metrics to be consistent as well.

@zhztheplayer Please share your thoughts on this. Thanks.

image

ivoson commented 1 week ago

cc @ulysses-you Do you have any thoughts about this? Thanks.

ulysses-you commented 1 week ago

For now, the output row number is broadcasted rows * numPartitions. I agree it may be a bit confusing for users who know Spark well. I'm fine with using the original broadcasted row count if there is an easy way to resolve it.
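To make the arithmetic concrete, here is a minimal sketch in plain Python (not Gluten or Spark code). The function name and the row counts are hypothetical; it only assumes that every task of the join stage reads the full broadcast relation and adds its row count to one shared metric.

```python
def reported_output_rows(broadcast_rows: int, num_partitions: int) -> int:
    """Simulate a shared output-rows metric for a broadcast input.

    Assumption (for illustration only): each of the num_partitions tasks
    iterates over the whole broadcast relation and adds to the same counter.
    """
    total = 0
    for _ in range(num_partitions):
        # Every task sees all broadcast rows, so the same rows are added again.
        total += broadcast_rows
    return total


# Hypothetical numbers: 1000 broadcast rows read by 8 join tasks.
print(reported_output_rows(1000, 8))
```

With these made-up inputs the metric is 8 times the real size of the broadcast relation, which matches the broadcasted rows * numPartitions behaviour described above.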

ivoson commented 1 week ago

> For now, the output row number is broadcasted rows * numPartitions. I agree it may be a bit confusing for users who know Spark well. I'm fine with using the original broadcasted row count if there is an easy way to resolve it.

I am thinking that maybe we can collect the metrics from only one of the tasks (task 0) for a wrapper of a broadcast exchange. This is easy to implement but a little bit tricky... Do you think the proposal makes sense? cc @zhztheplayer @ulysses-you If it sounds good to you, I'll create a PR for this.
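A rough sketch of the idea, again in plain Python rather than actual Gluten code. `TaskMetrics`, `merge_output_rows`, and the `is_broadcast_input` flag are hypothetical names used only for illustration. The sketch also assumes that the task for partition 0 runs and reports exactly once, which may not hold with retries or speculative execution.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TaskMetrics:
    # Hypothetical per-task record, not a Gluten or Spark class.
    partition_id: int
    output_rows: int


def merge_output_rows(task_metrics: List[TaskMetrics], is_broadcast_input: bool) -> int:
    """Combine per-task output rows into one plan-node metric."""
    if is_broadcast_input:
        # Broadcast wrapper: every task reads the same rows, so keep only
        # the count reported by partition 0.
        return sum(m.output_rows for m in task_metrics if m.partition_id == 0)
    # Regular input: each task reads distinct rows, so sum all of them.
    return sum(m.output_rows for m in task_metrics)


# Hypothetical example: 4 tasks, each reading the same 500 broadcast rows.
tasks = [TaskMetrics(partition_id=i, output_rows=500) for i in range(4)]
print(merge_output_rows(tasks, is_broadcast_input=True))
print(merge_output_rows(tasks, is_broadcast_input=False))
```

The tricky part is visible in the filter: if no metrics arrive for partition 0, the merged value would be 0, so a real change would need to decide how to handle that case.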