Skip to content

Commit 5f4b1f0

Browse files
committed
[KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
### Why are the changes needed?

As the title says: fix the Spark extension rules to support RebalancePartitions.

### How was this patch tested?

The unit tests (UT) are modified.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7139 from pan3793/rebalance.

Closes #7139

edb070a [Cheng Pan] fix
4d3984a [Cheng Pan] Fix Spark extension rules to support RebalancePartitions

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
1 parent 47063d9 commit 5f4b1f0

File tree

11 files changed

+59
-42
lines changed

11 files changed

+59
-42
lines changed

extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
4545
}
4646
}
4747
}
48-
49-
override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
50-
super.canInsertRepartitionByExpression(plan) && {
51-
plan match {
52-
case _: RebalancePartitions => false
53-
case _ => true
54-
}
55-
}
56-
}
5748
}
5849

5950
/**

extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
126126
case _: Window => true
127127
case s: Sort if s.global => true
128128
case _: RepartitionOperation => true
129+
case _: RebalancePartitions => true
129130
case _: GlobalLimit => true
130131
case _ => false
131132
}.isDefined
@@ -139,8 +140,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
139140
case SubqueryAlias(_, child) => canInsert(child)
140141
case Limit(_, _) => false
141142
case _: Sort => false
142-
case _: RepartitionByExpression => false
143-
case _: Repartition => false
143+
case _: RepartitionOperation => false
144+
case _: RebalancePartitions => false
144145
case _ => true
145146
}
146147

extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
5050

5151
def canInsertZorder(query: LogicalPlan): Boolean = query match {
5252
case Project(_, child) => canInsertZorder(child)
53-
case _: RepartitionByExpression | _: Repartition
53+
case _: RepartitionOperation | _: RebalancePartitions
5454
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
5555
// TODO: actually, we can force zorder even if existed some shuffle
5656
case _: Sort => false
57-
case _: RepartitionByExpression => false
58-
case _: Repartition => false
57+
case _: RepartitionOperation => false
58+
case _: RebalancePartitions => false
5959
case _ => true
6060
}
6161

extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,21 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
2828
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
2929

3030
test("check rebalance exists") {
31-
def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
31+
def check(
32+
df: => DataFrame,
33+
expectedRebalanceNumEnabled: Int = 1,
34+
expectedRebalanceNumDisabled: Int = 0): Unit = {
3235
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
3336
assert(
3437
df.queryExecution.analyzed.collect {
3538
case r: RebalancePartitions => r
36-
}.size == expectedRebalanceNum)
39+
}.size == expectedRebalanceNumEnabled)
3740
}
3841
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
3942
assert(
4043
df.queryExecution.analyzed.collect {
4144
case r: RebalancePartitions => r
42-
}.isEmpty)
45+
}.size == expectedRebalanceNumDisabled)
4346
}
4447
}
4548

@@ -69,6 +72,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
6972
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
7073
}
7174

75+
withTable("tmp1") {
76+
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
77+
check(
78+
sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
79+
1,
80+
1)
81+
}
82+
7283
withTable("tmp1", "tmp2") {
7384
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
7485
sql(s"CREATE TABLE tmp2 (c1 int) $storage")

extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
4545
}
4646
}
4747
}
48-
49-
override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
50-
super.canInsertRepartitionByExpression(plan) && {
51-
plan match {
52-
case _: RebalancePartitions => false
53-
case _ => true
54-
}
55-
}
56-
}
5748
}
5849

5950
/**

extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
108108
case _: Window => true
109109
case s: Sort if s.global => true
110110
case _: RepartitionOperation => true
111+
case _: RebalancePartitions => true
111112
case _: GlobalLimit => true
112113
case _ => false
113114
}.isDefined
@@ -121,8 +122,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
121122
case SubqueryAlias(_, child) => canInsert(child)
122123
case Limit(_, _) => false
123124
case _: Sort => false
124-
case _: RepartitionByExpression => false
125-
case _: Repartition => false
125+
case _: RepartitionOperation => false
126+
case _: RebalancePartitions => false
126127
case _ => true
127128
}
128129

extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
4949

5050
def canInsertZorder(query: LogicalPlan): Boolean = query match {
5151
case Project(_, child) => canInsertZorder(child)
52-
case _: RepartitionByExpression | _: Repartition
52+
case _: RepartitionOperation | _: RebalancePartitions
5353
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
5454
// TODO: actually, we can force zorder even if existed some shuffle
5555
case _: Sort => false
56-
case _: RepartitionByExpression => false
57-
case _: Repartition => false
56+
case _: RepartitionOperation => false
57+
case _: RebalancePartitions => false
5858
case _ => true
5959
}
6060

extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
2929
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
3030

3131
test("check rebalance exists") {
32-
def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
32+
def check(
33+
df: => DataFrame,
34+
expectedRebalanceNumEnabled: Int = 1,
35+
expectedRebalanceNumDisabled: Int = 0): Unit = {
3336
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
3437
withListener(df) { write =>
3538
assert(write.collect {
3639
case r: RebalancePartitions => r
37-
}.size == expectedRebalanceNum)
40+
}.size == expectedRebalanceNumEnabled)
3841
}
3942
}
4043
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
4144
withListener(df) { write =>
4245
assert(write.collect {
4346
case r: RebalancePartitions => r
44-
}.isEmpty)
47+
}.size == expectedRebalanceNumDisabled)
4548
}
4649
}
4750
}
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
7275
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
7376
}
7477

78+
withTable("tmp1") {
79+
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
80+
check(
81+
sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
82+
1,
83+
1)
84+
}
85+
7586
withTable("tmp1", "tmp2") {
7687
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
7788
sql(s"CREATE TABLE tmp2 (c1 int) $storage")

extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
3939
case _: Window => true
4040
case s: Sort if s.global => true
4141
case _: RepartitionOperation => true
42+
case _: RebalancePartitions => true
4243
case _: GlobalLimit => true
4344
case _ => false
4445
}
@@ -53,8 +54,7 @@ trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
5354
case SubqueryAlias(_, child) => canInsert(child)
5455
case Limit(_, _) => false
5556
case _: Sort => false
56-
case _: RepartitionByExpression => false
57-
case _: Repartition => false
57+
case _: RepartitionOperation => false
5858
case _: RebalancePartitions => false
5959
case _ => true
6060
}

extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ trait InsertZorderBeforeWritingBase extends Rule[LogicalPlan] {
5757

5858
def canInsertZorder(query: LogicalPlan): Boolean = query match {
5959
case Project(_, child) => canInsertZorder(child)
60-
case _: RepartitionByExpression | _: Repartition
60+
case _: RepartitionOperation | _: RebalancePartitions
6161
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
6262
// TODO: actually, we can force zorder even if existed some shuffle
6363
case _: Sort => false
64-
case _: RepartitionByExpression => false
65-
case _: Repartition => false
64+
case _: RepartitionOperation => false
65+
case _: RebalancePartitions => false
6666
case _ => true
6767
}
6868

0 commit comments

Comments
 (0)