
Commit 4bb0e6f

Remove support of Spark 3.2
1 parent c04dc4f commit 4bb0e6f

File tree: 30 files changed, +120 -317 lines


.github/workflows/master.yml

Lines changed: 0 additions & 5 deletions
@@ -60,11 +60,6 @@ jobs:
           spark-archive: '-Pscala-2.13'
           exclude-tags: ''
           comment: 'normal'
-        - java: 8
-          spark: '3.5'
-          spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-          exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
-          comment: 'verify-on-spark-3.2-binary'
         - java: 8
           spark: '3.5'
           spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6'

docs/deployment/migration-guide.md

Lines changed: 1 addition & 2 deletions
@@ -19,10 +19,9 @@
 
 ## Upgrading from Kyuubi 1.10 to 1.11
 
+* Since Kyuubi 1.11, the support of Spark engine for Spark 3.2 is removed.
 * Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by Kyuubi Spark extension is removed, consider using `kyuubi.operation.result.max.rows` instead. Note, the latter works without requirement of installing Kyuubi Spark extension.
-
 * Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will respect the `kyuubi.session.engine.startup.waitCompletion` config to determine whether to wait for the engine completion or not. If the engine is running in client mode, Kyuubi will always wait for the engine completion. And for Spark engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and `spark.kubernetes.submission.waitAppCompletion` configs to the engine conf based on the value of `kyuubi.session.engine.startup.waitCompletion`.
-
 * Since Kyuubi 1.11, the configuration `kyuubi.session.engine.spark.initialize.sql` set by the client (via session configuration) is now correctly applied to every session in shared engines (USER, GROUP, SERVER). Previously, only the value set on the server side was applied and only for the first session when the engine started. Now, session-level settings provided by each client are respected.
 
 ## Upgrading from Kyuubi 1.9 to 1.10
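
Editor's note on the watchdog migration above: a minimal sketch of capping result rows via `kyuubi.operation.result.max.rows` from a JDBC client. The endpoint, credentials, and table are placeholders, and passing the conf through the connection string's session-variable section is an assumption about client setup; the conf can equally be set on the server side.

```scala
import java.sql.DriverManager

object ResultLimitSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical Kyuubi endpoint; the session conf in the URL caps rows per
    // operation without requiring the (now removed) Spark extension watchdog.
    // Requires the Hive/Kyuubi JDBC driver on the classpath.
    val url = "jdbc:hive2://localhost:10009/default;kyuubi.operation.result.max.rows=10000"
    val conn = DriverManager.getConnection(url, "anonymous", "")
    try {
      val rs = conn.createStatement().executeQuery("SELECT * FROM big_table")
      var n = 0
      while (rs.next()) n += 1 // never exceeds the 10000-row cap
      println(s"fetched $n rows")
    } finally conn.close()
  }
}
```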

docs/quick_start/quick_start.rst

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
 **Kyuubi**   Gateway      \ |release| \     - Kyuubi Server
              Engine lib                     - Kyuubi Engine
              Beeline                        - Kyuubi Beeline
-**Spark**    Engine       3.2 to 3.5, 4.0   A Spark distribution
+**Spark**    Engine       3.3 to 3.5, 4.0   A Spark distribution
 **Flink**    Engine       1.17 to 1.20      A Flink distribution
 **Trino**    Engine       N/A               A Trino cluster allows to access via trino-client v411
 **Doris**    Engine       N/A               A Doris cluster

extensions/spark/kyuubi-spark-authz/README.md

Lines changed: 2 additions & 2 deletions
@@ -37,8 +37,8 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver
 - [x] 3.5.x (default)
 - [x] 3.4.x
 - [x] 3.3.x
-- [x] 3.2.x
-- [x] 3.1.x
+- [ ] 3.2.x
+- [ ] 3.1.x
 - [ ] 3.0.x
 - [ ] 2.4.x and earlier
 

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala

Lines changed: 1 addition & 18 deletions
@@ -224,24 +224,7 @@ object PrivilegesBuilder {
       }
     }
     spec.queries(plan).foreach { p =>
-      if (p.resolved) {
-        buildQuery(Project(p.output, p), inputObjs, spark = spark)
-      } else {
-        try {
-          // For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved,
-          // Before this pr, we just ignore it, now we support this.
-          val analyzed = spark.sessionState.analyzer.execute(p)
-          buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark)
-        } catch {
-          case e: Exception =>
-            LOG.debug(
-              s"""
-                 |Failed to analyze unresolved
-                 |$p
-                 |due to ${e.getMessage}""".stripMargin,
-              e)
-        }
-      }
+      buildQuery(Project(p.output, p), inputObjs, spark = spark)
     }
     spec.operationType
   }
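
Editor's note: the deleted branch existed only because commands such as CTAS could carry an unresolved query on old Sparks; on the supported versions the query arrives already analyzed. A minimal sketch of the invariant the simplified code now relies on (`visitResolvedQuery` and its callback are hypothetical, not Kyuubi APIs):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// Sketch: on Spark 3.3+ every query plan handed to the privilege builder is
// expected to be resolved, so wrapping it in a Project over its own output,
// as the new code does, is always safe.
def visitResolvedQuery(p: LogicalPlan)(handle: LogicalPlan => Unit): Unit = {
  require(p.resolved, s"unexpected unresolved query: ${p.nodeName}")
  handle(Project(p.output, p))
}
```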

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala

Lines changed: 2 additions & 7 deletions
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz.rule
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -52,12 +52,7 @@ object Authorization {
 
   def markAuthChecked(plan: LogicalPlan): LogicalPlan = {
     plan.setTagValue(KYUUBI_AUTHZ_TAG, ())
-    plan transformDown {
-      // TODO: Add this line Support for spark3.1, we can remove this
-      // after spark 3.2 since https://issues.apache.org/jira/browse/SPARK-34269
-      case view: View =>
-        markAllNodesAuthChecked(view.child)
-    }
+    plan
   }
 
   protected def isAuthChecked(plan: LogicalPlan): Boolean = {
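
Editor's note: the authz rules mark plans with a `TreeNodeTag` so each plan is authorized only once per query. A self-contained sketch of that pattern (the tag name and object are illustrative, not Kyuubi's actual `KYUUBI_AUTHZ_TAG` internals):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

object AuthMarkSketch {
  // Unit-valued tag: its mere presence on a plan node means "already authorized".
  private val CHECKED_TAG = TreeNodeTag[Unit]("__example_authz_checked")

  def markChecked(plan: LogicalPlan): LogicalPlan = {
    plan.setTagValue(CHECKED_TAG, ())
    // Per the deleted TODO, since SPARK-34269 (fixed in Spark 3.2) there is no
    // need to recurse into View children, so tagging the root suffices.
    plan
  }

  def isChecked(plan: LogicalPlan): Boolean =
    plan.getTagValue(CHECKED_TAG).isDefined
}
```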

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala

Lines changed: 1 addition & 8 deletions
@@ -17,18 +17,11 @@
 package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 
 case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    // For Spark 3.1 and below, `ColumnPruning` rule will set `ObjectFilterPlaceHolder#child` to
-    // `Project`
-    case ObjectFilterPlaceHolder(Project(_, child)) if child.nodeName == "ShowNamespaces" =>
-      spark.sessionState.planner.plan(child)
-        .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
-
-    // For Spark 3.2 and above
     case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
       spark.sessionState.planner.plan(child)
         .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
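
Editor's note: a `Strategy` like this only takes effect once registered with the planner. A minimal sketch of that wiring through Spark's `SparkSessionExtensions` (the extension class name is illustrative, not Kyuubi's actual entry point):

```scala
import org.apache.kyuubi.plugin.spark.authz.rule.rowfilter.FilterDataSourceV2Strategy
import org.apache.spark.sql.SparkSessionExtensions

// Sketch: injectPlannerStrategy adds a Strategy to physical planning, which is
// the mechanism by which FilterDataSourceV2Strategy can intercept ShowNamespaces.
class ExampleAuthzExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectPlannerStrategy(session => FilterDataSourceV2Strategy(session))
  }
}
```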

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala

Lines changed: 0 additions & 2 deletions
@@ -84,8 +84,6 @@ private[authz] object AuthZUtils {
   }
 
   lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
-  lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
-  lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
   lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4"
   lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5"
   lazy val isSparkV40OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "4.0"
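
Editor's note: with the 3.2/3.3 checks gone, the lowest remaining gate is 3.4. A minimal stand-in for the comparison these flags perform, assuming Kyuubi's `SemanticVersion` behaves like an ordered (major, minor) pair; the `SemVer` class here is invented for illustration:

```scala
// Illustrative stand-in for Kyuubi's SemanticVersion; only major.minor matter
// for gates such as isSparkV34OrGreater.
final case class SemVer(major: Int, minor: Int) extends Ordered[SemVer] {
  override def compare(that: SemVer): Int =
    if (major != that.major) major.compare(that.major) else minor.compare(that.minor)
}

object SemVer {
  // Accepts strings like "3.5", "3.5.1", or "4.0.0-preview1".
  def apply(version: String): SemVer = {
    val parts = version.split("[.-]")
    SemVer(parts(0).toInt, parts(1).toInt)
  }
}

// Usage mirroring the flags above:
//   val isSparkV34OrGreater = SemVer(org.apache.spark.SPARK_VERSION) >= SemVer("3.4")
```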

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala

Lines changed: 5 additions & 13 deletions
@@ -245,8 +245,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   }
 
   test("AlterTableRecoverPartitionsCommand") {
-    // AlterTableRecoverPartitionsCommand exists in the version below 3.2
-    assume(!isSparkV32OrGreater)
     val tableName = reusedDb + "." + "TableToMsck"
     withTable(tableName) { _ =>
       sql(
@@ -367,11 +365,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
     assertEqualsIgnoreCase(reusedDb)(po0.dbname)
     assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
-    if (isSparkV32OrGreater) {
-      // Query in AlterViewAsCommand can not be resolved before SPARK-34698
-      assert(po0.columns === Seq("key", "pid", "value"))
-      checkTableOwner(po0)
-    }
+    assert(po0.columns === Seq("key", "pid", "value"))
+    checkTableOwner(po0)
     val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
     assert(accessType0 === AccessType.SELECT)
 
@@ -482,7 +477,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   }
 
   test("AnalyzeTablesCommand") {
-    assume(isSparkV32OrGreater)
     val plan = sql(s"ANALYZE TABLES IN $reusedDb COMPUTE STATISTICS")
       .queryExecution.analyzed
     val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
@@ -626,7 +620,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po.actionType === PrivilegeObjectActionType.OTHER)
     assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
     assert(po.catalog.isEmpty)
-    val db = if (isSparkV33OrGreater) defaultDb else null
+    val db = defaultDb
     assertEqualsIgnoreCase(db)(po.dbname)
     assertEqualsIgnoreCase("CreateFunctionCommand")(po.objectName)
     assert(po.columns.isEmpty)
@@ -658,7 +652,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po.actionType === PrivilegeObjectActionType.OTHER)
     assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
     assert(po.catalog.isEmpty)
-    val db = if (isSparkV33OrGreater) defaultDb else null
+    val db = defaultDb
     assertEqualsIgnoreCase(db)(po.dbname)
     assertEqualsIgnoreCase("DropFunctionCommand")(po.objectName)
     assert(po.columns.isEmpty)
@@ -678,7 +672,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po.actionType === PrivilegeObjectActionType.OTHER)
     assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
     assert(po.catalog.isEmpty)
-    val db = if (isSparkV33OrGreater) defaultDb else null
+    val db = defaultDb
     assertEqualsIgnoreCase(db)(po.dbname)
     assertEqualsIgnoreCase("RefreshFunctionCommand")(po.objectName)
     assert(po.columns.isEmpty)
@@ -927,8 +921,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   }
 
   test("RepairTableCommand") {
-    // only spark 3.2 or greater has RepairTableCommand
-    assume(isSparkV32OrGreater)
     val tableName = reusedDb + "." + "TableToRepair"
     withTable(tableName) { _ =>
       sql(
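
Editor's note: the dropped guards used ScalaTest's `assume`, which cancels a test (rather than failing it) when its precondition is false; only the 3.2/3.3 guards are gone, while higher-version gates remain. A minimal sketch of the surviving pattern (the suite and flag are illustrative):

```scala
import org.scalatest.funsuite.AnyFunSuite

class VersionGateExampleSuite extends AnyFunSuite {
  // Stand-in for AuthZUtils.isSparkV34OrGreater; in real code this is derived
  // from org.apache.spark.SPARK_VERSION.
  private val isSparkV34OrGreater: Boolean = true

  test("behavior available only on Spark 3.4+") {
    assume(isSparkV34OrGreater) // cancels, rather than fails, on older Sparks
    assert(Seq(1, 2, 3).sum == 6)
  }
}
```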

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala

Lines changed: 2 additions & 5 deletions
@@ -29,7 +29,6 @@ import org.apache.kyuubi.Utils
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
 import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 
 trait SparkSessionProvider {
   protected val catalogImpl: String
@@ -100,7 +99,7 @@ trait SparkSessionProvider {
       case (t, "table") => doAs(
         admin, {
           val purgeOption =
-            if (isSparkV32OrGreater && isCatalogSupportPurge(
+            if (isCatalogSupportPurge(
               spark.sessionState.catalogManager.currentCatalog.name())) {
               "PURGE"
             } else ""
@@ -109,9 +108,7 @@ trait SparkSessionProvider {
       case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS $db"))
       case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS $fn"))
       case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view"))
-      case (cacheTable, "cache") => if (isSparkV32OrGreater) {
-        doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
-      }
+      case (cacheTable, "cache") => doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
       case (_, e) =>
         throw new RuntimeException(s"the resource whose resource type is $e cannot be cleared")
     }
