
Commit 7f24cf7

Automatic staging committer conflict-mode for dynamic partition overwrite
Follows up on the previous commit, which disabled the hard-coded exception in PathOutputCommitProtocol.scala when using dynamicPartitionOverwrite. The Spark 3.5 upgrade throws that exception from a second location as well, so this commit disables it there too. The unit test in CommitterBindingSuite.scala is also updated so that it no longer expects an IOException for unsupported dynamic partitioning.
Parent: 3c501fa
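For context, a hedged sketch of the configuration that drives this code path. The conf keys are real (fs.s3a.committer.staging.conflict-mode is a Hadoop S3A staging-committer option; the Spark settings are standard Spark SQL / Hadoop pass-through confs), but the values and app name below are illustrative assumptions, not taken from this commit:

    import org.apache.spark.sql.SparkSession

    // Sketch: enable Spark's dynamic partition overwrite and route output
    // through the cloud committer binding touched by this commit.
    val spark = SparkSession.builder()
      .appName("dynamic-overwrite-example") // placeholder name
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      // S3A staging committer; conflict-mode "replace" overwrites the
      // contents of partitions that the job writes to.
      .config("spark.hadoop.fs.s3a.committer.name", "staging")
      .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
      .getOrCreate()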

2 files changed: 13 additions & 9 deletions

hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala

Lines changed: 3 additions & 1 deletion
@@ -130,7 +130,9 @@ class PathOutputCommitProtocol(
       logDebug(
         s"Committer $committer has declared compatibility with dynamic partition overwrite")
     } else {
-      throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
+      // throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
+      // The above exception is disabled with automatic value of
+      // fs.s3a.committer.staging.conflict-mode in HadoopMapReduceCommitProtocol.
     }
   }
 }
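The replacement comment refers to logic in HadoopMapReduceCommitProtocol that is not part of this diff. A minimal sketch of what such a guard could look like, assuming a jobContext: JobContext is in scope; the placement and surrounding structure are assumptions based only on the comment, while the conf key itself is real:

    // Sketch only: instead of rejecting dynamic partition overwrite with an
    // IOException, force the S3A staging committer into "replace" mode so
    // existing partition contents are overwritten on commit.
    if (dynamicPartitionOverwrite) {
      jobContext.getConfiguration.set(
        "fs.s3a.committer.staging.conflict-mode", "replace")
    }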

hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala

Lines changed: 10 additions & 8 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.internal.io.cloud
 
-import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, StreamCapabilities}
@@ -148,6 +148,9 @@ class CommitterBindingSuite extends SparkFunSuite {
    * Bind a job to a committer which doesn't support dynamic partitioning.
    * Job setup must fail, and calling `newTaskTempFileAbsPath()` must
    * raise `UnsupportedOperationException`.
+   * With custom patch https://github.com/lyft/spark/pull/34:
+   * Job setup should now succeed (due to custom patch), but calling
+   * `newTaskTempFileAbsPath()` must still raise `UnsupportedOperationException`.
    */
   test("reject dynamic partitioning if not supported") {
     val path = new Path("http://example/data")
@@ -162,14 +165,13 @@ class CommitterBindingSuite extends SparkFunSuite {
       jobId,
       path.toUri.toString,
       true)
-    val ioe = intercept[IOException] {
-      committer.setupJob(tContext)
-    }
-    if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
-      throw ioe
-    }
 
-    // calls to newTaskTempFileAbsPath() will be rejected
+    // Job setup should now succeed due to the custom patch that disabled
+    // the IOException throwing for unsupported dynamic partitioning
+    committer.setupJob(tContext)
+    committer.setupTask(tContext)
+
+    // calls to newTaskTempFileAbsPath() will still be rejected
     intercept[UnsupportedOperationException] {
       verifyAbsTempFileWorks(tContext, committer)
     }
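After this change, a plain dynamic-partition-overwrite write should get past setupJob; only code paths that need absolute-path temp files via newTaskTempFileAbsPath() (for example, Hive partitions with custom locations) are still rejected. A sketch of such a write, reusing the SparkSession from the earlier sketch; the bucket, path, and partition column are placeholders:

    // Sketch: with partitionOverwriteMode=dynamic, mode("overwrite") replaces
    // only the partitions present in df, not the whole table path.
    val df = spark.range(100).selectExpr("id", "id % 7 AS part")
    df.write
      .mode("overwrite")
      .partitionBy("part")
      .parquet("s3a://example-bucket/warehouse/events") // placeholder path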
