Skip to content

Commit ed26b33

Browse files
committed
fix canonicalization issues on executors (sqlContext is null)
1 parent ef3eaf0 commit ed26b33

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,13 @@ case class HashAggregateExec(
93 93
// This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
94 94
// map and/or the sort-based aggregation once it has processed a given number of input rows.
95 95
private val testFallbackStartsAt: Option[(Int, Int)] = {
96-
sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
97-
case null | "" => None
98-
case fallbackStartsAt =>
99-
val splits = fallbackStartsAt.split(",").map(_.trim)
100-
Some((splits.head.toInt, splits.last.toInt))
96+
Option(sqlContext).flatMap {
97+
_.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
98+
case null | "" => None
99+
case fallbackStartsAt =>
100+
val splits = fallbackStartsAt.split(",").map(_.trim)
101+
Some((splits.head.toInt, splits.last.toInt))
102+
}
101 103
}
102 104
}
103 105

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ case class ShuffleExchangeExec(
61 61

62 62
override def nodeName: String = "Exchange"
63 63

64-
private val serializer: Serializer =
64+
private lazy val serializer: Serializer =
65 65
new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
66 66

67 67
@transient lazy val inputRDD: RDD[InternalRow] = child.execute()

0 commit comments

Comments (0)