8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io.File
import java.net.Socket
import java.net.{InetAddress, Socket}
import java.util.Locale

import scala.collection.JavaConverters._
@@ -207,11 +207,15 @@ object SparkEnv extends Logging {
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
var hostnameFinal = hostname
if (conf.getBoolean("spark.lyft.resolve", false)) {
hostnameFinal = InetAddress.getByName(hostname).getHostAddress
}
val env = create(
conf,
executorId,
bindAddress,
hostname,
hostnameFinal,
None,
isLocal,
numCores,
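A minimal standalone sketch of the resolution step added above: when `spark.lyft.resolve` is true, the executor env is created with the IP address returned by `InetAddress.getByName(hostname).getHostAddress` rather than the raw hostname. Only the flag name and the `InetAddress` call come from the diff; the object below is illustrative.

```scala
import java.net.InetAddress

object ResolveHostnameSketch {
  // Mirrors the new branch in SparkEnv.createExecutorEnv: with the flag on,
  // the executor environment is built around the resolved IP address.
  def effectiveHostname(hostname: String, resolveToIp: Boolean): String =
    if (resolveToIp) InetAddress.getByName(hostname).getHostAddress else hostname

  def main(args: Array[String]): Unit = {
    println(effectiveHostname("localhost", resolveToIp = true))  // e.g. 127.0.0.1
    println(effectiveHostname("localhost", resolveToIp = false)) // localhost
  }
}
```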
@@ -183,6 +183,18 @@ class HadoopMapReduceCommitProtocol(
jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

// Automatically set the S3A staging committer conflict-mode based on
// dynamicPartitionOverwrite, unless auto-staging-conflict-mode is explicitly set to "false".
val autoConflictMode = jobContext.getConfiguration.get(
"spark.internal.io.hmrcp.auto-staging-conflict-mode")
if (autoConflictMode == null || autoConflictMode != "false") {
if (dynamicPartitionOverwrite) {
jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", "replace")
} else {
jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", "append")
}
}

val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
committer.setupJob(jobContext)
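The conflict-mode selection above can be exercised against a bare Hadoop `Configuration`. The two property names and the replace/append rule are taken verbatim from the hunk; the object and `main` harness are illustrative only.

```scala
import org.apache.hadoop.conf.Configuration

object ConflictModeSketch {
  // Same rule as HadoopMapReduceCommitProtocol.setupJob: choose the S3A staging
  // committer conflict-mode from dynamicPartitionOverwrite, unless the
  // auto-staging-conflict-mode switch is explicitly "false".
  def applyConflictMode(conf: Configuration, dynamicPartitionOverwrite: Boolean): Unit = {
    val auto = conf.get("spark.internal.io.hmrcp.auto-staging-conflict-mode")
    if (auto == null || auto != "false") {
      val mode = if (dynamicPartitionOverwrite) "replace" else "append"
      conf.set("fs.s3a.committer.staging.conflict-mode", mode)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)
    applyConflictMode(conf, dynamicPartitionOverwrite = true)
    println(conf.get("fs.s3a.committer.staging.conflict-mode")) // replace
  }
}
```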
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/util/LyftUtils.scala
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

private[spark] object LyftUtils {
def callObjectMethodNoArguments(objectName: String, method: String): Boolean = {
var ok = true
try {
val m = Utils.classForName(objectName).getField("MODULE$").get(null)
Utils.classForName(objectName).getDeclaredMethod(method).invoke(m)
} catch {
case _: Throwable => ok = false
}
ok
}
}
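`callObjectMethodNoArguments` takes the JVM class name of a Scala object (note the trailing `$`), fetches its singleton through the static `MODULE$` field, and invokes a zero-argument method, returning `false` on any failure. A standalone sketch of the same pattern, using plain `Class.forName` instead of `Utils.classForName`; the object names below are illustrative.

```scala
object Heartbeat {
  def ping(): Unit = println("ping")
}

object ReflectionSketch {
  // A Scala object compiles to a class named "<name>$" whose singleton instance
  // is stored in a public static field called MODULE$.
  def callNoArgs(objectName: String, method: String): Boolean = {
    try {
      val clazz = Class.forName(objectName)
      val instance = clazz.getField("MODULE$").get(null)
      clazz.getDeclaredMethod(method).invoke(instance)
      true
    } catch {
      case _: Throwable => false
    }
  }

  def main(args: Array[String]): Unit = {
    println(callNoArgs("Heartbeat$", "ping"))    // true, prints "ping"
    println(callNoArgs("Heartbeat$", "missing")) // false
  }
}
```

Because every exception is swallowed, a missing class or method degrades to a `false` return instead of failing the caller, which is what lets the `ExecutorPodsAllocator` call site further down stay optional.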
13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2220,9 +2220,16 @@ private[spark] object Utils
* privileged ports.
*/
def userPort(base: Int, offset: Int): Int = {
(base + offset - 1024) % (65536 - 1024) + 1024
}

def randPort(base: Int, offset: Int, maxRand: Int): Int = {
val rand = new scala.util.Random
val r = rand.nextInt(maxRand)
(base + offset + r) % 65535
}

/**
* Attempt to start a service on the given port, or fail after a number of attempts.
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
@@ -2249,6 +2256,10 @@ private[spark] object Utils
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
startPort
} else if (offset > 0 && conf.getInt("spark.lyft.maxrand", 0) > 0) {
logInfo("Using randPort")
randPort(startPort, offset, conf.getInt("spark.lyft.maxrand", 0))
} else {
userPort(startPort, offset)
}
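A worked sketch of the new port-selection path: with `startPort = 10000`, `offset = 3`, and `spark.lyft.maxrand = 1000`, `randPort` returns something in `[10003, 11002]` (taken modulo 65535), whereas `userPort` always returns 10003, presumably to spread concurrent retries across a range instead of marching in lockstep. Only the two formulas come from the diff; the harness is illustrative.

```scala
import scala.util.Random

object PortSketch {
  // Original deterministic behaviour: each retry bumps the port by the offset.
  def userPort(base: Int, offset: Int): Int =
    (base + offset - 1024) % (65536 - 1024) + 1024

  // New behaviour when spark.lyft.maxrand > 0: add a random jitter to the retry port.
  def randPort(base: Int, offset: Int, maxRand: Int): Int =
    (base + offset + Random.nextInt(maxRand)) % 65535

  def main(args: Array[String]): Unit = {
    println(userPort(10000, 3))       // always 10003
    println(randPort(10000, 3, 1000)) // somewhere in [10003, 11002]
  }
}
```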
41 changes: 41 additions & 0 deletions core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging

object TestObjectLyftUtils {
var testVar = 0L
def setVal(): Unit = {
testVar = 1L
}
}

class LyftUtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("callObjectMethodNoArguments") {
// Test calling the method using reflection
val v = LyftUtils.callObjectMethodNoArguments(
"org.apache.spark.util.TestObjectLyftUtils$", "setVal")
assert(v === true)
assert(TestObjectLyftUtils.testVar === 1)
assert(false == LyftUtils.callObjectMethodNoArguments(
"org.apache.spark.util.TestObjectLyftUtils$", "setVal1"))
}
}
@@ -17,8 +17,6 @@

package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
@@ -61,6 +59,15 @@ class PathOutputCommitProtocol(
extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
with Serializable {

if (dynamicPartitionOverwrite) {
// Until there are explicit extensions to the PathOutputCommitProtocols
// to support the Spark mechanism, it is left to the individual committer
// to decide how to handle partitioning.
// throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
// The above exception is disabled because HadoopMapReduceCommitProtocol now sets
// fs.s3a.committer.staging.conflict-mode automatically.
}

/** The committer created. */
@transient private var committer: PathOutputCommitter = _

@@ -123,7 +130,9 @@ class PathOutputCommitProtocol(
logDebug(
s"Committer $committer has declared compatibility with dynamic partition overwrite")
} else {
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
// throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
// The above exception is disabled because HadoopMapReduceCommitProtocol now sets
// fs.s3a.committer.staging.conflict-mode automatically.
}
}
}
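Combined with the `HadoopMapReduceCommitProtocol` change earlier in this PR, a path output committer can now be used with dynamic partition overwrite instead of failing fast at job setup. A hedged sketch of the session-level settings involved: `spark.sql.sources.partitionOverwriteMode` is standard Spark, the opt-out key comes from this PR, and the assumption is that session conf entries reach the job's Hadoop configuration at write time (otherwise the key can be supplied with a `spark.hadoop.` prefix).

```scala
import org.apache.spark.sql.SparkSession

object DynamicOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dynamic-partition-overwrite-sketch")
      .master("local[*]")
      // Overwrite only the partitions present in the written DataFrame.
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      // Opt out of the automatic conflict-mode handling added in this PR, if desired.
      .config("spark.internal.io.hmrcp.auto-staging-conflict-mode", "false")
      .getOrCreate()

    // ... write partitioned data with .mode("overwrite") here ...

    spark.stop()
  }
}
```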
@@ -17,7 +17,7 @@

package org.apache.spark.internal.io.cloud

import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, StreamCapabilities}
@@ -148,6 +148,9 @@ class CommitterBindingSuite extends SparkFunSuite {
* Bind a job to a committer which doesn't support dynamic partitioning.
* Job setup must fail, and calling `newTaskTempFileAbsPath()` must
* raise `UnsupportedOperationException`.
* With the custom patch https://github.com/lyft/spark/pull/34, job setup
* now succeeds, but calling `newTaskTempFileAbsPath()` must still raise
* `UnsupportedOperationException`.
*/
test("reject dynamic partitioning if not supported") {
val path = new Path("http://example/data")
@@ -162,14 +165,13 @@
jobId,
path.toUri.toString,
true)
val ioe = intercept[IOException] {
committer.setupJob(tContext)
}
if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
throw ioe
}

// calls to newTaskTempFileAbsPath() will be rejected
// Job setup should now succeed due to the custom patch that disabled
// the IOException throwing for unsupported dynamic partitioning
committer.setupJob(tContext)
committer.setupTask(tContext)

// calls to newTaskTempFileAbsPath() will still be rejected
intercept[UnsupportedOperationException] {
verifyAbsTempFileWorks(tContext, committer)
}
@@ -468,6 +468,11 @@ class ExecutorPodsAllocator(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()

org.apache.spark.util.LyftUtils.callObjectMethodNoArguments(
"com.lyft.data.spark.AppMetrics$",
"setFirstExecutorAllocationTime")

try {
addOwnerReference(createdExecutorPod, resources)
resources
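`com.lyft.data.spark.AppMetrics$` is not part of this repository, so its implementation does not appear in the diff. A purely hypothetical sketch of a shape that would satisfy the reflective call above; everything except the object and method names used in the hunk is an assumption.

```scala
package com.lyft.data.spark

// Hypothetical sketch only: records when the first executor pod was requested.
object AppMetrics {
  @volatile private var firstExecutorAllocationTime: Option[Long] = None

  def setFirstExecutorAllocationTime(): Unit = synchronized {
    if (firstExecutorAllocationTime.isEmpty) {
      firstExecutorAllocationTime = Some(System.currentTimeMillis())
    }
  }

  def getFirstExecutorAllocationTime: Option[Long] = firstExecutorAllocationTime
}
```

Because `LyftUtils.callObjectMethodNoArguments` swallows all exceptions, the call in `ExecutorPodsAllocator` is a harmless no-op when this class is not on the classpath.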
@@ -532,7 +532,12 @@ private[spark] class Client(
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
val linkname = targetDir.map(_ + "/").getOrElse("") +
destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
var destPath = localPath
if (!localPath.toUri.getScheme.startsWith("s3")) {
destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
} else {
logInfo(s"Adding binary from location: $destPath to the distributed cache")
}
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(
destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -709,7 +714,7 @@ private[spark] class Client(
pySparkArchives.foreach { f =>
val uri = Utils.resolveURI(f)
if (uri.getScheme != Utils.LOCAL_SCHEME) {
distribute(f)
distribute(f, LocalResourceType.ARCHIVE)
}
}

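A small sketch of the scheme check added to the distribution path above, on the assumption that `s3://`, `s3a://`, and `s3n://` URIs are all meant to be registered in the distributed cache at their original location rather than copied to the YARN staging directory; the helper name and harness are illustrative. The second hunk additionally distributes PySpark archives with `LocalResourceType.ARCHIVE`, so YARN unpacks them during localization.

```scala
import java.net.URI

object S3SchemeSketch {
  // Mirrors the condition in Client.distribute: anything whose scheme starts with
  // "s3" is referenced in place instead of being copied to the staging directory.
  def shouldCopyToRemote(path: String): Boolean = {
    val scheme = Option(new URI(path).getScheme).getOrElse("file")
    !scheme.startsWith("s3")
  }

  def main(args: Array[String]): Unit = {
    println(shouldCopyToRemote("s3a://bucket/jars/app.jar"))  // false: used in place
    println(shouldCopyToRemote("hdfs:///user/spark/app.jar")) // true: copied to staging dir
  }
}
```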