[SPARK-53329][CONNECT] Improve exception handling when adding artifacts #52073
Conversation
Thanks for the improvement, LGTM!
One important behaviour to note: currently, the AddArtifact RPC is retry-friendly, as overwriting a file with the same byte sequence is NOT considered a "true overwrite" and is simply ignored.
This PR does not change the above semantics and AddArtifact will remain retry-friendly.
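The retry-friendly semantics described above can be sketched as follows. This is a hypothetical illustration (the helper name and the exception type are assumptions, not the server's actual code): re-sending identical bytes is treated as a no-op, while differing bytes count as a "true overwrite" and are rejected.

```scala
import java.nio.file.{Files, Path}

// Hypothetical sketch: an overwrite with an identical byte sequence is a
// no-op (a retried RPC), while differing bytes are a "true overwrite".
def writeArtifactIdempotently(target: Path, payload: Array[Byte]): Unit = {
  if (Files.exists(target)) {
    if (java.util.Arrays.equals(Files.readAllBytes(target), payload)) {
      return // identical bytes: treat the retry as a no-op
    }
    throw new IllegalStateException(
      s"Artifact already exists at $target with different contents")
  } else {
    Files.write(target, payload)
  }
}
```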
Currently, these suppressed exceptions are not serialized into the grpc object and sent over the wire, however.
Thanks for the callout! I've filed https://issues.apache.org/jira/browse/SPARK-53350 to track this as a potential improvement/follow-up.
Note to other reviewers: Hide whitespace changes
@vicennial what do you mean? I saw there was a superfluous whitespace somewhere, I'll remove it before merging.
@HendrikHuebner Not related to the code per se, just the way GitHub displays the diff. By default it does not hide whitespace modifications, but if "Hide whitespace changes" is enabled explicitly, the diff is cleaner and it's easier to understand what changed.
```scala
    throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
  }
} catch {
  case e: SparkRuntimeException =>
```
do we also need to check error condition like https://github.com/apache/spark/pull/52073/files?diff=unified&w=1#diff-e0c16bbd59568673480e80b94c0ba52345cb267bd9a7d33d29ee7596512d930eR131 ?
Right, it makes sense to keep it consistent
```scala
// Validate exception: Should be ARTIFACT_ALREADY_EXISTS and have one suppressed exception
assert(exception.getCondition == "ARTIFACT_ALREADY_EXISTS",
```
It may be better practice to use checkError, as it also allows you to check parameters. For example:
```scala
checkError(
  exception = intercept[SparkIllegalArgumentException] {
    ser.write(kryo, ks.newKryoOutput(), DefaultCachedBatch(1, null, InternalRow.empty))
  },
  condition = "INVALID_KRYO_SERIALIZER_NO_DATA",
  parameters = Map(
    "obj" -> "DefaultCachedBatch.buffers",
    "serdeOp" -> "serialize",
    "serdeClass" -> ser.getClass.getName))
```
```scala
assert(exception.getSuppressed.length == 1)
assert(exception.getSuppressed.head.isInstanceOf[SparkRuntimeException])
val suppressed = exception.getSuppressed.head.asInstanceOf[SparkRuntimeException]
assert(suppressed.getCondition == "ARTIFACT_ALREADY_EXISTS")
```
ditto
```scala
if (failedArtifactExceptions.nonEmpty) {
```
The error handling and suppression logic seems to be duplicated in both ArtifactManager.scala and SparkConnectAddArtifactsHandler.scala.
```scala
if (failedArtifactExceptions.nonEmpty) {
  val exception = failedArtifactExceptions.head
  failedArtifactExceptions.drop(1).foreach(exception.addSuppressed(_))
  throw exception
}
```
I was wondering whether it makes sense to introduce a small utility to handle Seq[Try[...]] instead
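One possible shape for such a utility, sketched here with a hypothetical object and method name (the thread above suggests ArtifactUtils as the eventual home): collect the failures out of a `Seq[Try[_]]`, throw the first, and attach the rest as suppressed exceptions.

```scala
import scala.util.{Failure, Try}

object ArtifactFailureHelper { // hypothetical name; ArtifactUtils was suggested above
  // Throw the first failure in `results`, attaching any remaining failures
  // as suppressed exceptions; do nothing if every Try succeeded.
  def throwFirstWithSuppressed(results: Seq[Try[_]]): Unit = {
    val failures = results.collect { case Failure(e) => e }
    failures.headOption.foreach { first =>
      failures.drop(1).foreach(e => first.addSuppressed(e))
      throw first
    }
  }
}
```

Both call sites could then pass their per-artifact `Try` results to this one method instead of duplicating the head/drop/suppress logic.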
ArtifactUtils would be a good place for this utility method
Makes sense to me, pending the comments above.
LGTM if CI is green
What changes were proposed in this pull request?
When a user sends multiple artifacts with the `addArtifacts` API, we process each artifact one at a time on the server side. If the server detects the user attempting to modify an artifact (by overwriting an existing artifact at the same path with a different byte sequence), an exception is immediately thrown and the artifact addition process is terminated.
Instead, the operation should be idempotent and the server should try to add as many artifacts as possible rather than returning early.
Why are the changes needed?
As explained, if the server encounters an error while adding artifacts it will return immediately. This can be a bit wasteful as the server discards all other artifacts sent over the wire regardless of their own status. Thus, an improvement can be made to process all artifacts, catch any exceptions and rethrow them at the end.
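The change described above amounts to a collect-then-rethrow pattern. A minimal, self-contained sketch (the generic `addAll` helper and its parameter names are illustrative, not the PR's actual method): every artifact is attempted, failures are accumulated, and the first failure is rethrown at the end with the rest attached as suppressed exceptions.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: process every artifact, collecting failures instead of aborting
// on the first one; rethrow at the end, with the remaining failures
// attached to the first as suppressed exceptions.
def addAll[A](artifacts: Seq[A])(addOne: A => Unit): Unit = {
  val failed = ArrayBuffer.empty[Throwable]
  artifacts.foreach { a =>
    try addOne(a)
    catch { case e: Exception => failed += e }
  }
  if (failed.nonEmpty) {
    val exception = failed.head
    failed.drop(1).foreach(e => exception.addSuppressed(e))
    throw exception
  }
}
```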
Does this PR introduce any user-facing change?
This PR does not modify the existing API or the return codes. If the above scenario is triggered, the only user-facing change is that the server adds as many artifacts as possible, so it should be fully backwards compatible. Additionally, if more than one artifact already existed, each exception after the first is added as a suppressed exception. Currently, however, these suppressed exceptions are not serialized into the gRPC error object and sent over the wire.
How was this patch tested?
Unit tests and local testing.
Was this patch authored or co-authored using generative AI tooling?
No