
Conversation

sarutak
Member

@sarutak sarutak commented Aug 20, 2025

What changes were proposed in this pull request?

This PR fixes an issue that occurs when an operation in the pending state is interrupted.
Once an operation in the pending state is interrupted, that interruption and all following interruptions for the operation never work correctly.
You can easily reproduce this issue by modifying SparkConnectExecutionManager#createExecuteHolderAndAttach as follows.

     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
+      Thread.sleep(1000)
       executeHolder.eventsManager.postStarted()
       executeHolder.start()
     } catch {

Then run the test "interrupt all - background queries, foreground interrupt" in SparkSessionE2ESuite.

$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'

You will see the following error.

[info] - interrupt all - background queries, foreground interrupt *** FAILED *** (20 seconds, 344 milliseconds)
[info]   The code passed to eventually never returned normally. Attempted 28 times over 20.285258458 seconds. Last failure message: Some("unexpected failure in q2: org.apache.spark.SparkException: java.lang.IllegalStateException: Operation was orphaned because of an internal error.") was not empty Error not empty: Some(unexpected failure in q2: org.apache.spark.SparkException: java.lang.IllegalStateException: Operation was orphaned because of an internal error.). (SparkSessionE2ESuite.scala:72)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
[info]   at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
[info]   at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:313)
[info]   at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:312)
[info]   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457)
[info]   at org.apache.spark.sql.connect.SparkSessionE2ESuite.$anonfun$new$1(SparkSessionE2ESuite.scala:72)

If an operation in the pending state is interrupted, the interruption is handled in ExecuteHolder#interrupt and ErrorUtils.handleError is called. In ErrorUtils#handleError, the operation status transitions to Canceled by calling executeEventsManager.postCanceled.
But postCanceled does not expect a transition from the pending state, so an exception is thrown and propagated to the caller of ExecuteThreadRunner#interrupt.

The reason all following interruptions for the same operation never work correctly is that ExecuteThreadRunner#state has already been changed to interrupted by the first call of ExecuteThreadRunner#interrupt. Subsequent interruptions therefore don't enter the loop, and the method always returns false, so the result of the interruption is not correctly recognized.
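
To make the mechanism concrete, here is a minimal, self-contained sketch of the behavior described above (simplified names; this is not the actual Spark Connect code):

import java.util.concurrent.atomic.AtomicReference

object InterruptStateSketch {
  sealed trait RunnerState
  case object Running extends RunnerState
  case object Interrupted extends RunnerState

  // The runner keeps a single state flag. The first interrupt() call flips it
  // to Interrupted and then runs the cancellation logic; if that logic throws
  // (as postCanceled does for a pending operation), the flag nevertheless stays
  // Interrupted, so every later interrupt() call just returns false.
  class ThreadRunnerSketch {
    private val state = new AtomicReference[RunnerState](Running)

    def interrupt(): Boolean = {
      if (state.compareAndSet(Running, Interrupted)) {
        // the real code would post the "canceled" event here, which may throw
        true
      } else {
        false // already marked as interrupted: nothing more to do
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val runner = new ThreadRunnerSketch
    println(runner.interrupt()) // true: the first attempt flips the state
    println(runner.interrupt()) // false: later attempts never take effect
  }
}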

Another solution would be changing postCanceled to allow a transition from the pending state, but that causes another issue:
if it were changed that way, postStarted could be called for an operation that is already in the canceled state.
So this PR simply ignores the interruption when the target operation is in the pending state.

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Add a test that confirms the interruption is ignored when the target operation is in the pending state.
I also confirmed that the SparkSessionE2ESuite test mentioned above succeeds.

Was this patch authored or co-authored using generative AI tooling?

No.

@sarutak sarutak changed the title [SPARK-53339][CONNECT] Fix a race condition issue which occurs when an operation in pending state is interrupted [SPARK-53339][CONNECT] Fix an issue which occurs when an operation in pending state is interrupted Aug 20, 2025
@sarutak
Member Author

sarutak commented Aug 25, 2025

cc: @peter-toth

@sarutak
Member Author

sarutak commented Aug 26, 2025

cc: @dongjoon-hyun too.
This issue is one of the obstacles blocking SPARK-48139.

@dongjoon-hyun
Member

Thank you for pinging me, @sarutak. I just came back from my vacation today.

@@ -243,6 +243,9 @@ private[connect] class ExecuteHolder(
    * true if it was not interrupted before, false if it was already interrupted.
    */
   def interrupt(): Boolean = {
+    if (eventsManager.status == ExecuteStatus.Pending) {
+      return false
Member

  1. According to the function description, false is already used for the case where the operation was already interrupted.

  2. For the code change, according to the state transitions, can we change the status directly from the current ExecuteStatus.Pending to ExecuteStatus.Canceled because it's not started yet? In this case, we can return true.

object ExecuteStatus {
  case object Pending extends ExecuteStatus(0)
  case object Started extends ExecuteStatus(1)
  case object Analyzed extends ExecuteStatus(2)
  case object ReadyForExecution extends ExecuteStatus(3)
  case object Finished extends ExecuteStatus(4)
  case object Failed extends ExecuteStatus(5)
  case object Canceled extends ExecuteStatus(6)
  case object Closed extends ExecuteStatus(7)
}

Member Author

For the code change, according to the state transitions, can we change the status directly from the current ExecuteStatus.Pending to ExecuteStatus.Canceled because it's not started yet? In this case, we can return true.

Actually, that was my first idea to solve this issue. But as I mentioned in the description, I found it didn't work because transitioning from Pending to Canceled causes another issue.
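
A small, self-contained sketch of that issue (hypothetical names, not the actual Spark Connect code): if interrupt() moved a pending operation straight to Canceled, the postStarted() call that follows in the normal startup path (see createExecuteHolderAndAttach in the description) would then run against an operation that is already canceled.

object TransitionHazardSketch {
  sealed abstract class Status(val value: Int)
  case object Pending  extends Status(0)
  case object Started  extends Status(1)
  case object Canceled extends Status(6)

  // Simplified stand-in for the events manager: postStarted only accepts a
  // pending operation, mirroring the strict transition checks described above.
  class EventsManagerSketch {
    var status: Status = Pending

    // The hypothetical relaxed transition: cancel directly from Pending.
    def postCanceled(): Unit = { status = Canceled }

    def postStarted(): Unit = {
      require(status == Pending, s"cannot start an operation in state $status")
      status = Started
    }
  }

  def main(args: Array[String]): Unit = {
    val events = new EventsManagerSketch
    events.postCanceled() // an interrupt arrives while the operation is still pending
    events.postStarted()  // throws: the startup path now sees a canceled operation
  }
}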

Member Author

According to the function description, false is already used for the case where the operation was already interrupted.

Hmm, if it's OK to ignore an interruption to an operation in the pending state, and we need to exactly distinguish "already interrupted" from "interruption failed", how about returning the exact interruption result rather than a boolean?
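
For illustration only, a hypothetical result type along these lines could express the three outcomes; none of these names exist in the PR or in Spark:

object InterruptResultSketch {
  // A richer return type would let callers distinguish the outcomes that the
  // current Boolean conflates.
  sealed trait InterruptResult
  case object Interrupted        extends InterruptResult // the interruption took effect
  case object AlreadyInterrupted extends InterruptResult // it had been interrupted before
  case object NotInterruptable   extends InterruptResult // e.g. the operation is still pending
}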

Member

Got it. Thank you. As long as the code and description are consistent, I'm okay with both: (1) updating the description by changing the meaning of false, and (2) changing the return type.

Member Author

Thank you for your suggestion. I'll simply update the description.

@dongjoon-hyun
Member

dongjoon-hyun commented Aug 26, 2025

BTW, thank you for the investigation to identify the root cause.

cc @grundprinzip , @hvanhovell , @zhengruifeng to ask if this was the intentional design of state transition or not.

@grundprinzip
Contributor

There are two things at play here: the internal state of the operation itself and the notification on the listener bus.

If this patch simply ignores the interrupt on an operation in a pending state, there is a new edge case where we can never cancel this operation if it's stuck in a pending state for whatever reason. Previously, it seems that while the physical query was cancelled, only the observable operation state on the listener bus was not properly handled.

I understand that there is another race condition when the interrupt happens right between the incoming request and the different posting states. I think the better solution is not to ignore the interruption, but we need to figure out how to avoid double-posting of events.
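
One generic way to avoid double-posting, sketched here purely as an illustration and not as a concrete proposal for the Spark Connect code, is to make each terminal event post idempotent with an atomic guard:

import java.util.concurrent.atomic.AtomicBoolean

object IdempotentPostSketch {
  // Each terminal event is posted at most once; whichever path gets there first
  // (normal completion, failure handling, or interruption) wins, and the losing
  // path simply skips its post instead of raising an error.
  class TerminalEventGuard {
    private val posted = new AtomicBoolean(false)

    def postOnce(post: () => Unit): Boolean = {
      if (posted.compareAndSet(false, true)) {
        post()
        true
      } else {
        false // some other path already posted the terminal event
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val guard = new TerminalEventGuard
    println(guard.postOnce(() => println("posting canceled"))) // posts, prints true
    println(guard.postOnce(() => println("posting failed")))   // skipped, prints false
  }
}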

@dongjoon-hyun
Member

Given that this is one of the long-standing Spark Connect interrupt-operation issues that frequently happen in Apache Spark CIs on all live release branches, I'd like to suggest documenting this situation as a known issue in Apache Spark 4.0.1 and 3.5.7, independently from this PR. WDYT, @sarutak and @grundprinzip?

At the same time, we can discuss this further in order to figure out the correct steps in the Apache Spark 4.1.0 timeframe, as @grundprinzip suggested above.

@sarutak
Member Author

sarutak commented Aug 27, 2025

@dongjoon-hyun
I'm OK with documenting this as a known issue, but let me confirm whether this issue affects 4.0.1 and 3.5.7 too.
