Skip to content

Commit 1475ade

Browse files
committed
avoid canonicalization if possible
1 parent efd6045 commit 1475ade

File tree

1 file changed

+18
-4
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/exchange

1 file changed

+18
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,27 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
107107
if (!conf.exchangeReuseEnabled) {
108108
return plan
109109
}
110-
val exchanges = mutable.HashMap[SparkPlan, Exchange]()
110+
// To avoid costly canonicalization of an exchange:
111+
// - we use its schema first to check to check if it can be replaced to a reused exchange at all
112+
// - we insert an exchange into the map of canonicalized plans only when at least 2 exchange
113+
// have the same schema
114+
val exchanges = mutable.Map[StructType, (Exchange, mutable.Map[SparkPlan, Exchange])]()
111115

112116
def reuse(plan: SparkPlan): SparkPlan = plan.transform {
113117
case exchange: Exchange =>
114-
val newExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
115-
if (newExchange.ne(exchange)) {
116-
ReusedExchangeExec(exchange.output, newExchange)
118+
val (firstSameSchemaExchange, sameResultExchanges) =
119+
exchanges.getOrElseUpdate(exchange.schema, (exchange, mutable.Map()))
120+
if (firstSameSchemaExchange.ne(exchange)) {
121+
if (sameResultExchanges.isEmpty) {
122+
sameResultExchanges += firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
123+
}
124+
val sameResultExchange =
125+
sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
126+
if (sameResultExchange.ne(exchange)) {
127+
ReusedExchangeExec(exchange.output, sameResultExchange)
128+
} else {
129+
exchange
130+
}
117131
} else {
118132
exchange
119133
}

0 commit comments

Comments
 (0)