From 939dd982c094ecf628df48355988f9a4cce887f9 Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Mon, 1 Sep 2025 19:44:11 +0800 Subject: [PATCH 1/9] add RuleFunctionAuthorization to support hive udf Authorization --- .../authz/ranger/RangerSparkExtension.scala | 1 + .../ranger/RuleFunctionAuthorization.scala | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala index 288719f07bf..2caeca11304 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala @@ -45,6 +45,7 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) { override def apply(v1: SparkSessionExtensions): Unit = { v1.injectCheckRule(AuthzConfigurationChecker) + v1.injectCheckRule(RuleFunctionAuthorization) v1.injectResolutionRule(_ => RuleReplaceShowObjectCommands) v1.injectResolutionRule(_ => RuleApplyPermanentViewMarker) v1.injectResolutionRule(_ => RuleApplyTypeOfMarker) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala new file mode 100644 index 00000000000..8b531b3de25 --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.spark.authz.ranger + +import scala.collection.mutable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import org.apache.kyuubi.plugin.spark.authz._ +import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType +import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._ +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ + +case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan => Unit) { + override def apply(plan: LogicalPlan): Unit = { + val auditHandler = new SparkRangerAuditHandler + val ugi = getAuthzUgi(spark.sparkContext) + val (inputs, _, opType) = PrivilegesBuilder.buildFunctions(plan, spark) + + // Use a HashSet to deduplicate the same AccessResource and AccessType, the requests will be all + // the non-duplicate requests and in the same order as the input requests. + val requests = new mutable.ArrayBuffer[AccessRequest]() + val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]() + + def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = { + objects.foreach { obj => + val resource = AccessResource(obj, opType) + val accessType = ranger.AccessType(obj, opType, isInput) + if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) { + requests += AccessRequest(resource, ugi, opType, accessType) + requestsSet.add(resource, accessType) + } + } + } + + addAccessRequest(inputs, isInput = true) + + val requestArrays = requests.map(Seq(_)) + if (authorizeInSingleCall) { + verify(requestArrays.flatten, auditHandler) + } else { + requestArrays.flatten.foreach { req => + verify(Seq(req), auditHandler) + } + } + } +} From 38c2577e96219b065d78e1c2da203b417b5f1cef Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Tue, 2 Sep 2025 22:18:21 +0800 Subject: [PATCH 2/9] add built in and udf test --- .../FunctionPrivilegesBuilderSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala index ad4b57faa93..1d2a5b2f347 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala @@ -193,4 +193,25 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } } + test("Built-in and UDF Function Call Query") { + val plan = sql(s"SELECT kyuubi_fun_0('TESTSTRING'), " + + s"kyuubi_fun_0(value)," + + s"abs(key)," + + s"abs(-100)," + + s"lower(value)," + + s"lower('TESTSTRING') " + + s"FROM $reusedTable").queryExecution.analyzed + val (inputs, _, _) = PrivilegesBuilder.buildFunctions(plan, spark) + assert(inputs.size === 2) + inputs.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + } + + } From 5b20926d00947f3df6fd51d5060de5befd9c94e4 Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Thu, 4 Sep 2025 21:15:24 +0800 Subject: [PATCH 3/9] add ut for RuleFunctionAuthorization --- extensions/spark/kyuubi-spark-authz/pom.xml | 6 +++ .../spark/authz/ranger/AccessResource.scala | 1 + .../ranger/RuleFunctionAuthorization.scala | 5 +++ .../FunctionPrivilegesBuilderSuite.scala | 43 +++++++++++++++++-- 4 files changed, 52 insertions(+), 3 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 49c79d49647..1a2ea540d60 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -397,6 +397,12 @@ ${project.version} test + + + org.mockito + mockito-core + test + diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala index 7772c86b788..70943187229 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala @@ -32,6 +32,7 @@ class AccessResource private (val objectType: ObjectType, val catalog: Option[St extends RangerAccessResourceImpl { implicit def asString(obj: Object): String = if (obj != null) obj.asInstanceOf[String] else null def getDatabase: String = getValue("database") + def getUdf: String = getValue("udf") def getTable: String = getValue("table") def getColumn: String = getValue("column") def getColumns: Seq[String] = { diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala index 8b531b3de25..7656ed30952 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala @@ -50,7 +50,12 @@ case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan = } addAccessRequest(inputs, isInput = true) + checkPrivileges(requests, auditHandler) + } + def checkPrivileges( + requests: mutable.ArrayBuffer[AccessRequest], + auditHandler: SparkRangerAuditHandler): Unit = { val requestArrays = requests.map(Seq(_)) if (authorizeInSingleCall) { verify(requestArrays.flatten, auditHandler) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala index 1d2a5b2f347..579779d0a85 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala @@ -17,13 +17,19 @@ package org.apache.kyuubi.plugin.spark.authz +import scala.collection.mutable + import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} // scalastyle:off import org.scalatest.funsuite.AnyFunSuite import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY -import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType +import org.apache.kyuubi.plugin.spark.authz.ranger.{AccessRequest, AccessResource, AccessType, RuleFunctionAuthorization, SparkRangerAuditHandler} abstract class FunctionPrivilegesBuilderSuite extends AnyFunSuite with SparkSessionProvider with BeforeAndAfterAll with BeforeAndAfterEach { @@ -193,7 +199,7 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } } - test("Built-in and UDF Function Call Query") { + test("Built in and UDF Function Call Query") { val plan = sql(s"SELECT kyuubi_fun_0('TESTSTRING'), " + s"kyuubi_fun_0(value)," + s"abs(key)," + @@ -213,5 +219,36 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } } - + test("[KYUUBI #7186] Introduce RuleFunctionAuthorization") { + + val ruleFunc = Mockito.spy[RuleFunctionAuthorization](RuleFunctionAuthorization(spark)) + Mockito.doAnswer(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + val requests = invocation.getArgument[mutable.ArrayBuffer[AccessRequest]](0) + requests.foreach { request => + // deny udf `reusedDb.kyuubi_fun_0` + var database: String = request.getResource.asInstanceOf[AccessResource].getDatabase + var udf: String = request.getResource.asInstanceOf[AccessResource].getUdf + if (database.equalsIgnoreCase(reusedDb) && udf.equalsIgnoreCase("kyuubi_fun_0")) { + throw new AccessControlException("Access denied") + } + } + } + }).when(ruleFunc).checkPrivileges( + any[mutable.ArrayBuffer[AccessRequest]](), + any[SparkRangerAuditHandler]()) + + val query1 = sql(s"SELECT " + + s"${reusedDb}.kyuubi_fun_0('KYUUBI_STRING')," + + s"${reusedDb}.kyuubi_fun_1('KYUUBI_STRING') ").queryExecution.analyzed + intercept[AccessControlException] { ruleFunc.apply(query1) } + + val query2 = sql(s"SELECT " + + s"${reusedDb}.kyuubi_fun_0('KYUUBI_STRING')").queryExecution.analyzed + intercept[AccessControlException] { ruleFunc.apply(query2) } + + val query3 = sql(s"SELECT " + + s"${reusedDb}.kyuubi_fun_1('KYUUBI_STRING')").queryExecution.analyzed + ruleFunc.apply(query3) + } } From 60df21866ccbf9590e985a4980d60c22dd71a9ec Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Sun, 7 Sep 2025 00:30:04 +0800 Subject: [PATCH 4/9] fix buildFunctions for command --- .../apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala index 01266eb2c85..1ccc9ef700c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala @@ -284,9 +284,7 @@ object PrivilegesBuilder { val spec = getTableCommandSpec(command) val functionPrivAndOpType = spec.queries(plan) .map(plan => buildFunctions(plan, spark)) - functionPrivAndOpType.map(_._1) - .reduce(_ ++ _) - .foreach(functionPriv => inputObjs += functionPriv) + inputObjs ++= functionPrivAndOpType.flatMap(_._1) case plan => plan transformAllExpressions { case hiveFunction: Expression if isKnownFunction(hiveFunction) => From 0096db0d7c7f83037c6adaca0a989a7a2cf877cc Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Sat, 13 Sep 2025 01:01:51 +0800 Subject: [PATCH 5/9] test RuleFunctionAuthorization in RangerSparkExtensionSuite. --- .../ranger/RuleFunctionAuthorization.scala | 5 - .../authz/gen/PolicyJsonFileGenerator.scala | 17 +++ .../test/resources/sparkSql_hive_jenkins.json | 94 +++++++++++--- .../FunctionPrivilegesBuilderSuite.scala | 117 +++++++++++------- .../ranger/RangerSparkExtensionSuite.scala | 104 +++++++++++++--- 5 files changed, 257 insertions(+), 80 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala index 7656ed30952..8b531b3de25 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala @@ -50,12 +50,7 @@ case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan = } addAccessRequest(inputs, isInput = true) - checkPrivileges(requests, auditHandler) - } - def checkPrivileges( - requests: mutable.ArrayBuffer[AccessRequest], - auditHandler: SparkRangerAuditHandler): Unit = { val requestArrays = requests.map(Seq(_)) if (authorizeInSingleCall) { verify(requestArrays.flatten, auditHandler) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala index edff7d8a079..6ec44aa8d43 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala @@ -110,6 +110,7 @@ class PolicyJsonFileGenerator extends AnyFunSuite { policyAccessForPermViewAccessOnly, policyAccessForTable2AccessOnly, policyAccessForPaimonNsTable1SelectOnly, + policyAccessForDefaultDbUDF, // row filter policyFilterForSrcTableKeyLessThan20, policyFilterForPermViewKeyLessThan20, @@ -371,4 +372,20 @@ class PolicyJsonFileGenerator extends AnyFunSuite { users = List(table1OnlyUserForNs), accesses = allowTypes(select), delegateAdmin = true))) + + private val policyAccessForDefaultDbUDF = KRangerPolicy( + name = "defaultdb_udf", + description = "Policy for default db udf", + resources = Map( + databaseRes(defaultDb), + "udf" -> KRangerPolicyResource(values = List("kyuubi_func*"))), + policyItems = List( + KRangerPolicyItem( + users = List(bob), + accesses = allowTypes(select, update, create, drop, alter, index, lock, all, read, write), + delegateAdmin = true), + KRangerPolicyItem( + users = List(kent), + accesses = allowTypes(select), + delegateAdmin = true))) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json index 16bdd5087a2..c5bd2f28dec 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json +++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json @@ -510,6 +510,72 @@ "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", + "name" : "defaultdb_udf", + "policyType" : 0, + "policyPriority" : 0, + "description" : "Policy for default db udf", + "isAuditEnabled" : true, + "resources" : { + "database" : { + "values" : [ "default" ], + "isExcludes" : false, + "isRecursive" : false + }, + "udf" : { + "values" : [ "kyuubi_func*" ], + "isExcludes" : false, + "isRecursive" : false + } + }, + "policyItems" : [ { + "accesses" : [ { + "type" : "select", + "isAllowed" : true + }, { + "type" : "update", + "isAllowed" : true + }, { + "type" : "create", + "isAllowed" : true + }, { + "type" : "drop", + "isAllowed" : true + }, { + "type" : "alter", + "isAllowed" : true + }, { + "type" : "index", + "isAllowed" : true + }, { + "type" : "lock", + "isAllowed" : true + }, { + "type" : "all", + "isAllowed" : true + }, { + "type" : "read", + "isAllowed" : true + }, { + "type" : "write", + "isAllowed" : true + } ], + "users" : [ "bob" ], + "delegateAdmin" : true + }, { + "accesses" : [ { + "type" : "select", + "isAllowed" : true + } ], + "users" : [ "kent" ], + "delegateAdmin" : true + } ], + "isDenyAllElse" : false + }, { + "id" : 11, + "guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca", + "isEnabled" : true, + "version" : 1, + "service" : "hive_jenkins", "name" : "src_key_less_than_20", "policyType" : 2, "policyPriority" : 0, @@ -539,8 +605,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 11, - "guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca", + "id" : 12, + "guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -573,8 +639,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 12, - "guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710", + "id" : 13, + "guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -612,8 +678,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 13, - "guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39", + "id" : 14, + "guid" : "aab32389-22bc-325a-af60-6eb525ffdc56", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -651,8 +717,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 14, - "guid" : "aab32389-22bc-325a-af60-6eb525ffdc56", + "id" : 15, + "guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -690,8 +756,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 15, - "guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3", + "id" : 16, + "guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -729,8 +795,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 16, - "guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf", + "id" : 17, + "guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -768,8 +834,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 17, - "guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb", + "id" : 18, + "guid" : "6f4922f4-5568-361a-8cdf-4ad2299f6d23", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala index 579779d0a85..18cc2bbed9e 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala @@ -17,19 +17,13 @@ package org.apache.kyuubi.plugin.spark.authz -import scala.collection.mutable - import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.mockito.ArgumentMatchers.any -import org.mockito.Mockito -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} // scalastyle:off import org.scalatest.funsuite.AnyFunSuite import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY -import org.apache.kyuubi.plugin.spark.authz.ranger.{AccessRequest, AccessResource, AccessType, RuleFunctionAuthorization, SparkRangerAuditHandler} +import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType abstract class FunctionPrivilegesBuilderSuite extends AnyFunSuite with SparkSessionProvider with BeforeAndAfterAll with BeforeAndAfterEach { @@ -200,13 +194,15 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } test("Built in and UDF Function Call Query") { - val plan = sql(s"SELECT kyuubi_fun_0('TESTSTRING'), " + - s"kyuubi_fun_0(value)," + - s"abs(key)," + - s"abs(-100)," + - s"lower(value)," + - s"lower('TESTSTRING') " + - s"FROM $reusedTable").queryExecution.analyzed + val plan = sql( + s""" + |SELECT + | kyuubi_fun_0('TESTSTRING') AS col1, + | kyuubi_fun_0(value) AS col2, + | abs(key) AS col3, abs(-100) AS col4, + | lower(value) AS col5,lower('TESTSTRING') AS col6 + |FROM $reusedTable + |""".stripMargin).queryExecution.analyzed val (inputs, _, _) = PrivilegesBuilder.buildFunctions(plan, spark) assert(inputs.size === 2) inputs.foreach { po => @@ -219,36 +215,67 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } } - test("[KYUUBI #7186] Introduce RuleFunctionAuthorization") { - - val ruleFunc = Mockito.spy[RuleFunctionAuthorization](RuleFunctionAuthorization(spark)) - Mockito.doAnswer(new Answer[Unit] { - override def answer(invocation: InvocationOnMock): Unit = { - val requests = invocation.getArgument[mutable.ArrayBuffer[AccessRequest]](0) - requests.foreach { request => - // deny udf `reusedDb.kyuubi_fun_0` - var database: String = request.getResource.asInstanceOf[AccessResource].getDatabase - var udf: String = request.getResource.asInstanceOf[AccessResource].getUdf - if (database.equalsIgnoreCase(reusedDb) && udf.equalsIgnoreCase("kyuubi_fun_0")) { - throw new AccessControlException("Access denied") - } - } - } - }).when(ruleFunc).checkPrivileges( - any[mutable.ArrayBuffer[AccessRequest]](), - any[SparkRangerAuditHandler]()) - - val query1 = sql(s"SELECT " + - s"${reusedDb}.kyuubi_fun_0('KYUUBI_STRING')," + - s"${reusedDb}.kyuubi_fun_1('KYUUBI_STRING') ").queryExecution.analyzed - intercept[AccessControlException] { ruleFunc.apply(query1) } - - val query2 = sql(s"SELECT " + - s"${reusedDb}.kyuubi_fun_0('KYUUBI_STRING')").queryExecution.analyzed - intercept[AccessControlException] { ruleFunc.apply(query2) } - - val query3 = sql(s"SELECT " + - s"${reusedDb}.kyuubi_fun_1('KYUUBI_STRING')").queryExecution.analyzed - ruleFunc.apply(query3) + test("Function Call in Crate Table/View") { + val plan1 = sql( + s""" + |CREATE TABLE table1 AS + |SELECT + | kyuubi_fun_0('KYUUBI_TESTSTRING'), + | kyuubi_fun_0(value) + |FROM $reusedTable + |""".stripMargin).queryExecution.analyzed + val (inputs1, _, _) = PrivilegesBuilder.buildFunctions(plan1, spark) + assert(inputs1.size === 2) + inputs1.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + val plan2 = sql("DROP TABLE IF EXISTS table1").queryExecution.analyzed + val (inputs2, _, _) = PrivilegesBuilder.buildFunctions(plan2, spark) + assert(inputs2.size === 0) + + val plan3 = sql( + s""" + |CREATE VIEW view1 AS SELECT + | kyuubi_fun_0('KYUUBI_TESTSTRING') AS fun1, + | kyuubi_fun_0(value) AS fun2 + |FROM $reusedTable + |""".stripMargin).queryExecution.analyzed + val (inputs3, _, _) = PrivilegesBuilder.buildFunctions(plan3, spark) + assert(inputs3.size === 2) + inputs3.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + val plan4 = sql("DROP VIEW IF EXISTS view1").queryExecution.analyzed + val (inputs4, _, _) = PrivilegesBuilder.buildFunctions(plan4, spark) + assert(inputs4.size === 0) + } + + test("Function Call in INSERT OVERWRITE") { + val plan = sql( + s""" + |INSERT OVERWRITE TABLE $reusedTable + |SELECT key, kyuubi_fun_0(value) + |FROM $reusedPartTable + |""".stripMargin).queryExecution.analyzed + val (inputsUpdate, _, _) = PrivilegesBuilder.buildFunctions(plan, spark) + assert(inputsUpdate.size === 1) + inputsUpdate.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 1fdea0ed969..2a6f4a5071d 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -47,6 +47,7 @@ import org.apache.kyuubi.plugin.spark.authz.rule.Authorization.KYUUBI_AUTHZ_TAG import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ import org.apache.kyuubi.util.AssertionUtils._ import org.apache.kyuubi.util.reflect.ReflectUtils._ + abstract class RangerSparkExtensionSuite extends AnyFunSuite with SparkSessionProvider with BeforeAndAfterAll with MysqlContainerEnv { // scalastyle:on @@ -218,8 +219,16 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite val e = intercept[AccessControlException](sql(create)) assert(e.getMessage === errorMessage("create", "mydb")) withCleanTmpResources(Seq((testDb, "database"))) { - doAs(admin, assert(Try { sql(create) }.isSuccess)) - doAs(admin, assert(Try { sql(alter) }.isSuccess)) + doAs( + admin, + assert(Try { + sql(create) + }.isSuccess)) + doAs( + admin, + assert(Try { + sql(alter) + }.isSuccess)) val e1 = intercept[AccessControlException](sql(alter)) assert(e1.getMessage === errorMessage("alter", "mydb")) val e2 = intercept[AccessControlException](sql(drop)) @@ -241,14 +250,34 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite assert(e.getMessage === errorMessage("create")) withCleanTmpResources(Seq((s"$db.$table", "table"))) { - doAs(bob, assert(Try { sql(create0) }.isSuccess)) - doAs(bob, assert(Try { sql(alter0) }.isSuccess)) + doAs( + bob, + assert(Try { + sql(create0) + }.isSuccess)) + doAs( + bob, + assert(Try { + sql(alter0) + }.isSuccess)) val e1 = intercept[AccessControlException](sql(drop0)) assert(e1.getMessage === errorMessage("drop")) - doAs(bob, assert(Try { sql(alter0) }.isSuccess)) - doAs(bob, assert(Try { sql(select).collect() }.isSuccess)) - doAs(kent, assert(Try { sql(s"SELECT key FROM $db.$table").collect() }.isSuccess)) + doAs( + bob, + assert(Try { + sql(alter0) + }.isSuccess)) + doAs( + bob, + assert(Try { + sql(select).collect() + }.isSuccess)) + doAs( + kent, + assert(Try { + sql(s"SELECT key FROM $db.$table").collect() + }.isSuccess)) Seq( select, @@ -272,13 +301,50 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite test("auth: functions") { val db = defaultDb val func = "func" - val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'" - doAs( - kent, { - val e = intercept[AccessControlException](sql(create0)) - assert(e.getMessage === errorMessage("create", "default/func")) - }) - doAs(admin, assert(Try(sql(create0)).isSuccess)) + val kyuubiFunc = "kyuubi_func1" + withCleanTmpResources(Seq( + (func, "function"), + (kyuubiFunc, "function"))) { + val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'" + doAs( + bob, { + val e = intercept[AccessControlException](sql(create0)) + assert(e.getMessage === errorMessage("create", s"$db/$func")) + }) + doAs( + kent, { + val e = intercept[AccessControlException](sql(create0)) + assert(e.getMessage === errorMessage("create", s"$db/$func")) + }) + doAs(admin, assert(Try(sql(create0)).isSuccess)) + + // [KYUUBI #7186] Introduce RuleFunctionAuthorization + val createKyuubiFunc = + s""" + |CREATE FUNCTION IF NOT EXISTS + | $db.$kyuubiFunc + | AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash' + |""".stripMargin + doAs( + kent, { + val e = intercept[AccessControlException](sql(createKyuubiFunc)) + assert(e.getMessage === errorMessage("create", s"$db/$kyuubiFunc")) + }) + doAs(bob, assert(Try(sql(createKyuubiFunc)).isSuccess)) + doAs(admin, assert(Try(sql(createKyuubiFunc)).isSuccess)) + + val selectKyuubiFunc = + s""" + |SELECT $db.$kyuubiFunc("KYUUBUI_TEST_STRING")""".stripMargin + doAs( + alice, { + val e = intercept[AccessControlException](sql(selectKyuubiFunc)) + assert(e.getMessage === errorMessage("select", s"$db/$kyuubiFunc")) + }) + doAs(kent, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + doAs(bob, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + doAs(admin, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + } } test("show tables") { @@ -628,12 +694,18 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { // query all columns of the permanent view // with access privileges to the permanent view but no privilege to the source table val sql1 = s"SELECT * FROM $db1.$permView" - doAs(userPermViewOnly, { sql(sql1).collect() }) + doAs( + userPermViewOnly, { + sql(sql1).collect() + }) // query the second column of permanent view with multiple columns // with access privileges to the permanent view but no privilege to the source table val sql2 = s"SELECT name FROM $db1.$permView" - doAs(userPermViewOnly, { sql(sql2).collect() }) + doAs( + userPermViewOnly, { + sql(sql2).collect() + }) } } From 05c7de0f6f1653322a78bc18c1d2cc0bf5925814 Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Sat, 13 Sep 2025 03:43:39 +0800 Subject: [PATCH 6/9] revert mock --- extensions/spark/kyuubi-spark-authz/pom.xml | 6 ------ .../plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 1a2ea540d60..49c79d49647 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -397,12 +397,6 @@ ${project.version} test - - - org.mockito - mockito-core - test - diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala index 18cc2bbed9e..5d8a824b4fc 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala @@ -215,7 +215,7 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } } - test("Function Call in Crate Table/View") { + test("Function Call in Create Table/View") { val plan1 = sql( s""" |CREATE TABLE table1 AS From 114aafe816d1ac8cc88a08714bb6f6b363ed4ef7 Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Sat, 13 Sep 2025 13:01:29 +0800 Subject: [PATCH 7/9] mv test to HiveCatalogRangerSparkExtensionSuite --- .../ranger/RangerSparkExtensionSuite.scala | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 2a6f4a5071d..387afe41987 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -301,10 +301,8 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite test("auth: functions") { val db = defaultDb val func = "func" - val kyuubiFunc = "kyuubi_func1" withCleanTmpResources(Seq( - (func, "function"), - (kyuubiFunc, "function"))) { + (func, "function"))) { val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'" doAs( bob, { @@ -317,33 +315,6 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite assert(e.getMessage === errorMessage("create", s"$db/$func")) }) doAs(admin, assert(Try(sql(create0)).isSuccess)) - - // [KYUUBI #7186] Introduce RuleFunctionAuthorization - val createKyuubiFunc = - s""" - |CREATE FUNCTION IF NOT EXISTS - | $db.$kyuubiFunc - | AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash' - |""".stripMargin - doAs( - kent, { - val e = intercept[AccessControlException](sql(createKyuubiFunc)) - assert(e.getMessage === errorMessage("create", s"$db/$kyuubiFunc")) - }) - doAs(bob, assert(Try(sql(createKyuubiFunc)).isSuccess)) - doAs(admin, assert(Try(sql(createKyuubiFunc)).isSuccess)) - - val selectKyuubiFunc = - s""" - |SELECT $db.$kyuubiFunc("KYUUBUI_TEST_STRING")""".stripMargin - doAs( - alice, { - val e = intercept[AccessControlException](sql(selectKyuubiFunc)) - assert(e.getMessage === errorMessage("select", s"$db/$kyuubiFunc")) - }) - doAs(kent, assert(Try(sql(selectKyuubiFunc)).isSuccess)) - doAs(bob, assert(Try(sql(selectKyuubiFunc)).isSuccess)) - doAs(admin, assert(Try(sql(selectKyuubiFunc)).isSuccess)) } } @@ -1614,4 +1585,38 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } } } + + + test("[KYUUBI #7186] Introduce RuleFunctionAuthorization") { + val db = defaultDb + val kyuubiFunc = "kyuubi_func1" + withCleanTmpResources(Seq( + (kyuubiFunc, "function"))) { + val createKyuubiFunc = + s""" + |CREATE FUNCTION IF NOT EXISTS + | $db.$kyuubiFunc + | AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash' + |""".stripMargin + doAs( + kent, { + val e = intercept[AccessControlException](sql(createKyuubiFunc)) + assert(e.getMessage === errorMessage("create", s"$db/$kyuubiFunc")) + }) + doAs(bob, assert(Try(sql(createKyuubiFunc)).isSuccess)) + doAs(admin, assert(Try(sql(createKyuubiFunc)).isSuccess)) + + val selectKyuubiFunc = + s""" + |SELECT $db.$kyuubiFunc("KYUUBUI_TEST_STRING")""".stripMargin + doAs( + alice, { + val e = intercept[AccessControlException](sql(selectKyuubiFunc)) + assert(e.getMessage === errorMessage("select", s"$db/$kyuubiFunc")) + }) + doAs(kent, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + doAs(bob, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + doAs(admin, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + } + } } From 85d19d2582f87d9407e6b6c3764782e4c6cb2bd9 Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Sat, 13 Sep 2025 13:03:47 +0800 Subject: [PATCH 8/9] mv test to HiveCatalogRangerSparkExtensionSuite --- .../plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 387afe41987..1859450fe08 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -1586,7 +1586,6 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } } - test("[KYUUBI #7186] Introduce RuleFunctionAuthorization") { val db = defaultDb val kyuubiFunc = "kyuubi_func1" From 6afed7a21b3108e4659256f138d23186d7479fc1 Mon Sep 17 00:00:00 2001 From: Yaguang Jia Date: Sat, 13 Sep 2025 22:07:27 +0800 Subject: [PATCH 9/9] update RuleFunctionAuthorization --- .../spark/authz/ranger/RuleFunctionAuthorization.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala index 8b531b3de25..d4b16558d54 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.plugin.spark.authz.ranger import scala.collection.mutable +import org.apache.ranger.plugin.policyengine.RangerAccessRequest import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -51,11 +52,13 @@ case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan = addAccessRequest(inputs, isInput = true) - val requestArrays = requests.map(Seq(_)) + val requestSeq: Seq[RangerAccessRequest] = + requests.map(_.asInstanceOf[RangerAccessRequest]).toSeq + if (authorizeInSingleCall) { - verify(requestArrays.flatten, auditHandler) + verify(requestSeq, auditHandler) } else { - requestArrays.flatten.foreach { req => + requestSeq.foreach { req => verify(Seq(req), auditHandler) } }