5 changes: 0 additions & 5 deletions .github/workflows/master.yml
@@ -60,11 +60,6 @@ jobs:
spark-archive: '-Pscala-2.13'
exclude-tags: ''
comment: 'normal'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.2-binary'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6'
1 change: 1 addition & 0 deletions docs/deployment/migration-guide.md
@@ -19,6 +19,7 @@

## Upgrading from Kyuubi 1.10 to 1.11

* Since Kyuubi 1.11, support for Spark 3.2 in the Spark engine is removed.
* Since Kyuubi 1.11, support for Flink 1.17 and 1.18 in the Flink engine is deprecated and will be removed in the future.
* Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by the Kyuubi Spark extension is removed; consider using `kyuubi.operation.result.max.rows` instead. Note that the latter works without requiring the Kyuubi Spark extension to be installed.
* Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will respect the `kyuubi.session.engine.startup.waitCompletion` config to determine whether to wait for the engine to complete. If the engine is running in client mode, Kyuubi will always wait for the engine to complete. For the Spark engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and `spark.kubernetes.submission.waitAppCompletion` configs to the engine conf based on the value of `kyuubi.session.engine.startup.waitCompletion`, as illustrated in the sketch below.
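A minimal sketch of the wait-completion behavior described in the last bullet, assuming cluster mode. This is not the actual Kyuubi implementation: every name except the three configuration keys quoted above is hypothetical.

```scala
// Illustrative sketch only: models how the engine conf could mirror
// kyuubi.session.engine.startup.waitCompletion in cluster mode. Object and
// method names are hypothetical; only the three configuration keys come from
// the migration note above.
object WaitCompletionConfSketch {

  /** Append the Spark-side wait flags derived from the Kyuubi-side setting. */
  def appendWaitCompletionConfs(
      engineConf: Map[String, String],
      waitCompletion: Boolean): Map[String, String] = {
    engineConf ++ Map(
      "spark.yarn.submit.waitAppCompletion" -> waitCompletion.toString,
      "spark.kubernetes.submission.waitAppCompletion" -> waitCompletion.toString)
  }

  def main(args: Array[String]): Unit = {
    // Session-level setting; the sketch assumes waiting is the default when the key is absent.
    val sessionConf = Map("kyuubi.session.engine.startup.waitCompletion" -> "false")
    val waitCompletion =
      sessionConf.getOrElse("kyuubi.session.engine.startup.waitCompletion", "true").toBoolean

    val engineConf = appendWaitCompletionConfs(Map.empty, waitCompletion)
    engineConf.foreach { case (k, v) => println(s"$k=$v") }
    // spark.yarn.submit.waitAppCompletion=false
    // spark.kubernetes.submission.waitAppCompletion=false
  }
}
```

In practice these keys would be set through Kyuubi's session and engine configuration rather than a standalone object; the sketch only shows the mapping from the single Kyuubi flag to the two Spark submit flags.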
2 changes: 1 addition & 1 deletion docs/quick_start/quick_start.rst
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
**Kyuubi** Gateway \ |release| \ - Kyuubi Server
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.2 to 3.5, 4.0 A Spark distribution
**Spark** Engine 3.3 to 3.5, 4.0 A Spark distribution
**Flink** Engine 1.17 to 1.20 A Flink distribution
**Trino** Engine N/A A Trino cluster accessible via trino-client v411
**Doris** Engine N/A A Doris cluster
4 changes: 2 additions & 2 deletions extensions/spark/kyuubi-spark-authz/README.md
@@ -37,8 +37,8 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver
- [x] 3.5.x (default)
- [x] 3.4.x
- [x] 3.3.x
- [x] 3.2.x
- [x] 3.1.x
- [ ] 3.2.x
- [ ] 3.1.x
- [ ] 3.0.x
- [ ] 2.4.x and earlier

@@ -840,17 +840,6 @@
"isInput" : true,
"comment" : ""
} ]
}, {
"classname" : "org.apache.spark.sql.execution.command.AddFileCommand",
"tableDescs" : [ ],
"opType" : "ADD",
"queryDescs" : [ ],
"uriDescs" : [ {
"fieldName" : "path",
"fieldExtractor" : "StringURIExtractor",
"isInput" : true,
"comment" : ""
} ]
}, {
"classname" : "org.apache.spark.sql.execution.command.AddFilesCommand",
"tableDescs" : [ ],
@@ -969,22 +958,6 @@
"opType" : "ALTERTABLE_DROPPARTS",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand",
"tableDescs" : [ {
"fieldName" : "tableName",
"fieldExtractor" : "TableIdentifierTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false,
"comment" : ""
} ],
"opType" : "MSCK",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.execution.command.AlterTableRenameCommand",
"tableDescs" : [ {
@@ -224,24 +224,7 @@ object PrivilegesBuilder {
}
}
spec.queries(plan).foreach { p =>
if (p.resolved) {
buildQuery(Project(p.output, p), inputObjs, spark = spark)
} else {
try {
// For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved,
// Before this pr, we just ignore it, now we support this.
val analyzed = spark.sessionState.analyzer.execute(p)
buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark)
} catch {
case e: Exception =>
LOG.debug(
s"""
|Failed to analyze unresolved
|$p
|due to ${e.getMessage}""".stripMargin,
e)
}
}
buildQuery(Project(p.output, p), inputObjs, spark = spark)
}
spec.operationType

@@ -18,7 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.rule

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -52,12 +52,7 @@ object Authorization {

def markAuthChecked(plan: LogicalPlan): LogicalPlan = {
plan.setTagValue(KYUUBI_AUTHZ_TAG, ())
plan transformDown {
// TODO: Add this line Support for spark3.1, we can remove this
// after spark 3.2 since https://issues.apache.org/jira/browse/SPARK-34269
case view: View =>
markAllNodesAuthChecked(view.child)
}
plan
}

protected def isAuthChecked(plan: LogicalPlan): Boolean = {
@@ -17,18 +17,11 @@
package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// For Spark 3.1 and below, `ColumnPruning` rule will set `ObjectFilterPlaceHolder#child` to
// `Project`
case ObjectFilterPlaceHolder(Project(_, child)) if child.nodeName == "ShowNamespaces" =>
spark.sessionState.planner.plan(child)
.map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq

// For Spark 3.2 and above
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
spark.sessionState.planner.plan(child)
.map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
@@ -84,8 +84,6 @@ private[authz] object AuthZUtils {
}

lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4"
lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5"
lazy val isSparkV40OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "4.0"
@@ -244,41 +244,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(accessType === AccessType.ALTER)
}

test("AlterTableRecoverPartitionsCommand") {
// AlterTableRecoverPartitionsCommand exists in the version below 3.2
assume(!isSparkV32OrGreater)
val tableName = reusedDb + "." + "TableToMsck"
withTable(tableName) { _ =>
sql(
s"""
|CREATE TABLE $tableName
|(key int, value string, pid string)
|USING parquet
|PARTITIONED BY (pid)""".stripMargin)
val sqlStr =
s"""
|MSCK REPAIR TABLE $tableName
|""".stripMargin
val plan = sql(sqlStr).queryExecution.analyzed
val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
assert(operationType === MSCK)
assert(inputs.isEmpty)

assert(outputs.size === 1)
outputs.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assert(po.catalog.isEmpty)
assertEqualsIgnoreCase(reusedDb)(po.dbname)
assertEqualsIgnoreCase(tableName.split("\\.").last)(po.objectName)
assert(po.columns.isEmpty)
checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = false)
assert(accessType === AccessType.ALTER)
}
}
}

// ALTER TABLE default.StudentInfo PARTITION (age='10') RENAME TO PARTITION (age='15');
test("AlterTableRenamePartitionCommand") {
sql(s"ALTER TABLE $reusedPartTable ADD IF NOT EXISTS PARTITION (pid=1)")
@@ -367,11 +332,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
if (isSparkV32OrGreater) {
// Query in AlterViewAsCommand can not be resolved before SPARK-34698
assert(po0.columns === Seq("key", "pid", "value"))
checkTableOwner(po0)
}
assert(po0.columns === Seq("key", "pid", "value"))
checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)

@@ -482,7 +444,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}

test("AnalyzeTablesCommand") {
assume(isSparkV32OrGreater)
val plan = sql(s"ANALYZE TABLES IN $reusedDb COMPUTE STATISTICS")
.queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
@@ -626,7 +587,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.catalog.isEmpty)
val db = if (isSparkV33OrGreater) defaultDb else null
val db = defaultDb
assertEqualsIgnoreCase(db)(po.dbname)
assertEqualsIgnoreCase("CreateFunctionCommand")(po.objectName)
assert(po.columns.isEmpty)
@@ -658,7 +619,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.catalog.isEmpty)
val db = if (isSparkV33OrGreater) defaultDb else null
val db = defaultDb
assertEqualsIgnoreCase(db)(po.dbname)
assertEqualsIgnoreCase("DropFunctionCommand")(po.objectName)
assert(po.columns.isEmpty)
@@ -678,7 +639,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.catalog.isEmpty)
val db = if (isSparkV33OrGreater) defaultDb else null
val db = defaultDb
assertEqualsIgnoreCase(db)(po.dbname)
assertEqualsIgnoreCase("RefreshFunctionCommand")(po.objectName)
assert(po.columns.isEmpty)
@@ -927,8 +888,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}

test("RepairTableCommand") {
// only spark 3.2 or greater has RepairTableCommand
assume(isSparkV32OrGreater)
val tableName = reusedDb + "." + "TableToRepair"
withTable(tableName) { _ =>
sql(
@@ -29,7 +29,6 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

trait SparkSessionProvider {
protected val catalogImpl: String
@@ -100,7 +99,7 @@ trait SparkSessionProvider {
case (t, "table") => doAs(
admin, {
val purgeOption =
if (isSparkV32OrGreater && isCatalogSupportPurge(
if (isCatalogSupportPurge(
spark.sessionState.catalogManager.currentCatalog.name())) {
"PURGE"
} else ""
@@ -109,9 +108,7 @@
case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS $db"))
case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS $fn"))
case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view"))
case (cacheTable, "cache") => if (isSparkV32OrGreater) {
doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
}
case (cacheTable, "cache") => doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
case (_, e) =>
throw new RuntimeException(s"the resource whose resource type is $e cannot be cleared")
}