diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index edad91a0c6f0d..5dcc166a783e1 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import java.io.File
-import java.net.Socket
+import java.net.{InetAddress, Socket}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -207,11 +207,15 @@ object SparkEnv extends Logging {
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
+    var hostnameFinal = hostname
+    if (conf.getBoolean("spark.lyft.resolve", false)) {
+      hostnameFinal = InetAddress.getByName(hostname).getHostAddress
+    }
     val env = create(
       conf,
       executorId,
       bindAddress,
-      hostname,
+      hostnameFinal,
       None,
       isLocal,
       numCores,
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3a24da98ecc24..892427ef9ab6b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -183,6 +183,18 @@ class HadoopMapReduceCommitProtocol(
     jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
     jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
 
+    // Automatically set conflict-mode based on value of dynamicPartitionOverwrite,
+    // unless configuration auto-staging-conflict-mode exists with value false.
+    val autoConflictMode = jobContext.getConfiguration.get(
+      "spark.internal.io.hmrcp.auto-staging-conflict-mode")
+    if (autoConflictMode == null || autoConflictMode != "false") {
+      if (dynamicPartitionOverwrite) {
+        jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", "replace")
+      } else {
+        jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", "append")
+      }
+    }
+
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
diff --git a/core/src/main/scala/org/apache/spark/util/LyftUtils.scala b/core/src/main/scala/org/apache/spark/util/LyftUtils.scala
new file mode 100644
index 0000000000000..d9c8a643ec57d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/LyftUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+private[spark] object LyftUtils {
+  def callObjectMethodNoArguments(objectName: String, method: String): Boolean = {
+    var ok = true
+    try {
+      val m = Utils.classForName(objectName).getField("MODULE$").get(null)
+      Utils.classForName(objectName).getDeclaredMethod(method).invoke(m)
+    } catch {
+      case e: Throwable => ok = false
+    }
+    ok
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3b0efffedec6f..493955b45d3e2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2220,9 +2220,16 @@ private[spark] object Utils
    * privileged ports.
    */
   def userPort(base: Int, offset: Int): Int = {
-    (base + offset - 1024) % (65536 - 1024) + 1024
+    (base + offset - 1024) % (65536 - 1024) + 1024
   }
 
+  def randPort(base: Int, offset: Int, maxRand: Int): Int = {
+    val rand = new scala.util.Random
+    val r = rand.nextInt(maxRand)
+    (base + offset + r) % 65535
+  }
+
+
   /**
    * Attempt to start a service on the given port, or fail after a number of attempts.
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
@@ -2249,6 +2256,10 @@ private[spark] object Utils
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
         startPort
+      } else if (offset > 0 && conf.getInt("spark.lyft.maxrand", 0) > 0)
+      {
+        logInfo(s"Using randPort")
+        randPort(startPort, offset, conf.getInt("spark.lyft.maxrand", 0))
       } else {
         userPort(startPort, offset)
       }
diff --git a/core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala
new file mode 100644
index 0000000000000..28df4e978cc3f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.{SparkFunSuite}
+import org.apache.spark.internal.Logging
+
+object TestObjectLyftUtils {
+  var testVar = 0L
+  def setVal(): Unit = {
+    testVar = 1L
+  }
+}
+
+class LyftUtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
+
+  test("callObjectMethodNoArguments") {
+    // Test calling the method using reflection
+    val v = LyftUtils.callObjectMethodNoArguments(
+      "org.apache.spark.util.TestObjectLyftUtils$", "setVal")
+    assert(v === true)
+    assert(TestObjectLyftUtils.testVar === 1)
+    assert(false == LyftUtils.callObjectMethodNoArguments(
+      "org.apache.spark.util.TestObjectLyftUtils$", "setVal1"))
+  }
+}
diff --git a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index 44a521bd636c5..27d5eddc48174 100644
--- a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.internal.io.cloud
 
-import java.io.IOException
-
 import org.apache.hadoop.fs.{Path, StreamCapabilities}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
@@ -61,6 +59,15 @@ class PathOutputCommitProtocol(
   extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
     with Serializable {
 
+  if (dynamicPartitionOverwrite) {
+    // until there's explicit extensions to the PathOutputCommitProtocols
+    // to support the spark mechanism, it's left to the individual committer
+    // choice to handle partitioning.
+    // throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
+    // The above exception is disabled with automatic value of
+    // fs.s3a.committer.staging.conflict-mode in HadoopMapReduceCommitProtocol.
+  }
+
   /** The committer created. */
   @transient private var committer: PathOutputCommitter = _
 
@@ -123,7 +130,9 @@ class PathOutputCommitProtocol(
         logDebug(
           s"Committer $committer has declared compatibility with dynamic partition overwrite")
       } else {
-        throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
+        // throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
+        // The above exception is disabled with automatic value of
+        // fs.s3a.committer.staging.conflict-mode in HadoopMapReduceCommitProtocol.
       }
     }
   }
diff --git a/hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
index 984c7dbc2cb1b..005e0f217f262 100644
--- a/hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
+++ b/hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.internal.io.cloud
 
-import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, StreamCapabilities}
@@ -148,6 +148,9 @@ class CommitterBindingSuite extends SparkFunSuite {
    * Bind a job to a committer which doesn't support dynamic partitioning.
   * Job setup must fail, and calling `newTaskTempFileAbsPath()` must
   * raise `UnsupportedOperationException`.
+  * With custom patch https://github.com/lyft/spark/pull/34:
+  * Job setup should now succeed (due to custom patch), but calling
+  * `newTaskTempFileAbsPath()` must still raise `UnsupportedOperationException`.
   */
  test("reject dynamic partitioning if not supported") {
    val path = new Path("http://example/data")
@@ -162,14 +165,13 @@ class CommitterBindingSuite extends SparkFunSuite {
      jobId,
      path.toUri.toString,
      true)
-    val ioe = intercept[IOException] {
-      committer.setupJob(tContext)
-    }
-    if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
-      throw ioe
-    }
 
-    // calls to newTaskTempFileAbsPath() will be rejected
+    // Job setup should now succeed due to the custom patch that disabled
+    // the IOException throwing for unsupported dynamic partitioning
+    committer.setupJob(tContext)
+    committer.setupTask(tContext)
+
+    // calls to newTaskTempFileAbsPath() will still be rejected
    intercept[UnsupportedOperationException] {
      verifyAbsTempFileWorks(tContext, committer)
    }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 0ace14e2aaafd..60119ae3c0843 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -468,6 +468,11 @@ class ExecutorPodsAllocator(
        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
      val createdExecutorPod =
        kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
+
+      org.apache.spark.util.LyftUtils.callObjectMethodNoArguments(
+        "com.lyft.data.spark.AppMetrics$",
+        "setFirstExecutorAllocationTime")
+
      try {
        addOwnerReference(createdExecutorPod, resources)
        resources
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9c8a6dd8db069..4e4d49bd1bd67 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -532,7 +532,12 @@ private[spark] class Client(
      val localPath = getQualifiedLocalPath(localURI, hadoopConf)
      val linkname = targetDir.map(_ + "/").getOrElse("") +
        destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
-      val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
+      var destPath = localPath
+      if (!localPath.toUri.getScheme.startsWith("s3")) {
+        destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
+      } else {
+        logInfo(s"Adding binary from location: $destPath to the distributed cache")
+      }
      val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
      distCacheMgr.addResource(
        destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -709,7 +714,7 @@ private[spark] class Client(
        pySparkArchives.foreach { f =>
          val uri = Utils.resolveURI(f)
          if (uri.getScheme != Utils.LOCAL_SCHEME) {
-            distribute(f)
+            distribute(f, LocalResourceType.ARCHIVE)
          }
        }
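
Reviewer note: the snippet below is a minimal, hypothetical sketch of how the knobs introduced in this diff might be exercised from an application. The configuration keys and the LyftUtils/AppMetrics names are taken from the patch itself; the app name, master, and example values are illustrative only, and the Hadoop-side key is passed through the standard spark.hadoop.* prefix.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("lyft-patch-example")   // hypothetical app name
  .setMaster("local[*]")              // example only; the k8s/yarn paths above need a real cluster
  // Read in SparkEnv above: resolve the executor hostname to an IP address.
  .set("spark.lyft.resolve", "true")
  // Read in Utils.startServiceOnPort above: add up to maxRand - 1 of random jitter to retried ports.
  .set("spark.lyft.maxrand", "100")
  // Read from the Hadoop Configuration in HadoopMapReduceCommitProtocol above:
  // opt out of the automatic fs.s3a.committer.staging.conflict-mode handling (on by default).
  .set("spark.hadoop.spark.internal.io.hmrcp.auto-staging-conflict-mode", "false")
val sc = new SparkContext(conf)

// LyftUtils.callObjectMethodNoArguments is the reflective hook used by ExecutorPodsAllocator
// above; it returns false instead of throwing when the target object or method is missing.
val recorded = org.apache.spark.util.LyftUtils.callObjectMethodNoArguments(
  "com.lyft.data.spark.AppMetrics$", "setFirstExecutorAllocationTime")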