
Commit 879844f

dzhi-lyft authored and anuvedverma committed
Automatic staging committer conflict-mode for dynamic partition overwrite
To support dynamic partition overwrite with the S3A staging committers, this change disables the previously hard-coded exception and sets fs.s3a.committer.staging.conflict-mode dynamically, so that PartitionedStagingCommitter behaves as expected in both "INSERT INTO" and "INSERT OVERWRITE" scenarios. Details are documented at: https://docs.google.com/document/d/1fH4AtClYDiQt4fU9g-QzcoRu9SxMo8isGuLgvVxZgdc/edit?usp=sharing

(cherry picked from commit 902996c)
1 parent b178bbe commit 879844f
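
For context, the change is exercised when Spark writes through the S3A partitioned staging committer with dynamic partition overwrite enabled. The sketch below is illustrative only and not part of the commit: it assumes the hadoop-cloud module and the S3A committers are on the classpath, and the bucket and table names are placeholders.

import org.apache.spark.sql.SparkSession

// Minimal sketch of a session using the S3A partitioned staging committer
// together with Spark's dynamic partition overwrite mode.
val spark = SparkSession.builder()
  .appName("dynamic-partition-overwrite-s3a")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
  .getOrCreate()

With this setup, "INSERT OVERWRITE" writes map to conflict-mode=replace and "INSERT INTO" writes map to conflict-mode=append, as shown in the diff below.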

2 files changed: 21 additions & 2 deletions

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 12 additions & 0 deletions
@@ -183,6 +183,18 @@ class HadoopMapReduceCommitProtocol(
     jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
     jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
 
+    // Automatically set conflict-mode based on value of dynamicPartitionOverwrite,
+    // unless configuration auto-staging-conflict-mode exists with value false.
+    val autoConflictMode = jobContext.getConfiguration.get(
+      "spark.internal.io.hmrcp.auto-staging-conflict-mode")
+    if (autoConflictMode == null || autoConflictMode != "false") {
+      if (dynamicPartitionOverwrite) {
+        jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", "replace")
+      } else {
+        jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", "append")
+      }
+    }
+
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)

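The decision logic added above can be reproduced in isolation against a plain Hadoop Configuration. The helper below mirrors that logic and shows the opt-out behaviour of spark.internal.io.hmrcp.auto-staging-conflict-mode=false; the name resolveConflictMode is illustrative and not part of the patch.

import org.apache.hadoop.conf.Configuration

// Illustrative helper (not in the patch) mirroring the logic added above:
// the conflict-mode is set automatically unless the opt-out key is exactly "false".
def resolveConflictMode(conf: Configuration, dynamicPartitionOverwrite: Boolean): Unit = {
  val autoConflictMode = conf.get("spark.internal.io.hmrcp.auto-staging-conflict-mode")
  if (autoConflictMode == null || autoConflictMode != "false") {
    val mode = if (dynamicPartitionOverwrite) "replace" else "append"
    conf.set("fs.s3a.committer.staging.conflict-mode", mode)
  }
}

val conf = new Configuration(false)
resolveConflictMode(conf, dynamicPartitionOverwrite = true)
assert(conf.get("fs.s3a.committer.staging.conflict-mode") == "replace")

// Opting out: any pre-set conflict-mode is left untouched.
conf.set("spark.internal.io.hmrcp.auto-staging-conflict-mode", "false")
conf.set("fs.s3a.committer.staging.conflict-mode", "fail")
resolveConflictMode(conf, dynamicPartitionOverwrite = true)
assert(conf.get("fs.s3a.committer.staging.conflict-mode") == "fail")
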
hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala

Lines changed: 9 additions & 2 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.internal.io.cloud
 
-import java.io.IOException
-
 import org.apache.hadoop.fs.{Path, StreamCapabilities}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
@@ -61,6 +59,15 @@ class PathOutputCommitProtocol(
   extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
   with Serializable {
 
+  if (dynamicPartitionOverwrite) {
+    // until there's explicit extensions to the PathOutputCommitProtocols
+    // to support the spark mechanism, it's left to the individual committer
+    // choice to handle partitioning.
+    // throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
+    // The above exception is disabled with automatic value of fs.s3a.committer.staging.conflict-mode
+    // in HadoopMapReduceCommitProtocol.
+  }
+
   /** The committer created. */
   @transient private var committer: PathOutputCommitter = _

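To see how the two branches map onto SQL write modes, a hypothetical example reusing the session sketched earlier; the table, bucket, and source names are placeholders, not part of the commit.

// Hypothetical partitioned table on S3; names are placeholders.
spark.sql(
  """CREATE TABLE IF NOT EXISTS events (id BIGINT, payload STRING)
    |USING parquet
    |PARTITIONED BY (dt STRING)
    |LOCATION 's3a://my-bucket/warehouse/events'""".stripMargin)

// INSERT INTO: dynamicPartitionOverwrite is false, so the protocol sets
// fs.s3a.committer.staging.conflict-mode=append and new files are added
// alongside existing ones in the touched partitions.
spark.sql("INSERT INTO events PARTITION (dt) SELECT id, payload, dt FROM staging_events")

// INSERT OVERWRITE with partitionOverwriteMode=dynamic: dynamicPartitionOverwrite
// is true, so conflict-mode becomes replace and only the partitions present in
// the query output are replaced; untouched partitions are preserved.
spark.sql("INSERT OVERWRITE TABLE events PARTITION (dt) SELECT id, payload, dt FROM staging_events")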