diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 83485a52175..e2516f75847 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -149,6 +149,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.doAs.enabled | true | Whether to enable user impersonation on launching engine. When enabled, for engines which supports user impersonation, e.g. SPARK, depends on the `kyuubi.engine.share.level`, different users will be used to launch the engine. Otherwise, Kyuubi Server's user will always be used to launch the engine. | boolean | 1.9.0 |
| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.
- Local Path: start with 'file://'
- HDFS Path: start with 'hdfs://'
| string | 1.3.0 |
| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go. - SPARK: the events will be written to the Spark listener bus.
- JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
- JDBC: to be done
- CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 |
+| kyuubi.engine.external.token.enabled | false | Whether to start Kerberized engine with external delegation tokens. | boolean | 1.11.0 |
| kyuubi.engine.flink.application.jars | <undefined> | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 |
| kyuubi.engine.flink.doAs.enabled | false | When enabled, the session user is used as the proxy user to launch the Flink engine, otherwise, the server user. Note, due to the limitation of Apache Flink, it can only be enabled on Kerberized environment. | boolean | 1.10.0 |
| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 06c8e7a9d52..ee007a8a94a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2390,6 +2390,13 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
+ val ENGINE_EXTERNAL_TOKEN_ENABLED: ConfigEntry[Boolean] =
+ buildConf("kyuubi.engine.external.token.enabled")
+ .doc("Whether to start Kerberized engine with external delegation tokens.")
+ .version("1.11.0")
+ .booleanConf
+ .createWithDefault(false)
+
val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level")
.doc("Engines will be shared in different levels, available configs are: " +
" - CONNECTION: the engine will not be shared but only used by the current client" +
diff --git a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
index 95d6e1987fa..983867ae9c8 100644
--- a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
+++ b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
@@ -17,3 +17,4 @@
org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
org.apache.kyuubi.credentials.HiveDelegationTokenProvider
+org.apache.kyuubi.credentials.YarnRMDelegationTokenProvider
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/YarnRMDelegationTokenProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/YarnRMDelegationTokenProvider.scala
new file mode 100644
index 00000000000..5f54b9a595d
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/YarnRMDelegationTokenProvider.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.yarn.client.ClientRMProxy
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_EXTERNAL_TOKEN_ENABLED
+import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.doAsProxyUser
+
+class YarnRMDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
+ private var yarnConf: YarnConfiguration = _
+ private var tokenService: Text = _
+ private var required = false
+ override def serviceName: String = "yarn"
+
+ def getTokenService(): Text = tokenService
+
+ // Only support engine and kyuubi server using same hadoop conf
+ override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
+ if (SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE) {
+ yarnConf = new YarnConfiguration(hadoopConf)
+ tokenService = ClientRMProxy.getRMDelegationTokenService(yarnConf)
+ required = kyuubiConf.get(ENGINE_EXTERNAL_TOKEN_ENABLED)
+ }
+ }
+
+ override def delegationTokensRequired(): Boolean = required
+
+ override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = {
+ doAsProxyUser(owner) {
+ var client: Option[YarnClient] = None
+ try {
+ client = Some(YarnClient.createYarnClient())
+ client.foreach(client => {
+ client.init(yarnConf)
+ client.start()
+ val yarnToken = ConverterUtils.convertFromYarn(
+ client.getRMDelegationToken(new Text()),
+ tokenService)
+ info(s"Get Token from Resource Manager service ${tokenService}, " +
+ s"token : ${yarnToken.toString}")
+ creds.addToken(new Text(yarnToken.getService), yarnToken)
+ })
+ } catch {
+ case e: Throwable => error("Error occurs when get delegation token", e)
+ } finally {
+ client.foreach(_.close())
+ }
+ }
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index a2cd268a6cb..6aa6b5a71c4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -27,14 +27,16 @@ import scala.collection.JavaConverters._
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
+import org.apache.hadoop.conf.Configuration
import org.apache.kyuubi._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
+import org.apache.kyuubi.util.{JavaUtils, KyuubiHadoopUtils, NamedThreadFactory}
-trait ProcBuilder {
+trait ProcBuilder extends Logging {
import ProcBuilder._
@@ -168,6 +170,7 @@ trait ProcBuilder {
private var logCaptureThread: Thread = _
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false
+ @volatile private[kyuubi] var tokenTempDir: java.nio.file.Path = _
// Set engine application manger info conf
conf.set(
@@ -270,6 +273,14 @@ trait ProcBuilder {
Utils.terminateProcess(process, engineStartupDestroyTimeout)
process = null
}
+ if (tokenTempDir != null) {
+ try {
+ Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
+ } catch {
+ case e: Throwable =>
+ error(s"Error deleting token temp dir: $tokenTempDir", e)
+ }
+ }
}
def getError: Throwable = synchronized {
@@ -359,6 +370,19 @@ trait ProcBuilder {
def waitEngineCompletion: Boolean = {
!isClusterMode() || conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
}
+
+ def generateEngineTokenFile: Option[String] = {
+ conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
+ val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
+ tokenTempDir = Utils.createTempDir()
+ val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
+ credentials.writeTokenStorageFile(
+ new org.apache.hadoop.fs.Path(s"file://$file"),
+ new Configuration())
+ info(s"Generated hadoop token file: $file")
+ file
+ }
+ }
}
object ProcBuilder extends Logging {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index b83089bc352..df481873756 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -24,18 +24,15 @@ import scala.collection.mutable
import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.KyuubiHadoopUtils
import org.apache.kyuubi.util.command.CommandLineUtils._
/**
@@ -47,7 +44,7 @@ class FlinkProcessBuilder(
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
- extends ProcBuilder with Logging {
+ extends ProcBuilder {
@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -276,21 +273,13 @@ class FlinkProcessBuilder(
}
}
- @volatile private var tokenTempDir: java.nio.file.Path = _
private def generateTokenFile(): Option[(String, String)] = {
if (conf.get(ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE)) {
// We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
// So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
// TODO: Removed this after FLINK-35525 (1.20.0), delegation tokens will be passed
// by `kyuubi` provider
- conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
- val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
- tokenTempDir = Utils.createTempDir()
- val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
- credentials.writeTokenStorageFile(new Path(s"file://$file"), new Configuration())
- info(s"Generated hadoop token file: $file")
- "HADOOP_TOKEN_FILE_LOCATION" -> file
- }
+ generateEngineTokenFile.map(tokenFile => "HADOOP_TOKEN_FILE_LOCATION" -> tokenFile)
} else {
None
}
@@ -298,13 +287,6 @@ class FlinkProcessBuilder(
override def close(destroyProcess: Boolean): Unit = {
super.close(destroyProcess)
- if (tokenTempDir != null) {
- try {
- Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
- } catch {
- case e: Throwable => error(s"Error deleting token temp dir: $tokenTempDir", e)
- }
- }
}
override def shortName: String = "flink"
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 05a09e0293e..30d33c23b4a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -51,7 +51,7 @@ class SparkProcessBuilder(
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
- extends ProcBuilder with Logging {
+ extends ProcBuilder {
@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -61,6 +61,7 @@ class SparkProcessBuilder(
import SparkProcessBuilder._
private[kyuubi] val sparkHome = getEngineHome(shortName)
+ private[kyuubi] val externalTokensEnabled = conf.get(ENGINE_EXTERNAL_TOKEN_ENABLED)
override protected val executable: String = {
Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
@@ -68,6 +69,16 @@ class SparkProcessBuilder(
override def mainClass: String = "org.apache.kyuubi.engine.spark.SparkSQLEngine"
+ override def env: Map[String, String] = {
+ val extraEnvs: Map[String, String] =
+ if ((conf.getOption(PRINCIPAL).isEmpty || conf.getOption(KEYTAB).isEmpty)
+ && doAsEnabled && externalTokensEnabled) {
+ Map(ENV_KERBEROS_TGT -> "", ENV_SPARK_PROXY_USER -> proxyUser) ++
+ generateEngineTokenFile.map(tokenFile => HADOOP_TOKEN_FILE_LOCATION -> tokenFile)
+ } else Map.empty
+ conf.getEnvs ++ extraEnvs
+ }
+
/**
* Add `spark.master` if KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT
* are defined. So we can deploy spark on kubernetes without setting `spark.master`
@@ -169,8 +180,10 @@ class SparkProcessBuilder(
tryKeytab() match {
case None if doAsEnabled =>
setSparkUserName(proxyUser, buffer)
- buffer += PROXY_USER
- buffer += proxyUser
+ if (!externalTokensEnabled) {
+ buffer += PROXY_USER
+ buffer += proxyUser
+ }
case None => // doAs disabled
setSparkUserName(Utils.currentUser, buffer)
case Some(name) =>
@@ -409,6 +422,9 @@ object SparkProcessBuilder {
final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
final val YARN_SUBMIT_WAIT_APP_COMPLETION = "spark.yarn.submit.waitAppCompletion"
final val INTERNAL_RESOURCE = "spark-internal"
+ final val HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
+ final val ENV_KERBEROS_TGT = "KRB5CCNAME"
+ final val ENV_SPARK_PROXY_USER = "HADOOP_PROXY_USER"
final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
final val KUBERNETES_UPLOAD_PATH_PERMISSION =
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredYarnCluster.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredYarnCluster.scala
new file mode 100644
index 00000000000..aab1c4435d7
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredYarnCluster.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.MiniYarnService
+
+trait WithSecuredYarnCluster extends KerberizedTestHelper {
+
+ private var miniYarnService: MiniYarnService = _
+
+ private def newSecuredConf(): Configuration = {
+ val hdfsConf = new Configuration()
+ hdfsConf.set("ignore.secure.ports.for.testing", "true")
+ hdfsConf.set("hadoop.security.authentication", "kerberos")
+ hdfsConf.set("yarn.resourcemanager.keytab", testKeytab)
+ hdfsConf.set("yarn.resourcemanager.principal", testPrincipal)
+
+ hdfsConf.set("yarn.nodemanager.keytab", testPrincipal)
+ hdfsConf.set("yarn.nodemanager.principal", testKeytab)
+
+ hdfsConf
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ tryWithSecurityEnabled {
+ UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+ miniYarnService = new MiniYarnService(newSecuredConf())
+ miniYarnService.initialize(new KyuubiConf(false))
+ miniYarnService.start()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ miniYarnService.stop()
+ super.afterAll()
+ }
+
+ def getHadoopConf: Configuration = miniYarnService.getYarnConf
+ def getHadoopConfDir: String = miniYarnService.getYarnConfDir
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/YarnDelegationTokenProviderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/YarnDelegationTokenProviderSuite.scala
new file mode 100644
index 00000000000..a045dbbc135
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/YarnDelegationTokenProviderSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
+
+import org.apache.kyuubi.WithSecuredYarnCluster
+import org.apache.kyuubi.config.KyuubiConf
+
+class YarnDelegationTokenProviderSuite extends WithSecuredYarnCluster {
+
+ test("obtain yarn rm delegation tokens") {
+ tryWithSecurityEnabled {
+ UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+
+ val hadoopConf = getHadoopConf
+ val kyuubiConf = new KyuubiConf(false)
+ kyuubiConf.set("kyuubi.engine.external.token.enabled", "true")
+
+ val provider = new YarnRMDelegationTokenProvider
+ provider.initialize(hadoopConf, kyuubiConf)
+ assert(provider.delegationTokensRequired())
+
+ val owner = "who"
+ val credentials = new Credentials()
+ provider.obtainDelegationTokens(owner, credentials)
+
+ val token = credentials
+ .getToken(provider.getTokenService())
+ .asInstanceOf[Token[AbstractDelegationTokenIdentifier]]
+ assert(token != null)
+
+ val tokenIdent = token.decodeIdentifier()
+ assertResult(RMDelegationTokenIdentifier.KIND_NAME)(token.getKind)
+ assertResult(new Text(owner))(tokenIdent.getOwner)
+ val currentUserName = UserGroupInformation.getCurrentUser.getUserName
+ assertResult(new Text(currentUserName))(tokenIdent.getRealUser)
+ }
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 3975b3780dc..2a3d3f72efa 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -30,8 +30,9 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
-import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder.{PROXY_USER, _}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.service.ServiceUtils
@@ -484,6 +485,20 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
val toady = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$toady"))
}
+
+ test("spark engine with external token file") {
+ val conf1 = KyuubiConf(false)
+ val tokenStr = "SERUUwABDzEyNy4wLjAuMTo0NTQ2MkIKA3dobxopY2xpZW50L2xvY2FsaG9zdEBLRVJC" +
+ "RVJJWkVEVEVTVEhFTFBFUi5DT00guMuGgJEzKLjTuKCTMzABOAIUqTa2O5pYh2dBXFNnpqEgIJvWF5sTU" +
+ "k1fREVMRUdBVElPTl9UT0tFTg8xMjcuMC4wLjE6NDU0NjIA"
+ conf1.set(KYUUBI_ENGINE_CREDENTIALS_KEY, tokenStr)
+ conf1.set(ENGINE_EXTERNAL_TOKEN_ENABLED, true)
+ val builder1 = new SparkProcessBuilder("", true, conf1)
+ assert(builder1.env.contains(HADOOP_TOKEN_FILE_LOCATION))
+ assert(builder1.env.contains(ENV_KERBEROS_TGT) &&
+ builder1.env(ENV_KERBEROS_TGT).isEmpty)
+ assert(builder1.commands.forall(e => !e.contains(PROXY_USER)))
+ }
}
class FakeSparkProcessBuilder(config: KyuubiConf)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index deaeae3bed1..dc8d28e6857 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -31,11 +31,12 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
-class MiniYarnService extends AbstractService("TestMiniYarnService") {
+class MiniYarnService(configuration: Configuration = new Configuration())
+ extends AbstractService("TestMiniYarnService") {
private val yarnConfDir: File = Utils.createTempDir().toFile
private var yarnConf: YarnConfiguration = {
- val yarnConfig = new YarnConfiguration()
+ val yarnConfig = new YarnConfiguration(configuration)
// Disable the disk utilization check to avoid the test hanging when people's disks are
// getting full.
yarnConfig.set(