diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 4bc51d1a384..04d78d2e195 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -60,11 +60,6 @@ jobs: spark-archive: '-Pscala-2.13' exclude-tags: '' comment: 'normal' - - java: 8 - spark: '3.5' - spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6' - exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest' - comment: 'verify-on-spark-3.2-binary' - java: 8 spark: '3.5' spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6' diff --git a/docs/deployment/migration-guide.md b/docs/deployment/migration-guide.md index 03d0093f3d3..faa66d40b87 100644 --- a/docs/deployment/migration-guide.md +++ b/docs/deployment/migration-guide.md @@ -19,6 +19,7 @@ ## Upgrading from Kyuubi 1.10 to 1.11 +* Since Kyuubi 1.11, the support of Spark engine for Spark 3.2 is removed. * Since Kyuubi 1.11, the support of Flink engine for Flink 1.17 and 1.18 are deprecated, and will be removed in the future. * Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by Kyuubi Spark extension is removed, consider using `kyuubi.operation.result.max.rows` instead. Note, the latter works without requirement of installing Kyuubi Spark extension. * Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will respect the `kyuubi.session.engine.startup.waitCompletion` config to determine whether to wait for the engine completion or not. If the engine is running in client mode, Kyuubi will always wait for the engine completion. And for Spark engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and `spark.kubernetes.submission.waitAppCompletion` configs to the engine conf based on the value of `kyuubi.session.engine.startup.waitCompletion`. diff --git a/docs/quick_start/quick_start.rst b/docs/quick_start/quick_start.rst index f638dd9cbae..8cb1162d00d 100644 --- a/docs/quick_start/quick_start.rst +++ b/docs/quick_start/quick_start.rst @@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component. **Kyuubi** Gateway \ |release| \ - Kyuubi Server Engine lib - Kyuubi Engine Beeline - Kyuubi Beeline - **Spark** Engine 3.2 to 3.5, 4.0 A Spark distribution + **Spark** Engine 3.3 to 3.5, 4.0 A Spark distribution **Flink** Engine 1.17 to 1.20 A Flink distribution **Trino** Engine N/A A Trino cluster allows to access via trino-client v411 **Doris** Engine N/A A Doris cluster diff --git a/extensions/spark/kyuubi-spark-authz/README.md b/extensions/spark/kyuubi-spark-authz/README.md index d866d0df4b3..c63da2fcf44 100644 --- a/extensions/spark/kyuubi-spark-authz/README.md +++ b/extensions/spark/kyuubi-spark-authz/README.md @@ -37,8 +37,8 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver - [x] 3.5.x (default) - [x] 3.4.x - [x] 3.3.x -- [x] 3.2.x -- [x] 3.1.x +- [ ] 3.2.x +- [ ] 3.1.x - [ ] 3.0.x - [ ] 2.4.x and earlier diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index 7ce5591525f..1e6e10380af 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -840,17 +840,6 @@ "isInput" : true, "comment" : "" } ] -}, { - "classname" : "org.apache.spark.sql.execution.command.AddFileCommand", - "tableDescs" : [ ], - "opType" : "ADD", - "queryDescs" : [ ], - "uriDescs" : [ { - "fieldName" : "path", - "fieldExtractor" : "StringURIExtractor", - "isInput" : true, - "comment" : "" - } ] }, { "classname" : "org.apache.spark.sql.execution.command.AddFilesCommand", "tableDescs" : [ ], @@ -969,22 +958,6 @@ "opType" : "ALTERTABLE_DROPPARTS", "queryDescs" : [ ], "uriDescs" : [ ] -}, { - "classname" : "org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand", - "tableDescs" : [ { - "fieldName" : "tableName", - "fieldExtractor" : "TableIdentifierTableExtractor", - "columnDesc" : null, - "actionTypeDesc" : null, - "tableTypeDesc" : null, - "catalogDesc" : null, - "isInput" : false, - "setCurrentDatabaseIfMissing" : false, - "comment" : "" - } ], - "opType" : "MSCK", - "queryDescs" : [ ], - "uriDescs" : [ ] }, { "classname" : "org.apache.spark.sql.execution.command.AlterTableRenameCommand", "tableDescs" : [ { diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala index 01266eb2c85..f186940c2b9 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala @@ -224,24 +224,7 @@ object PrivilegesBuilder { } } spec.queries(plan).foreach { p => - if (p.resolved) { - buildQuery(Project(p.output, p), inputObjs, spark = spark) - } else { - try { - // For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved, - // Before this pr, we just ignore it, now we support this. - val analyzed = spark.sessionState.analyzer.execute(p) - buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark) - } catch { - case e: Exception => - LOG.debug( - s""" - |Failed to analyze unresolved - |$p - |due to ${e.getMessage}""".stripMargin, - e) - } - } + buildQuery(Project(p.output, p), inputObjs, spark = spark) } spec.operationType diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala index d682b71d923..dcd534bb0b9 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.plugin.spark.authz.rule import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY @@ -52,12 +52,7 @@ object Authorization { def markAuthChecked(plan: LogicalPlan): LogicalPlan = { plan.setTagValue(KYUUBI_AUTHZ_TAG, ()) - plan transformDown { - // TODO: Add this line Support for spark3.1, we can remove this - // after spark 3.2 since https://issues.apache.org/jira/browse/SPARK-34269 - case view: View => - markAllNodesAuthChecked(view.child) - } + plan } protected def isAuthChecked(plan: LogicalPlan): Boolean = { diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala index e268ed6bc7c..18e2342e361 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala @@ -17,18 +17,11 @@ package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter import org.apache.spark.sql.{SparkSession, Strategy} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // For Spark 3.1 and below, `ColumnPruning` rule will set `ObjectFilterPlaceHolder#child` to - // `Project` - case ObjectFilterPlaceHolder(Project(_, child)) if child.nodeName == "ShowNamespaces" => - spark.sessionState.planner.plan(child) - .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq - - // For Spark 3.2 and above case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" => spark.sessionState.planner.plan(child) .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala index 1c2ad5f5d32..de567d439fa 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala @@ -84,8 +84,6 @@ private[authz] object AuthZUtils { } lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION) - lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2" - lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3" lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4" lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5" lazy val isSparkV40OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "4.0" diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala index 63837f2501d..64faa2bc15e 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala @@ -244,41 +244,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(accessType === AccessType.ALTER) } - test("AlterTableRecoverPartitionsCommand") { - // AlterTableRecoverPartitionsCommand exists in the version below 3.2 - assume(!isSparkV32OrGreater) - val tableName = reusedDb + "." + "TableToMsck" - withTable(tableName) { _ => - sql( - s""" - |CREATE TABLE $tableName - |(key int, value string, pid string) - |USING parquet - |PARTITIONED BY (pid)""".stripMargin) - val sqlStr = - s""" - |MSCK REPAIR TABLE $tableName - |""".stripMargin - val plan = sql(sqlStr).queryExecution.analyzed - val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark) - assert(operationType === MSCK) - assert(inputs.isEmpty) - - assert(outputs.size === 1) - outputs.foreach { po => - assert(po.actionType === PrivilegeObjectActionType.OTHER) - assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) - assert(po.catalog.isEmpty) - assertEqualsIgnoreCase(reusedDb)(po.dbname) - assertEqualsIgnoreCase(tableName.split("\\.").last)(po.objectName) - assert(po.columns.isEmpty) - checkTableOwner(po) - val accessType = ranger.AccessType(po, operationType, isInput = false) - assert(accessType === AccessType.ALTER) - } - } - } - // ALTER TABLE default.StudentInfo PARTITION (age='10') RENAME TO PARTITION (age='15'); test("AlterTableRenamePartitionCommand") { sql(s"ALTER TABLE $reusedPartTable ADD IF NOT EXISTS PARTITION (pid=1)") @@ -367,11 +332,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) assertEqualsIgnoreCase(reusedDb)(po0.dbname) assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName) - if (isSparkV32OrGreater) { - // Query in AlterViewAsCommand can not be resolved before SPARK-34698 - assert(po0.columns === Seq("key", "pid", "value")) - checkTableOwner(po0) - } + assert(po0.columns === Seq("key", "pid", "value")) + checkTableOwner(po0) val accessType0 = ranger.AccessType(po0, operationType, isInput = true) assert(accessType0 === AccessType.SELECT) @@ -482,7 +444,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite } test("AnalyzeTablesCommand") { - assume(isSparkV32OrGreater) val plan = sql(s"ANALYZE TABLES IN $reusedDb COMPUTE STATISTICS") .queryExecution.analyzed val (in, out, operationType) = PrivilegesBuilder.build(plan, spark) @@ -626,7 +587,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(po.actionType === PrivilegeObjectActionType.OTHER) assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) assert(po.catalog.isEmpty) - val db = if (isSparkV33OrGreater) defaultDb else null + val db = defaultDb assertEqualsIgnoreCase(db)(po.dbname) assertEqualsIgnoreCase("CreateFunctionCommand")(po.objectName) assert(po.columns.isEmpty) @@ -658,7 +619,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(po.actionType === PrivilegeObjectActionType.OTHER) assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) assert(po.catalog.isEmpty) - val db = if (isSparkV33OrGreater) defaultDb else null + val db = defaultDb assertEqualsIgnoreCase(db)(po.dbname) assertEqualsIgnoreCase("DropFunctionCommand")(po.objectName) assert(po.columns.isEmpty) @@ -678,7 +639,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(po.actionType === PrivilegeObjectActionType.OTHER) assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) assert(po.catalog.isEmpty) - val db = if (isSparkV33OrGreater) defaultDb else null + val db = defaultDb assertEqualsIgnoreCase(db)(po.dbname) assertEqualsIgnoreCase("RefreshFunctionCommand")(po.objectName) assert(po.columns.isEmpty) @@ -927,8 +888,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite } test("RepairTableCommand") { - // only spark 3.2 or greater has RepairTableCommand - assume(isSparkV32OrGreater) val tableName = reusedDb + "." + "TableToRepair" withTable(tableName) { _ => sql( diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala index 1fed6bc8b82..f3ecc90f121 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala @@ -29,7 +29,6 @@ import org.apache.kyuubi.Utils import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._ import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._ import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._ -import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ trait SparkSessionProvider { protected val catalogImpl: String @@ -100,7 +99,7 @@ trait SparkSessionProvider { case (t, "table") => doAs( admin, { val purgeOption = - if (isSparkV32OrGreater && isCatalogSupportPurge( + if (isCatalogSupportPurge( spark.sessionState.catalogManager.currentCatalog.name())) { "PURGE" } else "" @@ -109,9 +108,7 @@ trait SparkSessionProvider { case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS $db")) case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS $fn")) case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view")) - case (cacheTable, "cache") => if (isSparkV32OrGreater) { - doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable")) - } + case (cacheTable, "cache") => doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable")) case (_, e) => throw new RuntimeException(s"the resource whose resource type is $e cannot be cleared") } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala index 1b6e07b77dd..40cfc38774e 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala @@ -228,7 +228,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("UpdateTable") { - assume(isSparkV32OrGreater) assume(supportsUpdateTable) val plan = executePlan(s"UPDATE $catalogTable SET value = 'a' WHERE key = 0").analyzed @@ -315,7 +314,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { test("AddPartitions") { assume(supportsPartitionManagement) - assume(isSparkV32OrGreater) val plan = executePlan(s"ALTER TABLE $catalogPartTable " + s"ADD PARTITION (dt='2022-01-01')").analyzed @@ -337,7 +335,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { test("DropPartitions") { assume(supportsPartitionManagement) - assume(isSparkV32OrGreater) val plan = executePlan(s"ALTER TABLE $catalogPartTable " + s"DROP PARTITION (dt='2022-01-01')").analyzed @@ -359,7 +356,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { test("RenamePartitions") { assume(supportsPartitionManagement) - assume(isSparkV32OrGreater) val plan = executePlan(s"ALTER TABLE $catalogPartTable " + s"PARTITION (dt='2022-01-01') RENAME TO PARTITION (dt='2022-01-02')").analyzed @@ -381,7 +377,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { test("TruncatePartition") { assume(supportsPartitionManagement) - assume(isSparkV32OrGreater) val plan = executePlan(s"ALTER TABLE $catalogPartTable " + s"PARTITION (dt='2022-01-01') RENAME TO PARTITION (dt='2022-01-02')").analyzed @@ -485,7 +480,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { test("RepairTable") { assume(supportsPartitionGrammar) - assume(isSparkV32OrGreater) val plan = executePlan(s"MSCK REPAIR TABLE $catalogPartTable").analyzed @@ -506,7 +500,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("TruncateTable") { - assume(isSparkV32OrGreater) val plan = executePlan(s"TRUNCATE TABLE $catalogTable").analyzed @@ -547,7 +540,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { // with V2AlterTableCommand test("AddColumns") { - assume(isSparkV32OrGreater) val table = "AddColumns" withV2Table(table) { tableId => @@ -572,8 +564,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("AlterColumn") { - assume(isSparkV32OrGreater) - val table = "AlterColumn" withV2Table(table) { tableId => sql(s"CREATE TABLE $tableId (i int)") @@ -597,8 +587,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("DropColumns") { - assume(isSparkV32OrGreater) - val table = "DropColumns" withV2Table(table) { tableId => sql(s"CREATE TABLE $tableId (i int, j int)") @@ -622,8 +610,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("ReplaceColumns") { - assume(isSparkV32OrGreater) - val table = "ReplaceColumns" withV2Table(table) { tableId => sql(s"CREATE TABLE $tableId (i int, j int)") @@ -647,8 +633,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("RenameColumn") { - assume(isSparkV32OrGreater) - val table = "RenameColumn" withV2Table(table) { tableId => sql(s"CREATE TABLE $tableId (i int, j int)") @@ -687,7 +671,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("SetNamespaceProperties") { - assume(isSparkV33OrGreater) val plan = sql("ALTER DATABASE default SET DBPROPERTIES (abc = '123')").queryExecution.analyzed val (in, out, operationType) = PrivilegesBuilder.build(plan, spark) assertResult(plan.getClass.getName)( @@ -705,7 +688,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("CreateNamespace") { - assume(isSparkV33OrGreater) withDatabase("CreateNamespace") { db => val plan = sql(s"CREATE DATABASE $db").queryExecution.analyzed val (in, out, operationType) = PrivilegesBuilder.build(plan, spark) @@ -727,7 +709,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("SetNamespaceLocation") { - assume(isSparkV33OrGreater) // hive does not support altering database location assume(catalogImpl !== "hive") val newLoc = spark.conf.get("spark.sql.warehouse.dir") + "/new_db_location" @@ -760,7 +741,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("DescribeNamespace") { - assume(isSparkV33OrGreater) val plan = sql(s"DESC DATABASE $reusedDb").queryExecution.analyzed val (in, out, operationType) = PrivilegesBuilder.build(plan, spark) assertResult(plan.getClass.getName)( @@ -781,7 +761,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { } test("DropNamespace") { - assume(isSparkV33OrGreater) withDatabase("DropNameSpace") { db => sql(s"CREATE DATABASE $db") val plan = sql(s"DROP DATABASE DropNameSpace").queryExecution.analyzed diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala index 3c08ad4ab6f..77d8057c025 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala @@ -132,13 +132,12 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { TableCommandSpec(cmd, Seq(oldTableD), ALTERTABLE_RENAME) } - // this is for spark 3.1 or below - val AlterTableRecoverPartitions = { - val cmd = "org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand" + val RepairTable = { + val cmd = "org.apache.spark.sql.execution.command.RepairTableCommand" TableCommandSpec(cmd, Seq(tableNameDesc), MSCK) } - val RepairTable = { + val RepairTableV2 = { val cmd = "org.apache.spark.sql.catalyst.plans.logical.RepairTable" TableCommandSpec(cmd, Seq(resolvedTableDesc), MSCK) } @@ -667,9 +666,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { TableCommandSpec(cmd, Nil, ADD, uriDescs = Seq(uriDesc)) } - // For spark-3.1 - val AddFileCommand = { - val cmd = "org.apache.spark.sql.execution.command.AddFileCommand" + val AddJarCommand = { + val cmd = "org.apache.spark.sql.execution.command.AddJarCommand" val uriDesc = UriDesc("path", classOf[StringURIExtractor], isInput = true) TableCommandSpec(cmd, Nil, ADD, uriDescs = Seq(uriDesc)) } @@ -678,8 +676,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { AddArchivesCommand, AddArchivesCommand.copy(classname = "org.apache.spark.sql.execution.command.AddFilesCommand"), AddArchivesCommand.copy(classname = "org.apache.spark.sql.execution.command.AddJarsCommand"), - AddFileCommand, - AddFileCommand.copy(classname = "org.apache.spark.sql.execution.command.AddJarCommand"), + AddJarCommand, AddPartitions, DropPartitions, RenamePartitions, @@ -696,9 +693,6 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { AlterTableChangeColumn, AlterTableDropPartition, AlterTableRename, - AlterTableRecoverPartitions, - AlterTableRecoverPartitions.copy(classname = - "org.apache.spark.sql.execution.command.RepairTableCommand"), AlterTableRenamePartition, AlterTableSerDeProperties, AlterTableSetLocation, @@ -749,6 +743,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { OverwriteByExpression, OverwritePartitionsDynamic, RepairTable, + RepairTableV2, RefreshTable, RefreshTableV2, RefreshTable3d0, diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala index dbf88d7d028..595f243a0f8 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala @@ -25,7 +25,7 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._ import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._ import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._ -import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.{isSparkV32OrGreater, isSparkV35OrGreater} +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.isSparkV35OrGreater import org.apache.kyuubi.tags.DeltaTest import org.apache.kyuubi.util.AssertionUtils._ @@ -295,8 +295,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("optimize table") { - assume(isSparkV32OrGreater, "optimize table is available in Delta Lake 1.2.0 and above") - withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) { doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) doAs(admin, sql(createTableSql(namespace1, table1))) @@ -446,8 +444,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("optimize path-based table") { - assume(isSparkV32OrGreater, "optimize table is available in Delta Lake 1.2.0 and above") - withTempDir(path => { doAs(admin, sql(createPathBasedTableSql(path))) val optimizeTableSql1 = s"OPTIMIZE delta.`$path`" @@ -517,10 +513,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("alter path-based table drop column") { - assume( - isSparkV32OrGreater, - "alter table drop column is available in Delta Lake 1.2.0 and above") - withTempDir(path => { doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")))) val dropColumnSql = s"ALTER TABLE delta.`$path` DROP COLUMN birthDate" @@ -532,10 +524,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("alter path-based table rename column") { - assume( - isSparkV32OrGreater, - "alter table rename column is available in Delta Lake 1.2.0 and above") - withTempDir(path => { doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")))) val renameColumnSql = s"ALTER TABLE delta.`$path`" + @@ -548,11 +536,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("alter path-based table replace columns") { - withTempDir(path => { - assume( - isSparkV32OrGreater, - "alter table replace columns is not available in Delta Lake 1.0.1") - + withTempDir { path => doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")))) val replaceColumnsSql = s"ALTER TABLE delta.`$path`" + s" REPLACE COLUMNS (id INT, name STRING, gender STRING)" @@ -567,7 +551,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { if (isSparkV35OrGreater) { doAs(admin, sql(replaceColumnsSql)) } - }) + } } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala index 8852aec1db6..52ec46e5e33 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala @@ -22,7 +22,6 @@ import org.apache.kyuubi.Utils import org.apache.kyuubi.plugin.spark.authz.AccessControlException import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._ import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._ -import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ import org.apache.kyuubi.tags.HudiTest import org.apache.kyuubi.util.AssertionUtils.interceptEndsWith @@ -48,15 +47,13 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { val index1 = "table_hoodie_index1" override def beforeAll(): Unit = { - if (isSparkV32OrGreater) { - spark.conf.set( - s"spark.sql.catalog.$sparkCatalog", - "org.apache.spark.sql.hudi.catalog.HoodieCatalog") - spark.conf.set(s"spark.sql.catalog.$sparkCatalog.type", "hadoop") - spark.conf.set( - s"spark.sql.catalog.$sparkCatalog.warehouse", - Utils.createTempDir("hudi-hadoop").toString) - } + spark.conf.set( + s"spark.sql.catalog.$sparkCatalog", + "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + spark.conf.set(s"spark.sql.catalog.$sparkCatalog.type", "hadoop") + spark.conf.set( + s"spark.sql.catalog.$sparkCatalog.warehouse", + Utils.createTempDir("hudi-hadoop").toString) super.beforeAll() } @@ -549,9 +546,7 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("IndexBasedCommand") { - assume( - !isSparkV33OrGreater, - "Hudi index creation not supported on Spark 3.3 or greater currently") + assume(false, "Hudi index creation not supported on Spark 3.3 or greater currently") withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (namespace1, "database"))) { doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) doAs( diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala index 16a8beb22c4..b72e82c170c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala @@ -207,8 +207,6 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite } test("KYUUBI #4047 MergeIntoIcebergTable with row filter") { - assume(isSparkV32OrGreater) - val outputTable2 = "outputTable2" withCleanTmpResources(Seq( (s"$catalogV2.default.src", "table"), diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala index efaf28df8e0..44249e58a44 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala @@ -420,46 +420,43 @@ class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("Batch Time Travel") { - // Batch Time Travel requires Spark 3.3+ - if (isSparkV33OrGreater) { - withCleanTmpResources(Seq( - (s"$catalogV2.$namespace1.$table1", "table"))) { - val createTable = createTableSql(namespace1, table1) - doAs(admin, sql(createTable)) - val insertSql = - s""" - |INSERT INTO $catalogV2.$namespace1.$table1 VALUES - |(1, "a"), (2, "b"); - |""".stripMargin - doAs(admin, sql(insertSql)) + withCleanTmpResources(Seq( + (s"$catalogV2.$namespace1.$table1", "table"))) { + val createTable = createTableSql(namespace1, table1) + doAs(admin, sql(createTable)) + val insertSql = + s""" + |INSERT INTO $catalogV2.$namespace1.$table1 VALUES + |(1, "a"), (2, "b"); + |""".stripMargin + doAs(admin, sql(insertSql)) - val querySnapshotVersionSql = - s""" - |SELECT id from $catalogV2.$namespace1.$table1 VERSION AS OF 1 - |""".stripMargin - doAs(table1OnlyUserForNs, sql(querySnapshotVersionSql).collect()) - interceptEndsWith[AccessControlException] { - doAs(someone, sql(querySnapshotVersionSql).collect()) - }(s"does not have [select] privilege on [$namespace1/$table1/id]") - doAs(admin, sql(querySnapshotVersionSql).collect()) + val querySnapshotVersionSql = + s""" + |SELECT id from $catalogV2.$namespace1.$table1 VERSION AS OF 1 + |""".stripMargin + doAs(table1OnlyUserForNs, sql(querySnapshotVersionSql).collect()) + interceptEndsWith[AccessControlException] { + doAs(someone, sql(querySnapshotVersionSql).collect()) + }(s"does not have [select] privilege on [$namespace1/$table1/id]") + doAs(admin, sql(querySnapshotVersionSql).collect()) - val batchTimeTravelTimestamp = - doAs( - admin, - sql(s"SELECT commit_time FROM $catalogV2.$namespace1.`$table1$$snapshots`" + - s" ORDER BY commit_time ASC LIMIT 1").collect()(0).getTimestamp(0)) + val batchTimeTravelTimestamp = + doAs( + admin, + sql(s"SELECT commit_time FROM $catalogV2.$namespace1.`$table1$$snapshots`" + + s" ORDER BY commit_time ASC LIMIT 1").collect()(0).getTimestamp(0)) - val queryWithTimestamp = - s""" - |SELECT id FROM $catalogV2.$namespace1.$table1 - |TIMESTAMP AS OF '$batchTimeTravelTimestamp' - |""".stripMargin - doAs(table1OnlyUserForNs, sql(queryWithTimestamp).collect()) - interceptEndsWith[AccessControlException] { - doAs(someone, sql(queryWithTimestamp).collect()) - }(s"does not have [select] privilege on [$namespace1/$table1/id]") - doAs(admin, sql(queryWithTimestamp).collect()) - } + val queryWithTimestamp = + s""" + |SELECT id FROM $catalogV2.$namespace1.$table1 + |TIMESTAMP AS OF '$batchTimeTravelTimestamp' + |""".stripMargin + doAs(table1OnlyUserForNs, sql(queryWithTimestamp).collect()) + interceptEndsWith[AccessControlException] { + doAs(someone, sql(queryWithTimestamp).collect()) + }(s"does not have [select] privilege on [$namespace1/$table1/id]") + doAs(admin, sql(queryWithTimestamp).collect()) } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 1fdea0ed969..342bdf0e037 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -680,37 +680,34 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("[KYUUBI #3411] skip checking cache table") { - if (isSparkV32OrGreater) { // cache table sql supported since 3.2.0 - - val db1 = defaultDb - val srcTable1 = "hive_src1" - val cacheTable1 = "cacheTable1" - val cacheTable2 = "cacheTable2" - val cacheTable3 = "cacheTable3" - val cacheTable4 = "cacheTable4" - - withCleanTmpResources(Seq( - (s"$db1.$srcTable1", "table"), - (s"$db1.$cacheTable1", "cache"), - (s"$db1.$cacheTable2", "cache"), - (s"$db1.$cacheTable3", "cache"), - (s"$db1.$cacheTable4", "cache"))) { + val db1 = defaultDb + val srcTable1 = "hive_src1" + val cacheTable1 = "cacheTable1" + val cacheTable2 = "cacheTable2" + val cacheTable3 = "cacheTable3" + val cacheTable4 = "cacheTable4" - doAs( - admin, - sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" + - s" (id int, name string, city string)")) - - withSingleCallEnabled { - val e1 = intercept[AccessControlException]( - doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1"))) - assert( - e1.getMessage.contains(s"does not have [select] privilege on " + - s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]")) - } - doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b ")) - doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b ")) + withCleanTmpResources(Seq( + (s"$db1.$srcTable1", "table"), + (s"$db1.$cacheTable1", "cache"), + (s"$db1.$cacheTable2", "cache"), + (s"$db1.$cacheTable3", "cache"), + (s"$db1.$cacheTable4", "cache"))) { + + doAs( + admin, + sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" + + s" (id int, name string, city string)")) + + withSingleCallEnabled { + val e1 = intercept[AccessControlException]( + doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1"))) + assert( + e1.getMessage.contains(s"does not have [select] privilege on " + + s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]")) } + doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b ")) + doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b ")) } } @@ -995,7 +992,6 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } test("[KYUUBI #5503][AUTHZ] Check plan auth checked should not set tag to all child nodes") { - assume(isSparkV32OrGreater, "Spark 3.1 not support lateral subquery.") val db1 = defaultDb val table1 = "table1" val table2 = "table2" @@ -1187,11 +1183,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { test("Add resource command") { withTempDir { path => withSingleCallEnabled { - val supportedCommand = if (isSparkV32OrGreater) { - Seq("JAR", "FILE", "ARCHIVE") - } else { - Seq("JAR", "FILE") - } + val supportedCommand = Seq("JAR", "FILE", "ARCHIVE") supportedCommand.foreach { cmd => interceptEndsWith[AccessControlException]( doAs(someone, sql(s"ADD $cmd $path")))( diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala index 49d51e5d244..f7e886c30c7 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala @@ -238,8 +238,6 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu } test("[KYUUBI #3424] TRUNCATE TABLE") { - assume(isSparkV32OrGreater) - val e1 = intercept[AccessControlException]( doAs( someone, @@ -249,8 +247,6 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu } test("[KYUUBI #3424] MSCK REPAIR TABLE") { - assume(isSparkV32OrGreater) - val e1 = intercept[AccessControlException]( doAs( someone, diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala index 0eed970a4cd..844ce1b9ee9 100644 --- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession -import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION class TPCDSCatalogSuite extends KyuubiFunSuite { @@ -91,10 +90,7 @@ class TPCDSCatalogSuite extends KyuubiFunSuite { def assertStats(tableName: String, sizeInBytes: BigInt, rowCount: BigInt): Unit = { val stats = spark.table(tableName).queryExecution.analyzed.stats assert(stats.sizeInBytes == sizeInBytes) - // stats.rowCount only has value after SPARK-33954 - if (SPARK_RUNTIME_VERSION >= "3.2") { - assert(stats.rowCount.contains(rowCount), tableName) - } + assert(stats.rowCount.contains(rowCount), tableName) } assertStats("tpcds.sf1.call_center", 1830, 6) diff --git a/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala index 14415141e63..117bc9dd13b 100644 --- a/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession -import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION class TPCHCatalogSuite extends KyuubiFunSuite { @@ -129,10 +128,7 @@ class TPCHCatalogSuite extends KyuubiFunSuite { def assertStats(tableName: String, sizeInBytes: BigInt, rowCount: BigInt): Unit = { val stats = spark.table(tableName).queryExecution.analyzed.stats assert(stats.sizeInBytes == sizeInBytes) - // stats.rowCount only has value after SPARK-33954 - if (SPARK_RUNTIME_VERSION >= "3.2") { - assert(stats.rowCount.contains(rowCount), tableName) - } + assert(stats.rowCount.contains(rowCount), tableName) } assertStats("tpch.sf1.customer", 26850000, 150000) assertStats("tpch.sf1.orders", 156000000, 1500000) diff --git a/extensions/spark/kyuubi-spark-lineage/README.md b/extensions/spark/kyuubi-spark-lineage/README.md index 6b3eeb902bb..75c995d5732 100644 --- a/extensions/spark/kyuubi-spark-lineage/README.md +++ b/extensions/spark/kyuubi-spark-lineage/README.md @@ -26,7 +26,7 @@ ## Build ```shell -build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.version=3.5.1 +build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.version=3.5.6 ``` ### Supported Apache Spark Versions diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala index 27d74aa173f..dc6673d4cfc 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -265,20 +265,10 @@ trait LineageParser { case p if p.nodeName == "CreateTableAsSelect" || p.nodeName == "ReplaceTableAsSelect" => - val (table, namespace, catalog) = - if (SPARK_RUNTIME_VERSION <= "3.2") { - ( - getField[Identifier](plan, "tableName").name, - getField[Identifier](plan, "tableName").namespace.mkString("."), - getField[TableCatalog](plan, "catalog").name()) - } else { - ( - invokeAs[Identifier](plan, "tableName").name(), - invokeAs[Identifier](plan, "tableName").namespace().mkString("."), - getField[CatalogPlugin]( - invokeAs[LogicalPlan](plan, "name"), - "catalog").name()) - } + val (table, namespace, catalog) = ( + invokeAs[Identifier](plan, "tableName").name(), + invokeAs[Identifier](plan, "tableName").namespace().mkString("."), + getField[CatalogPlugin](invokeAs[LogicalPlan](plan, "name"), "catalog").name()) extractColumnsLineage( getQuery(plan), parentColumnsLineage, diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala index 8e8d18f216e..03ee36e41f6 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala @@ -33,12 +33,9 @@ import org.scalatest.time.SpanSugar._ import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.plugin.lineage.Lineage import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{buildColumnQualifiedName, buildTableQualifiedName, COLUMN_LINEAGE_TYPE, PROCESS_TYPE} -import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest { - val catalogName = - if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog" - else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" + val catalogName = "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" override protected val catalogImpl: String = "hive" diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala index 378eb3bb460..95c3caa63fd 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala @@ -28,13 +28,10 @@ import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.events.EventBus import org.apache.kyuubi.plugin.lineage.Lineage import org.apache.kyuubi.plugin.lineage.dispatcher.{OperationLineageKyuubiEvent, OperationLineageSparkEvent} -import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest { - val catalogName = - if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog" - else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" + val catalogName = "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" override protected val catalogImpl: String = "hive" diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala index 8af5b0f179b..966fb708882 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala @@ -22,9 +22,8 @@ import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME class RowLevelCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite { - override def catalogName: String = { + override def catalogName: String = "org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog" - } test("columns lineage extract - WriteDelta") { assume( diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala index 380b3eee4f7..90da4650b1f 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala @@ -29,15 +29,11 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.plugin.lineage.Lineage -import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION abstract class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite with SparkListenerExtensionTest { - def catalogName: String = { - if (SPARK_RUNTIME_VERSION <= "3.3") "org.apache.spark.sql.connector.InMemoryTableCatalog" - else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" - } + def catalogName: String = "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" val DEFAULT_CATALOG = LineageConf.DEFAULT_CATALOG override protected val catalogImpl: String = "hive" diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala index c9724c3fece..7a58af2077f 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala @@ -21,10 +21,6 @@ import org.apache.kyuubi.plugin.lineage.Lineage class TableCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite { - override def catalogName: String = { - "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" - } - test("columns lineage extract - MergeIntoTable") { val ddls = """ diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 02d2a7afb59..9151fe0a264 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -383,9 +383,6 @@ object SparkSQLEngine extends Logging { } def main(args: Array[String]): Unit = { - if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.2") { - warn("The support for Spark 3.2 is deprecated, and will be removed in the next version.") - } val startedTime = System.currentTimeMillis() val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match { case Some(t) => t.toLong diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala index e2fb55134cc..e13653b01cb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala @@ -302,7 +302,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { MessageSerializer.serialize(writeChannel, batch) // Always write the Ipc options at the end. - ArrowStreamWriter.writeEndOfStream(writeChannel, ARROW_IPC_OPTION_DEFAULT) + ArrowStreamWriter.writeEndOfStream(writeChannel, IpcOption.DEFAULT) batch.close() } { @@ -343,7 +343,4 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { errorOnDuplicatedFieldNames, largeVarTypes) } - - // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.2 - final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption() } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index a5c911ff37d..75ad51885a4 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -91,12 +91,8 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with .add("c15", "struct", nullable = true, "15") .add("c16", "binary", nullable = false, "16") .add("c17", "struct", nullable = true, "17") - - // since spark3.3.0 - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") { - schema = schema.add("c18", "interval day", nullable = true, "18") - .add("c19", "interval year", nullable = true, "19") - } + .add("c18", "interval day", nullable = true, "18") + .add("c19", "interval year", nullable = true, "19") // since spark3.4.0 if (SPARK_ENGINE_RUNTIME_VERSION >= "3.4") { schema = schema.add("c20", "timestamp_ntz", nullable = true, "20") diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala index ddb612ad4d7..c52436d982e 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala @@ -25,7 +25,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.spark.{WithDiscoverySparkSQLEngine, WithEmbeddedZookeeper} -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION import org.apache.kyuubi.service.ServiceState abstract class SparkSQLEngineDeregisterSuite @@ -61,15 +60,9 @@ abstract class SparkSQLEngineDeregisterSuite class SparkSQLEngineDeregisterExceptionSuite extends SparkSQLEngineDeregisterSuite { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ Map(ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> { - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") { - // see https://issues.apache.org/jira/browse/SPARK-35958 - "org.apache.spark.SparkArithmeticException" - } else { - classOf[ArithmeticException].getCanonicalName - } - }) - + super.withKyuubiConf ++ + Map(ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> + classOf[SparkArithmeticException].getCanonicalName) } } @@ -95,14 +88,8 @@ class SparkSQLEngineDeregisterExceptionTTLSuite super.withKyuubiConf ++ zookeeperConf ++ Map( ANSI_ENABLED.key -> "true", - ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> { - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") { - // see https://issues.apache.org/jira/browse/SPARK-35958 - "org.apache.spark.SparkArithmeticException" - } else { - classOf[ArithmeticException].getCanonicalName - } - }, + ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> + classOf[SparkArithmeticException].getCanonicalName, ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString, ENGINE_DEREGISTER_EXCEPTION_TTL.key -> deregisterExceptionTTL.toString) } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala index 814c08343d0..787b229882b 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation import scala.collection.mutable.ListBuffer -import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION} +import org.apache.kyuubi.IcebergSuiteMixin import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.util.AssertionUtils._ import org.apache.kyuubi.util.SparkVersionUtil @@ -156,12 +156,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit "map", "date", "timestamp", - // SPARK-37931 - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") "struct" - else "struct<`X`: bigint, `Y`: double>", + "struct", "binary", - // SPARK-37931 - if (SPARK_COMPILE_VERSION >= "3.3") "struct" else "struct<`X`: string>") + "struct") val cols = dataTypes.zipWithIndex.map { case (dt, idx) => s"c$idx" -> dt } val (colNames, _) = cols.unzip diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala index 49f6b85d89f..963b8e8953a 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala @@ -27,9 +27,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil { def resultFormat: String = "thrift" test("execute statement - select null") { - assume( - resultFormat == "thrift" || - (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery("SELECT NULL AS col") assert(resultSet.next()) @@ -252,9 +249,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil { } test("execute statement - select daytime interval") { - assume( - resultFormat == "thrift" || - (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.3")) withJdbcStatement() { statement => Map( "interval 1 day 1 hour -60 minutes 30 seconds" -> @@ -283,25 +277,15 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil { assert(resultSet.next()) val result = resultSet.getString("col") val metaData = resultSet.getMetaData - if (SPARK_ENGINE_RUNTIME_VERSION <= "3.1") { - // for spark 3.1 and backwards - assert(result === kv._2._2) - assert(metaData.getPrecision(1) === Int.MaxValue) - assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR) - } else { - assert(result === kv._2._1) - assert(metaData.getPrecision(1) === 29) - assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER) - } + assert(result === kv._2._1) + assert(metaData.getPrecision(1) === 29) + assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER) assert(metaData.getScale(1) === 0) } } } test("execute statement - select year/month interval") { - assume( - resultFormat == "thrift" || - (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.3")) withJdbcStatement() { statement => Map( "INTERVAL 2022 YEAR" -> Tuple2("2022-0", "2022 years"), @@ -314,25 +298,15 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil { assert(resultSet.next()) val result = resultSet.getString("col") val metaData = resultSet.getMetaData - if (SPARK_ENGINE_RUNTIME_VERSION <= "3.1") { - // for spark 3.1 and backwards - assert(result === kv._2._2) - assert(metaData.getPrecision(1) === Int.MaxValue) - assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR) - } else { - assert(result === kv._2._1) - assert(metaData.getPrecision(1) === 11) - assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER) - } + assert(result === kv._2._1) + assert(metaData.getPrecision(1) === 11) + assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER) assert(metaData.getScale(1) === 0) } } } test("execute statement - select array") { - assume( - resultFormat == "thrift" || - (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery( "SELECT array() AS col1, array(1) AS col2, array(null) AS col3") @@ -350,9 +324,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil { } test("execute statement - select map") { - assume( - resultFormat == "thrift" || - (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery( "SELECT map() AS col1, map(1, 2, 3, 4) AS col2, map(1, null) AS col3") @@ -370,9 +341,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil { } test("execute statement - select struct") { - assume( - resultFormat == "thrift" || - (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery( "SELECT struct('1', '2') AS col1," +