Skip to content

Commit 3f9e240

Browse files
committed
[SPARK-29375][SQL] Exchange reuse across all subquery levels
1 parent 50f6d93 commit 3f9e240

File tree

5 files changed

+97
-35
lines changed

5 files changed

+97
-35
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ import org.apache.spark.util.Utils
4747
class QueryExecution(
4848
val sparkSession: SparkSession,
4949
val logical: LogicalPlan,
50-
val tracker: QueryPlanningTracker = new QueryPlanningTracker) {
50+
val tracker: QueryPlanningTracker = new QueryPlanningTracker,
51+
val subQuery: Boolean = false) {
5152

5253
// TODO: Move the planner an optimizer into here from SessionState.
5354
protected def planner = sparkSession.sessionState.planner
@@ -127,9 +128,9 @@ class QueryExecution(
127128
EnsureRequirements(sparkSession.sessionState.conf),
128129
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
129130
sparkSession.sessionState.columnarRules),
130-
CollapseCodegenStages(sparkSession.sessionState.conf),
131-
ReuseExchange(sparkSession.sessionState.conf),
132-
ReuseSubquery(sparkSession.sessionState.conf))
131+
CollapseCodegenStages(sparkSession.sessionState.conf)) ++
132+
(if (subQuery) Nil else Seq(ReuseExchange(sparkSession.sessionState.conf))) :+
133+
ReuseSubquery(sparkSession.sessionState.conf)
133134

134135
def simpleString: String = simpleString(false)
135136

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.execution.exchange
1919

2020
import scala.collection.mutable
21-
import scala.collection.mutable.ArrayBuffer
2221

2322
import org.apache.spark.broadcast
2423
import org.apache.spark.rdd.RDD
@@ -107,35 +106,39 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
107106
if (!conf.exchangeReuseEnabled) {
108107
return plan
109108
}
110-
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
111-
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
112-
113-
// Replace a Exchange duplicate with a ReusedExchange
114-
def reuse: PartialFunction[Exchange, SparkPlan] = {
115-
case exchange: Exchange =>
116-
val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
117-
val samePlan = sameSchema.find { e =>
118-
exchange.sameResult(e)
119-
}
120-
if (samePlan.isDefined) {
121-
// Keep the output of this exchange, the following plans require that to resolve
122-
// attributes.
123-
ReusedExchangeExec(exchange.output, samePlan.get)
124-
} else {
125-
sameSchema += exchange
126-
exchange
109+
// To avoid costly canonicalization of an exchange:
110+
// - we use its schema first to check if it can be replaced with a reused exchange at all
111+
// - we insert an exchange into the map of canonicalized plans only when at least 2 exchanges
112+
// have the same schema
113+
val exchanges = mutable.Map[StructType, (Exchange, mutable.Map[SparkPlan, Exchange])]()
114+
115+
def reuse(plan: SparkPlan): SparkPlan = {
116+
plan.transformUp {
117+
case exchange: Exchange =>
118+
val (firstSameSchemaExchange, sameResultExchanges) =
119+
exchanges.getOrElseUpdate(exchange.schema, (exchange, mutable.Map()))
120+
if (firstSameSchemaExchange.ne(exchange)) {
121+
if (sameResultExchanges.isEmpty) {
122+
sameResultExchanges +=
123+
firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
124+
}
125+
val sameResultExchange =
126+
sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
127+
if (sameResultExchange.ne(exchange)) {
128+
ReusedExchangeExec(exchange.output, sameResultExchange)
129+
} else {
130+
exchange
131+
}
132+
} else {
133+
exchange
134+
}
135+
case other => other.transformExpressions {
136+
case sub: ExecSubqueryExpression =>
137+
sub.withNewPlan(reuse(sub.plan).asInstanceOf[BaseSubqueryExec])
127138
}
139+
}
128140
}
129141

130-
plan transformUp {
131-
case exchange: Exchange => reuse(exchange)
132-
} transformAllExpressions {
133-
// Lookup inside subqueries for duplicate exchanges
134-
case in: InSubqueryExec =>
135-
val newIn = in.plan.transformUp {
136-
case exchange: Exchange => reuse(exchange)
137-
}
138-
in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
139-
}
142+
reuse(plan)
140143
}
141144
}

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
178178
def apply(plan: SparkPlan): SparkPlan = {
179179
plan.transformAllExpressions {
180180
case subquery: expressions.ScalarSubquery =>
181-
val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
181+
val executedPlan =
182+
new QueryExecution(sparkSession, subquery.plan, subQuery = true).executedPlan
182183
ScalarSubquery(
183184
SubqueryExec(s"scalar-subquery#${subquery.exprId.id}", executedPlan),
184185
subquery.exprId)
@@ -192,7 +193,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
192193
}
193194
)
194195
}
195-
val executedPlan = new QueryExecution(sparkSession, query).executedPlan
196+
val executedPlan = new QueryExecution(sparkSession, query, subQuery = true).executedPlan
196197
InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
197198
}
198199
}

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
2323
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
2424
import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
2525
import org.apache.spark.sql.execution.datasources.FileScanRDD
26+
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
2627
import org.apache.spark.sql.internal.SQLConf
2728
import org.apache.spark.sql.test.SharedSparkSession
2829

@@ -1389,6 +1390,62 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
13891390
}
13901391
}
13911392

1393+
test("Exchange reuse across all subquery levels") {
1394+
Seq(true, false).foreach { reuse =>
1395+
withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> reuse.toString) {
1396+
val df = sql(
1397+
"""
1398+
|SELECT
1399+
| (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
1400+
| a.key
1401+
|FROM testData AS a
1402+
|JOIN testData AS b ON b.key = a.key
1403+
""".stripMargin)
1404+
1405+
val plan = df.queryExecution.executedPlan
1406+
1407+
val exchangeIds = plan.collectInPlanAndSubqueries { case e: Exchange => e.id }
1408+
val reusedExchangeIds = plan.collectInPlanAndSubqueries {
1409+
case re: ReusedExchangeExec => re.child.id
1410+
}
1411+
1412+
if (reuse) {
1413+
assert(exchangeIds.size == 2, "Exchange reusing not working correctly")
1414+
assert(reusedExchangeIds.size == 3, "Exchange reusing not working correctly")
1415+
assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
1416+
"ReusedExchangeExec should reuse an existing exchange")
1417+
} else {
1418+
assert(exchangeIds.size == 5, "expect 5 Exchange when not reusing")
1419+
assert(reusedExchangeIds.size == 0, "expect 0 ReusedExchangeExec when not reusing")
1420+
}
1421+
1422+
val df2 = sql(
1423+
"""
1424+
SELECT
1425+
(SELECT min(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
1426+
(SELECT max(a.key) FROM testData AS a JOIN testData2 AS b ON b.a = a.key)
1427+
""".stripMargin)
1428+
1429+
val plan2 = df2.queryExecution.executedPlan
1430+
1431+
val exchangeIds2 = plan2.collectInPlanAndSubqueries { case e: Exchange => e.id }
1432+
val reusedExchangeIds2 = plan2.collectInPlanAndSubqueries {
1433+
case re: ReusedExchangeExec => re.child.id
1434+
}
1435+
1436+
if (reuse) {
1437+
assert(exchangeIds2.size == 4, "Exchange reusing not working correctly")
1438+
assert(reusedExchangeIds2.size == 2, "Exchange reusing not working correctly")
1439+
assert(reusedExchangeIds2.forall(exchangeIds2.contains(_)),
1440+
"ReusedExchangeExec should reuse an existing exchange")
1441+
} else {
1442+
assert(exchangeIds2.size == 6, "expect 6 Exchange when not reusing")
1443+
assert(reusedExchangeIds2.size == 0, "expect 0 ReusedExchangeExec when not reusing")
1444+
}
1445+
}
1446+
}
1447+
}
1448+
13921449
test("Scalar subquery name should start with scalar-subquery#") {
13931450
val df = sql("SELECT a FROM l WHERE a = (SELECT max(c) FROM r WHERE c = 1)".stripMargin)
13941451
var subqueryExecs: ArrayBuffer[SubqueryExec] = ArrayBuffer.empty

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ class PlannerSuite extends SharedSparkSession {
470470
Inner,
471471
None,
472472
shuffle,
473-
shuffle)
473+
shuffle.copy())
474474

475475
val outputPlan = ReuseExchange(spark.sessionState.conf).apply(inputPlan)
476476
if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {

0 commit comments

Comments
 (0)