Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed: <ul> <li>RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.</li> <li>DEREGISTER_IMMEDIATELY: deregister the engine immediately.</li> <li>DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.</li></ul> | string | 1.8.1 |
| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 |
| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 |
| kyuubi.session.engine.shutdown.watchdog.timeout | PT1M | The maximum time to wait for the engine to shutdown gracefully before forcing termination. When an engine shutdown is initiated, this watchdog timer starts counting down. If the engine doesn't complete shutdown within this timeout period, it will be forcefully terminated to prevent hanging. Set to 0 or a negative value to disable the forced shutdown mechanism. | duration | 1.11.0 |
| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It falls back to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 |
| kyuubi.session.engine.spark.main.resource | &lt;undefined&gt; | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 |
| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate if no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark
import java.time.Instant
import java.util.{Locale, UUID}
import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
Expand All @@ -45,17 +45,17 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{JavaUtils, SignalRegister, ThreadUtils}
import org.apache.kyuubi.util.{JavaUtils, SignalRegister, ThreadDumpUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {

override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))

private val shutdown = new AtomicBoolean(false)
private val gracefulStopDeregistered = new AtomicBoolean(false)

// Holds the shutdown watchdog thread once one has been started (null until then).
// The holder itself is never reassigned, so it is a plain `val`; AtomicReference
// already provides the cross-thread visibility that `@volatile var` was meant to add.
private val watchdogThreadRef: AtomicReference[Thread] = new AtomicReference[Thread]()
// JVM exit code used when the watchdog forces termination after the shutdown timeout.
private val EMERGENCY_SHUTDOWN_EXIT_CODE = 99
// JVM exit code used when the watchdog thread itself fails with an unexpected error.
private val WATCHDOG_ERROR_EXIT_CODE = 98
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
private lazy val engineSavePath =
Expand Down Expand Up @@ -98,6 +98,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}

override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
startShutdownWatchdog()
super.stop()
lifetimeTerminatingChecker.foreach(checker => {
val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
Expand All @@ -121,6 +122,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}

def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) {
startShutdownWatchdog()
val stopTask: Runnable = () => {
if (!shutdown.get) {
info(s"Spark engine is de-registering from engine discovery space.")
Expand Down Expand Up @@ -212,6 +214,76 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
TimeUnit.MILLISECONDS)
}
}

/**
 * Starts a shutdown watchdog thread as a failsafe mechanism.
 *
 * The watchdog sleeps for the configured `ENGINE_SHUTDOWN_WATCHDOG_TIMEOUT` and, if it is
 * not interrupted before the timeout elapses, logs a thread dump and forcefully terminates
 * the JVM. This prevents zombie processes caused by non-daemon threads that refuse to
 * terminate.
 *
 * The watchdog is not started when running in test mode or when the configured timeout
 * is zero or negative. This method may be called from several shutdown paths; at most one
 * watchdog thread is ever started, and later calls only log a warning.
 */
private def startShutdownWatchdog(): Unit = {
  if (org.apache.kyuubi.Utils.isTesting) {
    info("Shutdown Watchdog is disabled in test mode.")
  } else {
    val shutdownWatchdogTimeout = conf.get(ENGINE_SHUTDOWN_WATCHDOG_TIMEOUT)
    if (shutdownWatchdogTimeout <= 0) {
      info("Shutdown Watchdog is disabled (timeout <= 0).")
    } else {
      val watchdogThread = newShutdownWatchdogThread(shutdownWatchdogTimeout)
      // Atomically claim the single watchdog slot *before* starting the thread. A separate
      // "check, then set later" sequence would let two concurrent callers both pass the
      // check and start two watchdogs.
      if (watchdogThreadRef.compareAndSet(null, watchdogThread)) {
        info(s"Shutdown Watchdog activated. Engine will be forcefully terminated if graceful " +
          s"shutdown exceeds ${shutdownWatchdogTimeout} ms.")
        watchdogThread.start()
        debug(s"Shutdown Watchdog thread started: ${watchdogThread.getName}")
      } else {
        // Another caller won the race; the thread created above is simply discarded unstarted.
        warn("Shutdown Watchdog is already running, ignoring duplicate start request")
      }
    }
  }
}

/**
 * Builds (but does not start) the daemon thread that enforces the shutdown timeout.
 *
 * @param timeoutMs how long, in milliseconds, the thread sleeps before forcing JVM exit
 * @return an unstarted daemon thread named "shutdown-watchdog"
 */
private def newShutdownWatchdogThread(timeoutMs: Long): Thread = {
  val watchdogThread = new Thread("shutdown-watchdog") {
    override def run(): Unit = {
      debug("Shutdown Watchdog thread started, monitoring graceful shutdown process")
      try {
        TimeUnit.MILLISECONDS.sleep(timeoutMs)

        error(s"EMERGENCY SHUTDOWN TRIGGERED")
        error(s"Graceful shutdown exceeded ${timeoutMs} ms timeout")
        error(s"Non-daemon threads are preventing JVM exit")
        error(s"Initiating forced termination...")

        // Thread dump for diagnostics
        error(s"=== THREAD DUMP FOR DIAGNOSTIC ===")
        ThreadDumpUtils.dumpToLogger(logger)
        error(s"=== END OF THREAD DUMP ===")

        error(s"Forcefully terminating JVM now...")
        System.exit(EMERGENCY_SHUTDOWN_EXIT_CODE)
      } catch {
        // An interrupt during the sleep means the watchdog is no longer needed.
        case _: InterruptedException =>
          warn("Shutdown Watchdog: Normal shutdown detected, watchdog exiting.")
        // Deliberately broad: any failure of the failsafe itself must still end the JVM.
        case t: Throwable =>
          error(
            s"Shutdown Watchdog error: ${t.getClass.getSimpleName}: ${t.getMessage}")
          t.printStackTrace(System.err)
          error("Proceeding with emergency termination...")
          System.exit(WATCHDOG_ERROR_EXIT_CODE) // Watchdog error
      }
    }
  }
  // Daemon so that the watchdog itself never keeps the JVM alive.
  watchdogThread.setDaemon(true)
  watchdogThread
}

}

object SparkSQLEngine extends Logging {
Expand Down Expand Up @@ -407,6 +479,7 @@ object SparkSQLEngine extends Logging {
startEngine(spark)
// blocking main thread
countDownLatch.await()
currentEngine.foreach(_.startShutdownWatchdog())
} catch {
case e: KyuubiException =>
currentEngine match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1784,6 +1784,17 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMinutes(30L).toMillis)

/**
 * Timeout, in milliseconds, for the engine shutdown watchdog; defaults to one minute.
 * A value of zero or less disables the forced shutdown mechanism.
 */
val ENGINE_SHUTDOWN_WATCHDOG_TIMEOUT: ConfigEntry[Long] =
  buildConf("kyuubi.session.engine.shutdown.watchdog.timeout")
    .doc(
      "The maximum time to wait for the engine to shutdown gracefully " +
        "before forcing termination. " +
        "When an engine shutdown is initiated, this watchdog timer starts counting down. " +
        "If the engine doesn't complete shutdown within this timeout period, " +
        "it will be forcefully terminated to prevent hanging. " +
        "Set to 0 or a negative value to disable the forced shutdown mechanism.")
    .version("1.11.0")
    .timeConf
    .createWithDefault(Duration.ofMinutes(1L).toMillis)

val SESSION_CONF_IGNORE_LIST: ConfigEntry[Set[String]] =
buildConf("kyuubi.session.conf.ignore.list")
.doc("A comma-separated list of ignored keys. If the client connection contains any of" +
Expand Down
Loading
Loading