docs/configuration/settings.md (1 addition, 0 deletions)
@@ -149,6 +149,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`
| kyuubi.engine.doAs.enabled | true | Whether to enable user impersonation when launching the engine. When enabled, for engines that support user impersonation, e.g. SPARK, different users will be used to launch the engine depending on the `kyuubi.engine.share.level`. Otherwise, Kyuubi Server's user will always be used to launch the engine. | boolean | 1.9.0 |
| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS Path: start with 'hdfs://'</li></ul> | string | 1.3.0 |
| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc. events go.<ul> <li>SPARK: the events will be written to the Spark listener bus.</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 |
| kyuubi.engine.external.token.enabled | false | Whether to start a Kerberized engine with external delegation tokens. | boolean | 1.11.0 |
| kyuubi.engine.flink.application.jars | &lt;undefined&gt; | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 |
| kyuubi.engine.flink.doAs.enabled | false | When enabled, the session user is used as the proxy user to launch the Flink engine; otherwise, the server user is used. Note that, due to a limitation of Apache Flink, this can only be enabled in a Kerberized environment. | boolean | 1.10.0 |
| kyuubi.engine.flink.extra.classpath | &lt;undefined&gt; | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 |
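The `kyuubi.engine.external.token.enabled` row is the only documentation change in this PR. As a minimal sketch, enabling it in `$KYUUBI_HOME/conf/kyuubi-defaults.conf` would look like this (only the property name and default come from the table above; the deployment details are assumptions):

```properties
# Sketch: assumes delegation tokens are obtained by an external system and
# handed to Kyuubi, rather than fetched by the server with its own Kerberos TGT.
kyuubi.engine.external.token.enabled=true
```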
KyuubiConf.scala
@@ -2390,6 +2390,13 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)

val ENGINE_EXTERNAL_TOKEN_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.external.token.enabled")
.doc("Whether to start Kerberized engine with external delegation tokens.")
.version("1.11.0")
.booleanConf
.createWithDefault(false)

val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level")
.doc("Engines will be shared in different levels, available configs are: <ul>" +
" <li>CONNECTION: the engine will not be shared but only used by the current client" +
META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
@@ -17,3 +17,4 @@

org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
org.apache.kyuubi.credentials.HiveDelegationTokenProvider
org.apache.kyuubi.credentials.YarnRMDelegationTokenProvider
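The added line registers `YarnRMDelegationTokenProvider` for JDK `ServiceLoader` discovery. A minimal sketch of that discovery mechanism, assuming Kyuubi loads providers the standard SPI way and keys them by `serviceName` (the actual loading code is not part of this diff):

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.kyuubi.credentials.HadoopDelegationTokenProvider

// Every class listed in the services file must be a concrete provider with a
// zero-arg constructor; ServiceLoader instantiates each one on iteration.
val providers: Map[String, HadoopDelegationTokenProvider] =
  ServiceLoader.load(classOf[HadoopDelegationTokenProvider])
    .asScala
    .map(p => p.serviceName -> p)
    .toMap
// After this change, the map gains a "yarn" entry alongside the providers
// registered on the existing lines above.
```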
YarnRMDelegationTokenProvider.scala (new file)
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.credentials

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, SecurityUtil}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.yarn.client.ClientRMProxy
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EXTERNAL_TOKEN_ENABLED
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.doAsProxyUser

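/**
 * Obtains YARN ResourceManager delegation tokens for the session user, so that
 * Kerberized engines can be launched with externally provided tokens
 * (see `kyuubi.engine.external.token.enabled`).
 */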
class YarnRMDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
private var yarnConf: YarnConfiguration = _
private var tokenService: Text = _
private var required = false
override def serviceName: String = "yarn"

def getTokenService(): Text = tokenService

  // Only supports the case where the engine and the Kyuubi server use the same Hadoop conf
override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
if (SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE) {
yarnConf = new YarnConfiguration(hadoopConf)
tokenService = ClientRMProxy.getRMDelegationTokenService(yarnConf)
required = kyuubiConf.get(ENGINE_EXTERNAL_TOKEN_ENABLED)
}
}

override def delegationTokensRequired(): Boolean = required

override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = {
doAsProxyUser(owner) {
var client: Option[YarnClient] = None
try {
        client = Some(YarnClient.createYarnClient())
        client.foreach { yarnClient =>
          yarnClient.init(yarnConf)
          yarnClient.start()
          val yarnToken = ConverterUtils.convertFromYarn(
            yarnClient.getRMDelegationToken(new Text()),
            tokenService)
          info(s"Obtained token from ResourceManager service $tokenService, " +
            s"token: $yarnToken")
          creds.addToken(new Text(yarnToken.getService), yarnToken)
        }
      } catch {
        case e: Throwable =>
          error("Error occurred while obtaining the YARN RM delegation token", e)
} finally {
client.foreach(_.close())
}
}
}
}
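A hedged usage sketch of the new provider, driving it the way a credentials manager would (the call site is outside this diff; the Kerberized `hadoopConf` and the session user `alice` are assumptions):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EXTERNAL_TOKEN_ENABLED
import org.apache.kyuubi.credentials.YarnRMDelegationTokenProvider

val hadoopConf = new Configuration() // assumes hadoop.security.authentication=kerberos
val kyuubiConf = new KyuubiConf().set(ENGINE_EXTERNAL_TOKEN_ENABLED, true)

val provider = new YarnRMDelegationTokenProvider
provider.initialize(hadoopConf, kyuubiConf)
if (provider.delegationTokensRequired()) {
  val creds = new Credentials()
  // Proxies as the session user and adds the RM token under getTokenService().
  provider.obtainDelegationTokens("alice", creds)
}
```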
ProcBuilder.scala
@@ -27,14 +27,16 @@ import scala.collection.JavaConverters._
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
import org.apache.hadoop.conf.Configuration

import org.apache.kyuubi._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
import org.apache.kyuubi.util.{JavaUtils, KyuubiHadoopUtils, NamedThreadFactory}

trait ProcBuilder {
trait ProcBuilder extends Logging {

import ProcBuilder._

@@ -168,6 +170,7 @@ trait ProcBuilder {
private var logCaptureThread: Thread = _
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false
@volatile private[kyuubi] var tokenTempDir: java.nio.file.Path = _

// Set engine application manger info conf
conf.set(
@@ -270,6 +273,14 @@
Utils.terminateProcess(process, engineStartupDestroyTimeout)
process = null
}
if (tokenTempDir != null) {
try {
Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
} catch {
case e: Throwable =>
error(s"Error deleting token temp dir: $tokenTempDir", e)
}
}
}

def getError: Throwable = synchronized {
@@ -359,6 +370,19 @@
def waitEngineCompletion: Boolean = {
!isClusterMode() || conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
}

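/**
 * Decodes the engine credentials from the session conf and writes them to a
 * Hadoop token storage file under a fresh temp dir (removed in `close()`),
 * returning the file path when credentials are present.
 */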
def generateEngineTokenFile: Option[String] = {
conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
tokenTempDir = Utils.createTempDir()
val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
credentials.writeTokenStorageFile(
new org.apache.hadoop.fs.Path(s"file://$file"),
new Configuration())
info(s"Generated hadoop token file: $file")
file
}
}
}

object ProcBuilder extends Logging {
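For context on `generateEngineTokenFile` above: the builders export the returned path as `HADOOP_TOKEN_FILE_LOCATION`, which Hadoop's `UserGroupInformation` picks up at login time in the child process. A minimal sketch of reading such a file back, with a hypothetical path:

```scala
import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Hypothetical location; real files land under a Utils.createTempDir() directory.
val tokenFile = new File("/tmp/kyuubi-tokens/kyuubi_credentials_1700000000000")
val restored = Credentials.readTokenStorageFile(tokenFile, new Configuration())
restored.getAllTokens.asScala.foreach(t => println(t.getService))
```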
FlinkProcessBuilder.scala
@@ -24,18 +24,15 @@ import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.KyuubiHadoopUtils
import org.apache.kyuubi.util.command.CommandLineUtils._

/**
@@ -47,7 +44,7 @@ class FlinkProcessBuilder(
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
extends ProcBuilder {

@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -276,35 +273,20 @@ class FlinkProcessBuilder(
}
}

@volatile private var tokenTempDir: java.nio.file.Path = _
private def generateTokenFile(): Option[(String, String)] = {
if (conf.get(ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE)) {
      // We disabled the `hadoopfs` token service, which may cause the yarn client to
      // miss the hdfs token. So we generate a hadoop token file to pass kyuubi engine
      // tokens to the submit process.
      // TODO: Remove this after FLINK-35525 (1.20.0); delegation tokens will then be
      // passed by the `kyuubi` provider.
conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
tokenTempDir = Utils.createTempDir()
val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
credentials.writeTokenStorageFile(new Path(s"file://$file"), new Configuration())
info(s"Generated hadoop token file: $file")
"HADOOP_TOKEN_FILE_LOCATION" -> file
}
generateEngineTokenFile.map(tokenFile => "HADOOP_TOKEN_FILE_LOCATION" -> tokenFile)
} else {
None
}
}

override def close(destroyProcess: Boolean): Unit = {
super.close(destroyProcess)
if (tokenTempDir != null) {
try {
Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
} catch {
case e: Throwable => error(s"Error deleting token temp dir: $tokenTempDir", e)
}
}
}

override def shortName: String = "flink"
SparkProcessBuilder.scala
@@ -51,7 +51,7 @@ class SparkProcessBuilder(
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
extends ProcBuilder {

@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -61,13 +61,24 @@
import SparkProcessBuilder._

private[kyuubi] val sparkHome = getEngineHome(shortName)
private[kyuubi] val externalTokensEnabled = conf.get(ENGINE_EXTERNAL_TOKEN_ENABLED)

override protected val executable: String = {
Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
}

override def mainClass: String = "org.apache.kyuubi.engine.spark.SparkSQLEngine"

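// Without a configured principal/keytab, a doAs launch with external tokens
// clears the Kerberos ticket cache, impersonates via HADOOP_PROXY_USER, and
// ships the pre-generated token file to spark-submit.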
override def env: Map[String, String] = {
val extraEnvs: Map[String, String] =
if ((conf.getOption(PRINCIPAL).isEmpty || conf.getOption(KEYTAB).isEmpty)
&& doAsEnabled && externalTokensEnabled) {
Map(ENV_KERBEROS_TGT -> "", ENV_SPARK_PROXY_USER -> proxyUser) ++
generateEngineTokenFile.map(tokenFile => HADOOP_TOKEN_FILE_LOCATION -> tokenFile)
} else Map.empty
conf.getEnvs ++ extraEnvs
}

/**
* Add `spark.master` if KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT
* are defined. So we can deploy spark on kubernetes without setting `spark.master`
@@ -169,8 +180,10 @@
tryKeytab() match {
case None if doAsEnabled =>
setSparkUserName(proxyUser, buffer)
buffer += PROXY_USER
buffer += proxyUser
if (!externalTokensEnabled) {
buffer += PROXY_USER
buffer += proxyUser
}
case None => // doAs disabled
setSparkUserName(Utils.currentUser, buffer)
case Some(name) =>
@@ -409,6 +422,9 @@ object SparkProcessBuilder {
final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
final val YARN_SUBMIT_WAIT_APP_COMPLETION = "spark.yarn.submit.waitAppCompletion"
final val INTERNAL_RESOURCE = "spark-internal"
final val HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
final val ENV_KERBEROS_TGT = "KRB5CCNAME"
final val ENV_SPARK_PROXY_USER = "HADOOP_PROXY_USER"

final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
final val KUBERNETES_UPLOAD_PATH_PERMISSION =
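To summarize the `env` override and the `tryKeytab` change above: with doAs and external tokens enabled and no principal/keytab configured, spark-submit runs with roughly the following extra environment (values illustrative, not from the diff):

```scala
// Illustrative values only, composed the same way the env override does.
val extraEnvs = Map(
  "KRB5CCNAME" -> "", // blank ticket cache: don't authenticate with a server TGT
  "HADOOP_PROXY_USER" -> "alice", // impersonation via env instead of --proxy-user
  "HADOOP_TOKEN_FILE_LOCATION" ->
    "/tmp/kyuubi-tokens/kyuubi_credentials_1700000000000") // pre-fetched tokens
// Because HADOOP_PROXY_USER is already set, the builder now skips adding
// `--proxy-user` to the spark-submit command line.
```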