Skip to content

Commit 18da6be

Browse files
cataliniianuvedverma
authored andcommitted
Use IP address on the executor side instead of hostname and random port for blockmanager
(cherry picked from commit e90b3e0)
1 parent 7f24cf7 commit 18da6be

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark
1919

2020
import java.io.File
21-
import java.net.Socket
21+
import java.net.{InetAddress, Socket}
2222
import java.util.Locale
2323

2424
import scala.collection.JavaConverters._
@@ -207,11 +207,15 @@ object SparkEnv extends Logging {
207207
numCores: Int,
208208
ioEncryptionKey: Option[Array[Byte]],
209209
isLocal: Boolean): SparkEnv = {
210+
var hostnameFinal = hostname
211+
if (conf.getBoolean("spark.lyft.resolve", false)) {
212+
hostnameFinal = InetAddress.getByName(hostname).getHostAddress
213+
}
210214
val env = create(
211215
conf,
212216
executorId,
213217
bindAddress,
214-
hostname,
218+
hostnameFinal,
215219
None,
216220
isLocal,
217221
numCores,

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2220,9 +2220,16 @@ private[spark] object Utils
22202220
* privileged ports.
22212221
*/
22222222
def userPort(base: Int, offset: Int): Int = {
2223-
(base + offset - 1024) % (65536 - 1024) + 1024
2223+
(base + offset - 1024) % (65536 - 1024) + 1024
22242224
}
22252225

2226+
def randPort(base: Int, offset: Int, maxRand: Int): Int = {
2227+
val rand = new scala.util.Random
2228+
val r = rand.nextInt(maxRand)
2229+
(base + offset + r) % 65535
2230+
}
2231+
2232+
22262233
/**
22272234
* Attempt to start a service on the given port, or fail after a number of attempts.
22282235
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
@@ -2249,6 +2256,10 @@ private[spark] object Utils
22492256
// Do not increment port if startPort is 0, which is treated as a special port
22502257
val tryPort = if (startPort == 0) {
22512258
startPort
2259+
} else if (offset > 0 && conf.getInt("spark.lyft.maxrand", 0) > 0)
2260+
{
2261+
logInfo(s"Using randPort")
2262+
randPort(startPort, offset, conf.getInt("spark.lyft.maxrand", 0))
22522263
} else {
22532264
userPort(startPort, offset)
22542265
}

0 commit comments

Comments
 (0)