package org.apache.spark.sql.execution.exchange

import java.util.Objects

import scala.collection.mutable

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.execution._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -52,6 +54,13 @@ abstract class Exchange extends UnaryExecNode {
52
54
case class ReusedExchangeExec (override val output : Seq [Attribute ], child : Exchange )
53
55
extends LeafExecNode {
54
56
57
+ override def equals (that : Any ): Boolean = that match {
58
+ case ReusedExchangeExec (output, child) => this .child == output && this .child.eq(child)
59
+ case _ => false
60
+ }
61
+
62
+ override def hashCode : Int = Objects .hash(output, child)
63
+
55
64
override def supportsColumnar : Boolean = child.supportsColumnar
56
65
57
66
// Ignore this wrapper for canonicalizing.
@@ -113,27 +122,38 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
113
122
// have the same schema
114
123
val exchanges = mutable.Map [StructType , (Exchange , mutable.Map [SparkPlan , Exchange ])]()
115
124
116
- def reuse (plan : SparkPlan ): SparkPlan = plan.transform {
117
- case exchange : Exchange =>
118
- val (firstSameSchemaExchange, sameResultExchanges) =
119
- exchanges.getOrElseUpdate(exchange.schema, (exchange, mutable.Map ()))
120
- if (firstSameSchemaExchange.ne(exchange)) {
121
- if (sameResultExchanges.isEmpty) {
122
- sameResultExchanges += firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
123
- }
124
- val sameResultExchange =
125
- sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
126
- if (sameResultExchange.ne(exchange)) {
127
- ReusedExchangeExec (exchange.output, sameResultExchange)
125
+ def reuse (plan : SparkPlan ): SparkPlan = {
126
+ // Track exchanges that are replaced to reused exchanges to be able to fix ReusedExchangeExec
127
+ // nodes referencing to them
128
+ val reuseExchanges = mutable.Map [TreeNodeRef , Exchange ]()
129
+
130
+ plan.transform {
131
+ case exchange : Exchange =>
132
+ val (firstSameSchemaExchange, sameResultExchanges) =
133
+ exchanges.getOrElseUpdate(exchange.schema, (exchange, mutable.Map ()))
134
+ if (firstSameSchemaExchange.ne(exchange)) {
135
+ if (sameResultExchanges.isEmpty) {
136
+ sameResultExchanges +=
137
+ firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
138
+ }
139
+ val sameResultExchange =
140
+ sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
141
+ if (sameResultExchange.ne(exchange)) {
142
+ reuseExchanges += new TreeNodeRef (exchange) -> sameResultExchange
143
+ ReusedExchangeExec (exchange.output, sameResultExchange)
144
+ } else {
145
+ exchange
146
+ }
128
147
} else {
129
148
exchange
130
149
}
131
- } else {
132
- exchange
150
+ case reuseExchange @ ReusedExchangeExec (output, child) =>
151
+ reuseExchanges.get(new TreeNodeRef (child)).map(ReusedExchangeExec (output, _))
152
+ .getOrElse(reuseExchange)
153
+ case other => other.transformExpressions {
154
+ case sub : ExecSubqueryExpression =>
155
+ sub.withNewPlan(reuse(sub.plan).asInstanceOf [BaseSubqueryExec ])
133
156
}
134
- case other => other.transformExpressions {
135
- case sub : ExecSubqueryExpression =>
136
- sub.withNewPlan(reuse(sub.plan).asInstanceOf [BaseSubqueryExec ])
137
157
}
138
158
}
139
159
0 commit comments