Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,7 @@ object PrivilegesBuilder {
val spec = getTableCommandSpec(command)
val functionPrivAndOpType = spec.queries(plan)
.map(plan => buildFunctions(plan, spark))
functionPrivAndOpType.map(_._1)
.reduce(_ ++ _)
.foreach(functionPriv => inputObjs += functionPriv)
inputObjs ++= functionPrivAndOpType.flatMap(_._1)

case plan => plan transformAllExpressions {
case hiveFunction: Expression if isKnownFunction(hiveFunction) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AccessResource private (val objectType: ObjectType, val catalog: Option[St
extends RangerAccessResourceImpl {
implicit def asString(obj: Object): String = if (obj != null) obj.asInstanceOf[String] else null
def getDatabase: String = getValue("database")
def getUdf: String = getValue("udf")
def getTable: String = getValue("table")
def getColumn: String = getValue("column")
def getColumns: Seq[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) {

override def apply(v1: SparkSessionExtensions): Unit = {
v1.injectCheckRule(AuthzConfigurationChecker)
v1.injectCheckRule(RuleFunctionAuthorization)
v1.injectResolutionRule(_ => RuleReplaceShowObjectCommands)
v1.injectResolutionRule(_ => RuleApplyPermanentViewMarker)
v1.injectResolutionRule(_ => RuleApplyTypeOfMarker)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz.ranger

import scala.collection.mutable

import org.apache.ranger.plugin.policyengine.RangerAccessRequest
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz._
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan => Unit) {
override def apply(plan: LogicalPlan): Unit = {
val auditHandler = new SparkRangerAuditHandler
val ugi = getAuthzUgi(spark.sparkContext)
val (inputs, _, opType) = PrivilegesBuilder.buildFunctions(plan, spark)

// Use a HashSet to deduplicate the same AccessResource and AccessType, the requests will be all
// the non-duplicate requests and in the same order as the input requests.
val requests = new mutable.ArrayBuffer[AccessRequest]()
val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]()

def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
objects.foreach { obj =>
val resource = AccessResource(obj, opType)
val accessType = ranger.AccessType(obj, opType, isInput)
if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) {
requests += AccessRequest(resource, ugi, opType, accessType)
requestsSet.add(resource, accessType)
}
}
}

addAccessRequest(inputs, isInput = true)

val requestSeq: Seq[RangerAccessRequest] =
requests.map(_.asInstanceOf[RangerAccessRequest]).toSeq

if (authorizeInSingleCall) {
verify(requestSeq, auditHandler)
} else {
requestSeq.foreach { req =>
verify(Seq(req), auditHandler)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class PolicyJsonFileGenerator extends AnyFunSuite {
policyAccessForPermViewAccessOnly,
policyAccessForTable2AccessOnly,
policyAccessForPaimonNsTable1SelectOnly,
policyAccessForDefaultDbUDF,
// row filter
policyFilterForSrcTableKeyLessThan20,
policyFilterForPermViewKeyLessThan20,
Expand Down Expand Up @@ -371,4 +372,20 @@ class PolicyJsonFileGenerator extends AnyFunSuite {
users = List(table1OnlyUserForNs),
accesses = allowTypes(select),
delegateAdmin = true)))

private val policyAccessForDefaultDbUDF = KRangerPolicy(
name = "defaultdb_udf",
description = "Policy for default db udf",
resources = Map(
databaseRes(defaultDb),
"udf" -> KRangerPolicyResource(values = List("kyuubi_func*"))),
policyItems = List(
KRangerPolicyItem(
users = List(bob),
accesses = allowTypes(select, update, create, drop, alter, index, lock, all, read, write),
delegateAdmin = true),
KRangerPolicyItem(
users = List(kent),
accesses = allowTypes(select),
delegateAdmin = true)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,72 @@
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
"name" : "defaultdb_udf",
"policyType" : 0,
"policyPriority" : 0,
"description" : "Policy for default db udf",
"isAuditEnabled" : true,
"resources" : {
"database" : {
"values" : [ "default" ],
"isExcludes" : false,
"isRecursive" : false
},
"udf" : {
"values" : [ "kyuubi_func*" ],
"isExcludes" : false,
"isRecursive" : false
}
},
"policyItems" : [ {
"accesses" : [ {
"type" : "select",
"isAllowed" : true
}, {
"type" : "update",
"isAllowed" : true
}, {
"type" : "create",
"isAllowed" : true
}, {
"type" : "drop",
"isAllowed" : true
}, {
"type" : "alter",
"isAllowed" : true
}, {
"type" : "index",
"isAllowed" : true
}, {
"type" : "lock",
"isAllowed" : true
}, {
"type" : "all",
"isAllowed" : true
}, {
"type" : "read",
"isAllowed" : true
}, {
"type" : "write",
"isAllowed" : true
} ],
"users" : [ "bob" ],
"delegateAdmin" : true
}, {
"accesses" : [ {
"type" : "select",
"isAllowed" : true
} ],
"users" : [ "kent" ],
"delegateAdmin" : true
} ],
"isDenyAllElse" : false
}, {
"id" : 11,
"guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
"name" : "src_key_less_than_20",
"policyType" : 2,
"policyPriority" : 0,
Expand Down Expand Up @@ -539,8 +605,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 11,
"guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca",
"id" : 12,
"guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -573,8 +639,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 12,
"guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710",
"id" : 13,
"guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -612,8 +678,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 13,
"guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39",
"id" : 14,
"guid" : "aab32389-22bc-325a-af60-6eb525ffdc56",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -651,8 +717,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 14,
"guid" : "aab32389-22bc-325a-af60-6eb525ffdc56",
"id" : 15,
"guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -690,8 +756,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 15,
"guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3",
"id" : 16,
"guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -729,8 +795,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 16,
"guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf",
"id" : 17,
"guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -768,8 +834,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 17,
"guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb",
"id" : 18,
"guid" : "6f4922f4-5568-361a-8cdf-4ad2299f6d23",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,89 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
}
}

test("Built in and UDF Function Call Query") {
val plan = sql(
s"""
|SELECT
| kyuubi_fun_0('TESTSTRING') AS col1,
| kyuubi_fun_0(value) AS col2,
| abs(key) AS col3, abs(-100) AS col4,
| lower(value) AS col5,lower('TESTSTRING') AS col6
|FROM $reusedTable
|""".stripMargin).queryExecution.analyzed
val (inputs, _, _) = PrivilegesBuilder.buildFunctions(plan, spark)
assert(inputs.size === 2)
inputs.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
}

test("Function Call in Create Table/View") {
val plan1 = sql(
s"""
|CREATE TABLE table1 AS
|SELECT
| kyuubi_fun_0('KYUUBI_TESTSTRING'),
| kyuubi_fun_0(value)
|FROM $reusedTable
|""".stripMargin).queryExecution.analyzed
val (inputs1, _, _) = PrivilegesBuilder.buildFunctions(plan1, spark)
assert(inputs1.size === 2)
inputs1.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
val plan2 = sql("DROP TABLE IF EXISTS table1").queryExecution.analyzed
val (inputs2, _, _) = PrivilegesBuilder.buildFunctions(plan2, spark)
assert(inputs2.size === 0)

val plan3 = sql(
s"""
|CREATE VIEW view1 AS SELECT
| kyuubi_fun_0('KYUUBI_TESTSTRING') AS fun1,
| kyuubi_fun_0(value) AS fun2
|FROM $reusedTable
|""".stripMargin).queryExecution.analyzed
val (inputs3, _, _) = PrivilegesBuilder.buildFunctions(plan3, spark)
assert(inputs3.size === 2)
inputs3.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
val plan4 = sql("DROP VIEW IF EXISTS view1").queryExecution.analyzed
val (inputs4, _, _) = PrivilegesBuilder.buildFunctions(plan4, spark)
assert(inputs4.size === 0)
}

test("Function Call in INSERT OVERWRITE") {
val plan = sql(
s"""
|INSERT OVERWRITE TABLE $reusedTable
|SELECT key, kyuubi_fun_0(value)
|FROM $reusedPartTable
|""".stripMargin).queryExecution.analyzed
val (inputsUpdate, _, _) = PrivilegesBuilder.buildFunctions(plan, spark)
assert(inputsUpdate.size === 1)
inputsUpdate.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
}
}
Loading