Skip to content

Conversation

HendrikHuebner
Copy link

@HendrikHuebner HendrikHuebner commented Aug 19, 2025

What changes were proposed in this pull request?

When a user sends multiple artifacts with the addArtifacts API, we process each artifact one at a time on the server-side.

If the server detects the user attempting to modify an artifact (by overwriting an existing artifact of the same path with a different byte sequence), an exception is immediately thrown and artifact addition process is terminated.

Instead, the operation should be idempotent and the server should try to add as many artifacts as possible instead of returning early.

Why are the changes needed?

As explained, if the server encounters an error while adding artifacts it will return immediately. This can be a bit wasteful as the server discards all other artifacts sent over the wire regardless of their own status. Thus, an improvement can be made to process all artifacts, catch any exceptions and rethrow them at the end.

Does this PR introduce any user-facing change?

This PR does not modify the existing API or the return codes. If the above scenario is triggered, the only user facing change is that the server adds as many artifacts as possible. Therefore it should be fully backwards compatible. Additionally, if more than one artifact already existed, its exception is added as a suppressed exception. Currently, these suppressed exceptions are not serialized into the grpc object and sent over the wire, however.

How was this patch tested?

Unit tests and local testing.

Was this patch authored or co-authored using generative AI tooling?

No

@HendrikHuebner HendrikHuebner force-pushed the improve-add-artifact-exceptions branch 6 times, most recently from 60b8723 to adb42bf Compare August 20, 2025 09:06
@HendrikHuebner HendrikHuebner force-pushed the improve-add-artifact-exceptions branch from adb42bf to 5885ce0 Compare August 20, 2025 09:10
@HendrikHuebner HendrikHuebner marked this pull request as ready for review August 20, 2025 11:46
Copy link
Contributor

@vicennial vicennial left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the improvement, LGTM!

One important behaviour to note: Currently, the AddArtifact RPC is retry-friendly as overwriting a file with the same byte sequence is NOT considered as a "true overwrite" and thus, is simply ignored.
This PR does not change the above semantics and AddArtifact will remain retry-friendly.

Currently, these suppressed exceptions are not serialized into the grpc object and sent over the wire, however.

Thanks for the callout! I've filed https://issues.apache.org/jira/browse/SPARK-53350 to track this as a potential improvement/follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to other reviewers: Hide whitespace changes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vicennial what do you mean? I saw there was a superfluous whitespace somewhere, I'll remove it before merging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HendrikHuebner Not related to the code per-se, just the way that github displays the diff.

By default it does not hide whitespace modifications:
image

But if its enabled explicitly, the diff is cleaner and easier to understand what changed:
image

throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
}
} catch {
case e: SparkRuntimeException =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it makes sense to keep it consistent

}

// Validate exception: Should be ARTIFACT_ALREADY_EXISTS and have one suppressed exception
assert(exception.getCondition == "ARTIFACT_ALREADY_EXISTS",
Copy link
Contributor

@heyihong heyihong Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better practice to use checkError, as it also allows you to check parameters. For example:

    checkError(
      exception = intercept[SparkIllegalArgumentException] {
        ser.write(kryo, ks.newKryoOutput(), DefaultCachedBatch(1, null, InternalRow.empty))
      },
      condition = "INVALID_KRYO_SERIALIZER_NO_DATA",
      parameters = Map(
        "obj" -> "DefaultCachedBatch.buffers",
        "serdeOp" -> "serialize",
        "serdeClass" -> ser.getClass.getName))

assert(exception.getSuppressed.length == 1)
assert(exception.getSuppressed.head.isInstanceOf[SparkRuntimeException])
val suppressed = exception.getSuppressed.head.asInstanceOf[SparkRuntimeException]
assert(suppressed.getCondition == "ARTIFACT_ALREADY_EXISTS")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

if (failedArtifactExceptions.nonEmpty) {
Copy link
Contributor

@heyihong heyihong Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling and suppression logic seems to be duplicated in both ArtifactManager.scala and SparkConnectAddArtifactsHandler.scala.

 if (failedArtifactExceptions.nonEmpty) {
      val exception = failedArtifactExceptions.head
      failedArtifactExceptions.drop(1).foreach(exception.addSuppressed(_))
      throw exception
    }
}

I was wondering whether it makes sense to introduce a small utility to handle Seq[Try[...]] instead

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArtifactUtils would be a good place for this utility method

@HyukjinKwon HyukjinKwon changed the title [SPARK-53329] Improve exception handling when adding artifacts [SPARK-53329][CONNECT] Improve exception handling when adding artifacts Aug 25, 2025
Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me - pending per the comments above.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if CI is green

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants