diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/config/AuthzConfigurationChecker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/config/AuthzConfigurationChecker.scala
index 3ab2c3fd640..0daac2baacd 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/config/AuthzConfigurationChecker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/config/AuthzConfigurationChecker.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.config
 
+import org.apache.spark.authz.AuthzConf._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.SetCommand
@@ -28,11 +29,14 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException
  */
 case class AuthzConfigurationChecker(spark: SparkSession) extends (LogicalPlan => Unit) {
 
-  final val RESTRICT_LIST_KEY = "spark.kyuubi.conf.restricted.list"
-
   private val restrictedConfList: Set[String] =
-    Set(RESTRICT_LIST_KEY, "spark.sql.runSQLOnFiles", "spark.sql.extensions") ++
-      spark.conf.getOption(RESTRICT_LIST_KEY).map(_.split(',').toSet).getOrElse(Set.empty)
+    Set(
+      CONF_RESTRICTED_LIST.key,
+      DATA_MASKING_ENABLED.key,
+      ROW_FILTER_ENABLED.key,
+      "spark.sql.runSQLOnFiles",
+      "spark.sql.extensions") ++
+      confRestrictedList(spark.sparkContext.getConf).map(_.split(',').toSet).getOrElse(Set.empty)
 
   override def apply(plan: LogicalPlan): Unit = plan match {
     case SetCommand(Some((
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage0.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage0.scala
index 27cde162113..7295481a8fd 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage0.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage0.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.datamasking
 
+import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -45,15 +46,19 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyDataMaskingStage0(spark: SparkSession) extends RuleHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val newPlan = mapChildren(plan) {
-      case p: DataMaskingStage0Marker => p
-      case p: DataMaskingStage1Marker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
-      case other => apply(other)
+    if (!dataMaskingEnabled(conf)) {
+      plan
+    } else {
+      val newPlan = mapChildren(plan) {
+        case p: DataMaskingStage0Marker => p
+        case p: DataMaskingStage1Marker => p
+        case scan if isKnownScan(scan) && scan.resolved =>
+          val tables = getScanSpec(scan).tables(scan, spark)
+          tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
+        case other => apply(other)
+      }
+      newPlan
     }
-    newPlan
   }
 
   private def applyMasking(
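Note that the checker change above also places both new feature flags on the restricted list, so once a session is up they cannot be flipped with SET; they have to be fixed in the SparkConf when the session is created. A minimal sketch of disabling data masking at startup (the builder call is illustrative; the key is defined in AuthzConf below):

    import org.apache.spark.sql.SparkSession

    // spark.sql.authz.dataMasking.enabled is on the restricted list, so it can
    // only be set before the session starts, not via a SET command.
    val spark = SparkSession.builder()
      .config("spark.sql.authz.dataMasking.enabled", "false")
      .getOrCreate()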
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage1.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage1.scala
index b0069c9a543..86d57e180e9 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage1.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage1.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.datamasking
 
+import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -33,25 +34,28 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyDataMaskingStage1(spark: SparkSession) extends RuleHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-
-    plan match {
-      case marker0: DataMaskingStage0Marker => marker0
-      case marker1: DataMaskingStage1Marker => marker1
-      case cmd if isKnownTableCommand(cmd) =>
-        val tableCommandSpec = getTableCommandSpec(cmd)
-        val queries = tableCommandSpec.queries(cmd)
-        cmd.mapChildren {
-          case marker0: DataMaskingStage0Marker => marker0
-          case marker1: DataMaskingStage1Marker => marker1
-          case query if queries.contains(query) && query.resolved =>
-            applyDataMasking(query)
-          case o => o
-        }
-      case cmd: Command if cmd.childrenResolved =>
-        cmd.mapChildren(applyDataMasking)
-      case cmd: Command => cmd
-      case other if other.resolved => applyDataMasking(other)
-      case other => other
+    if (!dataMaskingEnabled(conf)) {
+      plan
+    } else {
+      plan match {
+        case marker0: DataMaskingStage0Marker => marker0
+        case marker1: DataMaskingStage1Marker => marker1
+        case cmd if isKnownTableCommand(cmd) =>
+          val tableCommandSpec = getTableCommandSpec(cmd)
+          val queries = tableCommandSpec.queries(cmd)
+          cmd.mapChildren {
+            case marker0: DataMaskingStage0Marker => marker0
+            case marker1: DataMaskingStage1Marker => marker1
+            case query if queries.contains(query) && query.resolved =>
+              applyDataMasking(query)
+            case o => o
+          }
+        case cmd: Command if cmd.childrenResolved =>
+          cmd.mapChildren(applyDataMasking)
+        case cmd: Command => cmd
+        case other if other.resolved => applyDataMasking(other)
+        case other => other
+      }
     }
   }
 
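Stage1 extends the same gate to table commands: with the flag on, the query side of a known table command is still rewritten before execution; with it off, the child plans pass through untouched. For instance (illustrative table names, assuming the current user has mask rules on default.src):

    // With masking enabled, the SELECT feeding this INSERT is rewritten by
    // RuleApplyDataMaskingStage1, so masked values flow into the target table;
    // with the flag off, the child plan is left as-is.
    spark.sql("INSERT INTO default.dst SELECT * FROM default.src")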
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/RuleApplyRowFilter.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/RuleApplyRowFilter.scala
index defee4005b6..8e9ae67a653 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/RuleApplyRowFilter.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/RuleApplyRowFilter.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter
 
+import org.apache.spark.authz.AuthzConf.rowFilterEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 
@@ -29,14 +30,18 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyRowFilter(spark: SparkSession) extends RuleHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val newPlan = mapChildren(plan) {
-      case p: RowFilterMarker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
-      case other => apply(other)
+    if (!rowFilterEnabled(conf)) {
+      plan
+    } else {
+      val newPlan = mapChildren(plan) {
+        case p: RowFilterMarker => p
+        case scan if isKnownScan(scan) && scan.resolved =>
+          val tables = getScanSpec(scan).tables(scan, spark)
+          tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
+        case other => apply(other)
+      }
+      newPlan
     }
-    newPlan
   }
 
   private def applyFilter(
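The row filter rule uses the same early-return shape; `conf` here is the session SQLConf exposed by RuleHelper. One way to inspect the effective flags from application code (a sketch using the helpers this patch adds):

    import org.apache.spark.authz.AuthzConf
    import org.apache.spark.sql.internal.SQLConf

    // SQLConf.get resolves the active session's conf, mirroring what the
    // rules themselves consult.
    val maskingOn = AuthzConf.dataMaskingEnabled(SQLConf.get)
    val filterOn = AuthzConf.rowFilterEnabled(SQLConf.get)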
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/spark/authz/AuthzConf.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/spark/authz/AuthzConf.scala
new file mode 100644
index 00000000000..6430812b48f
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/spark/authz/AuthzConf.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.authz
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
+object AuthzConf {
+  val CONF_RESTRICTED_LIST =
+    ConfigBuilder("spark.kyuubi.conf.restricted.list")
+      .doc("Config keys in the restricted list cannot be set dynamically via the SET syntax.")
+      .version("1.7.0")
+      .stringConf
+      .createOptional
+
+  val DATA_MASKING_ENABLED =
+    buildConf("spark.sql.authz.dataMasking.enabled")
+      .doc("Whether to enable the data masking rule of the authz plugin.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val ROW_FILTER_ENABLED =
+    buildConf("spark.sql.authz.rowFilter.enabled")
+      .doc("Whether to enable the row filter rule of the authz plugin.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  def confRestrictedList(conf: SparkConf): Option[String] = {
+    conf.get(CONF_RESTRICTED_LIST)
+  }
+
+  def dataMaskingEnabled(conf: SQLConf): Boolean = {
+    conf.getConf(DATA_MASKING_ENABLED)
+  }
+
+  def rowFilterEnabled(conf: SQLConf): Boolean = {
+    conf.getConf(ROW_FILTER_ENABLED)
+  }
+}
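Note the split in AuthzConf: CONF_RESTRICTED_LIST is a SparkConf (application-level) entry, while the two feature flags are SQLConf (session-level) entries. A sketch of how the checker consumes the former (the key/value pair is illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.authz.AuthzConf

    // An unset restricted list yields None rather than an empty string.
    val sparkConf = new SparkConf().set("spark.kyuubi.conf.restricted.list", "spark.sql.abc")
    val extraRestricted: Set[String] =
      AuthzConf.confRestrictedList(sparkConf).map(_.split(',').toSet).getOrElse(Set.empty)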
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala
index d8877b7f9c8..47d66d73c3f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala
@@ -23,7 +23,10 @@
 import scala.util.Try
 
 // scalastyle:off
 import org.apache.commons.codec.digest.DigestUtils.md5Hex
+import org.apache.spark.authz.AuthzConf
+import org.apache.spark.authz.AuthzConf.DATA_MASKING_ENABLED
 import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
@@ -85,6 +88,17 @@ trait DataMaskingTestBase extends AnyFunSuite with SparkSessionProvider with Bef
     super.afterAll()
   }
 
+  private def withEnabledDataMasking(enabled: Boolean)(f: => Unit): Unit = {
+    val conf = SQLConf.get
+    val oldValue = AuthzConf.dataMaskingEnabled(conf)
+    try {
+      conf.setConf(DATA_MASKING_ENABLED, enabled)
+      f
+    } finally {
+      conf.setConf(DATA_MASKING_ENABLED, oldValue)
+    }
+  }
+
   test("simple query with a user doesn't have mask rules") {
     checkAnswer(
       kent,
@@ -93,17 +107,24 @@ trait DataMaskingTestBase extends AnyFunSuite with SparkSessionProvider with Bef
   }
 
   test("simple query with a user has mask rules") {
-    val result =
-      Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
-    checkAnswer(
-      bob,
-      "SELECT value1, value2, value3, value4, value5 FROM default.src " +
-        "where key = 1",
-      result)
-    checkAnswer(
-      bob,
-      "SELECT value1 as key, value2, value3, value4, value5 FROM default.src where key = 1",
-      result)
+    Seq(true, false).foreach { enabled =>
+      withEnabledDataMasking(enabled) {
+        val result: Seq[Row] = if (enabled) {
+          Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+        } else {
+          Seq(Row(1, "hello", "world", Timestamp.valueOf("2018-11-17 12:34:56"), "World"))
+        }
+        checkAnswer(
+          bob,
+          "SELECT value1, value2, value3, value4, value5 FROM default.src " +
+            "where key = 1",
+          result)
+        checkAnswer(
+          bob,
+          "SELECT value1 as key, value2, value3, value4, value5 FROM default.src where key = 1",
+          result)
+      }
+    }
   }
 
   test("star") {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfiltering/RowFilteringTestBase.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfiltering/RowFilteringTestBase.scala
index 3d0890d1967..f914056e94d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfiltering/RowFilteringTestBase.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfiltering/RowFilteringTestBase.scala
@@ -20,7 +20,10 @@ package org.apache.kyuubi.plugin.spark.authz.ranger.rowfiltering
 // scalastyle:off
 import scala.util.Try
 
+import org.apache.spark.authz.AuthzConf
+import org.apache.spark.authz.AuthzConf.ROW_FILTER_ENABLED
 import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
@@ -57,19 +60,48 @@ trait RowFilteringTestBase extends AnyFunSuite with SparkSessionProvider with Be
     super.afterAll()
   }
 
+  private def withEnabledRowFilter(enabled: Boolean)(f: => Unit): Unit = {
+    val conf = SQLConf.get
+    val oldValue = AuthzConf.rowFilterEnabled(conf)
+    try {
+      conf.setConf(ROW_FILTER_ENABLED, enabled)
+      f
+    } finally {
+      conf.setConf(ROW_FILTER_ENABLED, oldValue)
+    }
+  }
+
   test("user without row filtering rule") {
     checkAnswer(
       kent,
-      "SELECT key FROM default.src order order by key",
+      "SELECT key FROM default.src order by key",
       Seq(Row(1), Row(20), Row(30)))
   }
 
   test("simple query projecting filtering column") {
-    checkAnswer(bob, "SELECT key FROM default.src", Seq(Row(1)))
+    Seq(true, false).foreach { enabled =>
+      withEnabledRowFilter(enabled) {
+        val result = if (enabled) {
+          Seq(Row(1))
+        } else {
+          Seq(Row(1), Row(20), Row(30))
+        }
+        checkAnswer(bob, "SELECT key FROM default.src order by key", result)
+      }
+    }
   }
 
   test("simple query projecting non filtering column") {
-    checkAnswer(bob, "SELECT value FROM default.src", Seq(Row(1)))
+    Seq(true, false).foreach { enabled =>
+      withEnabledRowFilter(enabled) {
+        val result = if (enabled) {
+          Seq(Row(1))
+        } else {
+          Seq(Row(1), Row(2), Row(3))
+        }
+        checkAnswer(bob, "SELECT value FROM default.src order by key", result)
+      }
+    }
  }
 
   test("simple query projecting non filtering column with udf max") {
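Both test helpers follow the same save-set-restore shape; a generic variant (a sketch only, not part of this patch) could factor the pattern out:

    import org.apache.spark.internal.config.ConfigEntry
    import org.apache.spark.sql.internal.SQLConf

    // Sets `entry` to `value` for the duration of `f`, restoring the previous
    // value even if `f` throws.
    def withSQLConfEntry[T](entry: ConfigEntry[T], value: T)(f: => Unit): Unit = {
      val conf = SQLConf.get
      val oldValue = conf.getConf(entry)
      try {
        conf.setConf(entry, value)
        f
      } finally {
        conf.setConf(entry, oldValue)
      }
    }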
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/rule/AuthzConfigurationCheckerSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/rule/AuthzConfigurationCheckerSuite.scala
index 10fa0af9e1c..3bb5a88d834 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/rule/AuthzConfigurationCheckerSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/rule/AuthzConfigurationCheckerSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule
 
+import org.apache.spark.SparkConf
+import org.apache.spark.authz.AuthzConf.CONF_RESTRICTED_LIST
+import org.apache.spark.sql.AnalysisException
 import org.scalatest.BeforeAndAfterAll
 // scalastyle:off
 import org.scalatest.funsuite.AnyFunSuite
@@ -29,13 +32,16 @@ class AuthzConfigurationCheckerSuite extends AnyFunSuite with SparkSessionProvid
   with BeforeAndAfterAll {
 
   override protected val catalogImpl: String = "in-memory"
+
+  override protected val extraSparkConf: SparkConf = new SparkConf()
+    .set(CONF_RESTRICTED_LIST.key, "spark.sql.abc,spark.sql.xyz")
+
   override def afterAll(): Unit = {
     spark.stop()
     super.afterAll()
   }
 
   test("apply spark configuration restriction rules") {
-    sql("set spark.kyuubi.conf.restricted.list=spark.sql.abc,spark.sql.xyz")
     val extension = AuthzConfigurationChecker(spark)
     val p1 = sql("set spark.sql.runSQLOnFiles=true").queryExecution.analyzed
     intercept[AccessControlException](extension.apply(p1))
@@ -47,8 +53,11 @@ class AuthzConfigurationCheckerSuite extends AnyFunSuite with SparkSessionProvid
     intercept[AccessControlException](extension.apply(p4))
     val p5 = sql("set spark.sql.xyz=abc").queryExecution.analyzed
     intercept[AccessControlException](extension.apply(p5))
-    val p6 = sql("set spark.kyuubi.conf.restricted.list=123").queryExecution.analyzed
-    intercept[AccessControlException](extension.apply(p6))
+    val e = intercept[AnalysisException] {
+      sql("set spark.kyuubi.conf.restricted.list=123")
+    }
+    assert(e.getMessage.contains("Cannot modify the value of") && e.getMessage.contains(
+      CONF_RESTRICTED_LIST.key))
     val p7 = sql("set spark.sql.efg=hijk").queryExecution.analyzed
     extension.apply(p7)
     val p8 = sql(
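Two behaviors worth noting in the updated suite: the restricted list is now seeded through extraSparkConf rather than a SET statement, and a SET on spark.kyuubi.conf.restricted.list itself now fails during analysis (presumably because ConfigBuilder registers it as a Spark config entry, which Spark refuses to modify at runtime) before it ever reaches the checker. The new flags, by contrast, are plain SQL confs and are rejected by the checker itself; a sketch mirroring the suite's pattern:

    // Expected to throw AccessControlException, since the flag's key is on the
    // checker's restricted list (illustrative, following the p1-p5 cases above).
    val plan = spark.sql("set spark.sql.authz.rowFilter.enabled=false").queryExecution.analyzed
    AuthzConfigurationChecker(spark).apply(plan)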