activeviam / par-student-spark-atoti

Project with students from CentraleSupelec to explore Spark API in order to power atoti with Spark
1 stars 0 forks source link

Physical Plans (Java operators VS Scala operators VS raw SQL queries) #15

Open arnaudframmery opened 2 years ago

arnaudframmery commented 2 years ago

We can look at the physical plans generated by the tranformations that are applied on our dataframes and see that the result is quite different depending on the operator that we are using. This can be an explanation about the performance gap that we can notice during the benchmarks.

These query plans are from a filter performing the operation Severity = 4

With Java Operators

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]

(2) Filter [codegen id : 1]
Input [47]: [ID#16, Severity#17, ...]
Condition : io.atoti.spark.condition.EqualCondition$$Lambda$2640/0x000000080183e7c8@530d62a0.call

=> We can see that the condition is not clearly understand by Spark, and so no optimization is available

With Scala Operators

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
PushedFilters: [IsNotNull(Severity), EqualTo(Severity,4)]

(2) Filter [codegen id : 1]
Input [47]: [ID#16, Severity#17, ...]
Condition : (isnotnull(Severity#17) AND (Severity#17 = 4))

=> We can notice that the condition is clearly understand by Spark

With a raw SQL query

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
PushedFilters: [IsNotNull(Severity), EqualTo(Severity,4)]

(2) Filter [codegen id : 1]
Input [47]: [ID#16, Severity#17, ...]
Condition : (isnotnull(Severity#17) AND (Severity#17 = 4)

=> We have the exact same result than with the scala operators

arnaudframmery commented 2 years ago

These query plans are from a group by where we count the number of lines for each level of Severity

With Java Operators

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]

(2) Filter
Input [47]: [ID#16, Severity#17, ...]
Condition : io.atoti.spark.condition.TrueCondition$$Lambda$2647/0x0000000801842648@33d2737c.call

(3) Project
Output [1]: [Severity#17]
Input [47]: [ID#16, Severity#17, ...]

(4) HashAggregate
Input [1]: [Severity#17]
Keys [1]: [Severity#17]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#163L]
Results [2]: [Severity#17, count#164L]

(5) Exchange
Input [2]: [Severity#17, count#164L]
Arguments: hashpartitioning(Severity#17, 200), ENSURE_REQUIREMENTS, [id=#41]

(6) HashAggregate
Input [2]: [Severity#17, count#164L]
Keys [1]: [Severity#17]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#110L]
Results [2]: [Severity#17, count(1)#110L AS severity_count#111L]

(7) AdaptiveSparkPlan
Output [2]: [Severity#17, severity_count#111L]
Arguments: isFinalPlan=false

=> The presence of a condition is because of the implementation of our API AggregateQuery() that can handle a condition. By default if no condition is given by the user, a True condition is generated. We can see here that this True condition generate extra steps.

With Scala Operators

(1) Scan csv 
Output [1]: [Severity#17]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
ReadSchema: struct<Severity:int>

(2) HashAggregate
Input [1]: [Severity#17]
Keys [1]: [Severity#17]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#163L]
Results [2]: [Severity#17, count#164L]

(3) Exchange
Input [2]: [Severity#17, count#164L]
Arguments: hashpartitioning(Severity#17, 200), ENSURE_REQUIREMENTS, [id=#33]

(4) HashAggregate
Input [2]: [Severity#17, count#164L]
Keys [1]: [Severity#17]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#110L]
Results [2]: [Severity#17, count(1)#110L AS severity_count#111L]

(5) AdaptiveSparkPlan
Output [2]: [Severity#17, severity_count#111L]
Arguments: isFinalPlan=false

=> We can see that Spark had been able to optimize the query by removing the filter that is useless

With a raw SQL query

(1) Scan csv 
Output [1]: [Severity#17]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
ReadSchema: struct<Severity:int>

(2) HashAggregate
Input [1]: [Severity#17]
Keys [1]: [Severity#17]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#114L]
Results [2]: [Severity#17, count#115L]

(3) Exchange
Input [2]: [Severity#17, count#115L]
Arguments: hashpartitioning(Severity#17, 200), ENSURE_REQUIREMENTS, [id=#33]

(4) HashAggregate
Input [2]: [Severity#17, count#115L]
Keys [1]: [Severity#17]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#111L]
Results [2]: [Severity#17, count(1)#111L AS severity_count#110L]

(5) AdaptiveSparkPlan
Output [2]: [Severity#17, severity_count#110L]
Arguments: isFinalPlan=false

=> We can notice that it is exactly the same physical plan that the one with Scala operators