Commit 50ab8a6

Marcelo Vanzin authored and tgravescs committed
[SPARK-2669] [yarn] Distribute client configuration to AM.
Currently, when Spark launches the Yarn AM, the process will use the local Hadoop configuration on the node where the AM launches, if one is present. A more correct approach is to use the same configuration used to launch the Spark job, since the user may have made modifications (such as adding app-specific configs).

The approach taken here is to use the distributed cache to make all files in the Hadoop configuration directory available to the AM. This is a little overkill since only the AM needs them (the executors use the broadcast Hadoop configuration from the driver), but is the easier approach.

Even though only a few files in that directory may end up being used, all of them are uploaded. This allows supporting use cases such as when auxiliary configuration files are used for SSL configuration, or when uploading a Hive configuration directory. Not all of these may be reflected in a o.a.h.conf.Configuration object, but may be needed when a driver in cluster mode instantiates, for example, a HiveConf object instead.

Author: Marcelo Vanzin <[email protected]>

Closes #4142 from vanzin/SPARK-2669 and squashes the following commits:

f5434b9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
013f0fb [Marcelo Vanzin] Review feedback.
f693152 [Marcelo Vanzin] Le sigh.
ed45b7d [Marcelo Vanzin] Zip all config files and upload them as an archive.
5927b6b [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
cbb9fb3 [Marcelo Vanzin] Remove stale test.
e3e58d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
e3d0613 [Marcelo Vanzin] Review feedback.
34bdbd8 [Marcelo Vanzin] Fix test.
022a688 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
a77ddd5 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
79221c7 [Marcelo Vanzin] [SPARK-2669] [yarn] Distribute client configuration to AM.
1 parent c84d916 commit 50ab8a6

5 files changed: +132 -36 lines changed

docs/running-on-yarn.md

Lines changed: 5 additions & 1 deletion
@@ -211,7 +211,11 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 # Launching Spark on YARN

 Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster.
-These configs are used to write to the dfs and connect to the YARN ResourceManager.
+These configs are used to write to the dfs and connect to the YARN ResourceManager. The
+configuration contained in this directory will be distributed to the YARN cluster so that all
+containers used by the application use the same configuration. If the configuration references
+Java system properties or environment variables not managed by YARN, they should also be set in the
+Spark application's configuration (driver, executors, and the AM when running in client mode).

 There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
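The last sentence added to the docs above can be made concrete with a small sketch. It assumes a hypothetical system property (`ssl.conf.dir`) that one of the distributed config files references, and shows which Spark settings would carry it to each JVM. The three `extraJavaOptions` keys are existing Spark settings; the `ConfPropagation` object and its helpers are made up for illustration and are not part of this commit.

```scala
object ConfPropagation {
  // Hypothetical helper: builds the Spark settings that pass one Java system
  // property to the driver, the executors, and the client-mode AM.
  def javaOptionConfs(name: String, value: String): Map[String, String] = {
    val opt = s"-D$name=$value"
    Map(
      "spark.driver.extraJavaOptions" -> opt,
      "spark.executor.extraJavaOptions" -> opt,
      "spark.yarn.am.extraJavaOptions" -> opt // only consulted in yarn-client mode
    )
  }

  // Renders the settings as `--conf key=value` arguments, sorted by key.
  def toSubmitArgs(confs: Map[String, String]): Seq[String] =
    confs.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Note that this sketch replaces any value already present under those keys; a real job would more likely append the `-D` flag to its existing options.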

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 105 additions & 20 deletions
@@ -17,15 +17,18 @@

 package org.apache.spark.deploy.yarn

+import java.io.{File, FileOutputStream}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.reflect.runtime.universe
 import scala.util.{Try, Success, Failure}

 import com.google.common.base.Objects
+import com.google.common.io.Files

 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.conf.Configuration
@@ -77,12 +80,6 @@ private[spark] class Client(

   def stop(): Unit = yarnClient.stop()

-  /* ------------------------------------------------------------------------------------- *
-  | The following methods have much in common in the stable and alpha versions of Client, |
-  | but cannot be implemented in the parent trait due to subtle API differences across |
-  | hadoop versions. |
-  * ------------------------------------------------------------------------------------- */
-
   /**
    * Submit an application running our ApplicationMaster to the ResourceManager.
    *
@@ -223,6 +220,10 @@ private[spark] class Client(
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val nns = getNameNodesToAccess(sparkConf) + dst
+    // Used to keep track of URIs added to the distributed cache. If the same URI is added
+    // multiple times, YARN will fail to launch containers for the app with an internal
+    // error.
+    val distributedUris = new HashSet[String]
     obtainTokensForNamenodes(nns, hadoopConf, credentials)
     obtainTokenForHiveMetastore(hadoopConf, credentials)

@@ -241,6 +242,17 @@ private[spark] class Client(
           "for alternatives.")
     }

+    def addDistributedUri(uri: URI): Boolean = {
+      val uriStr = uri.toString()
+      if (distributedUris.contains(uriStr)) {
+        logWarning(s"Resource $uri added multiple times to distributed cache.")
+        false
+      } else {
+        distributedUris += uriStr
+        true
+      }
+    }
+
     /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
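The duplicate check introduced in this hunk can be tried in isolation. What follows is a minimal stand-alone sketch, not the code from the patch: `DistributedUriTracker` is a hypothetical name, and the warning that the patch logs on a repeat is left out.

```scala
import java.net.URI
import scala.collection.mutable

// Stand-in for the closure in the patch: remembers every URI handed to the
// distributed cache and reports whether a given URI is new.
class DistributedUriTracker {
  private val seen = mutable.HashSet[String]()

  // Returns true the first time a URI is seen and false on repeats.
  // mutable.HashSet.add itself reports whether the element was newly inserted.
  def add(uri: URI): Boolean = seen.add(uri.toString)
}
```

As in the patch, the comparison is made on the string form of the URI, so two URIs that differ only in their fragment (for example `file:/tmp/a.jar` and `file:/tmp/a.jar#alias`) count as distinct resources.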
@@ -258,11 +270,13 @@ private[spark] class Client(
       if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
         if (localURI.getScheme != LOCAL_SCHEME) {
-          val src = getQualifiedLocalPath(localURI, hadoopConf)
-          val destPath = copyFileToRemote(dst, src, replication)
-          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-          distCacheMgr.addResource(destFs, hadoopConf, destPath,
-            localResources, LocalResourceType.FILE, destName, statCache)
+          if (addDistributedUri(localURI)) {
+            val src = getQualifiedLocalPath(localURI, hadoopConf)
+            val destPath = copyFileToRemote(dst, src, replication)
+            val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+            distCacheMgr.addResource(destFs, hadoopConf, destPath,
+              localResources, LocalResourceType.FILE, destName, statCache)
+          }
         } else if (confKey != null) {
           // If the resource is intended for local use only, handle this downstream
           // by setting the appropriate property
@@ -271,6 +285,13 @@ private[spark] class Client(
       }
     }

+    createConfArchive().foreach { file =>
+      require(addDistributedUri(file.toURI()))
+      val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
+      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+        LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
+    }
+
     /**
      * Do the same for any additional resources passed in through ClientArguments.
      * Each resource category is represented by a 3-tuple of:
@@ -288,13 +309,15 @@ private[spark] class Client(
         flist.split(',').foreach { file =>
           val localURI = new URI(file.trim())
           if (localURI.getScheme != LOCAL_SCHEME) {
-            val localPath = new Path(localURI)
-            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-            val destPath = copyFileToRemote(dst, localPath, replication)
-            distCacheMgr.addResource(
-              fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-            if (addToClasspath) {
-              cachedSecondaryJarLinks += linkname
+            if (addDistributedUri(localURI)) {
+              val localPath = new Path(localURI)
+              val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+              val destPath = copyFileToRemote(dst, localPath, replication)
+              distCacheMgr.addResource(
+                fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+              if (addToClasspath) {
+                cachedSecondaryJarLinks += linkname
+              }
             }
           } else if (addToClasspath) {
             // Resource is intended for local use only and should be added to the class path
@@ -310,14 +333,65 @@ private[spark] class Client(
     localResources
   }

+  /**
+   * Create an archive with the Hadoop config files for distribution.
+   *
+   * These are only used by the AM, since executors will use the configuration object broadcast by
+   * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
+   * it when distributing to the AM. This directory is then added to the classpath of the AM
+   * process, just to make sure that everybody is using the same default config.
+   *
+   * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+   * shows up in the classpath before YARN_CONF_DIR.
+   *
+   * Currently this makes a shallow copy of the conf directory. If there are cases where a
+   * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   */
+  private def createConfArchive(): Option[File] = {
+    val hadoopConfFiles = new HashMap[String, File]()
+    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+      sys.env.get(envKey).foreach { path =>
+        val dir = new File(path)
+        if (dir.isDirectory()) {
+          dir.listFiles().foreach { file =>
+            if (!hadoopConfFiles.contains(file.getName())) {
+              hadoopConfFiles(file.getName()) = file
+            }
+          }
+        }
+      }
+    }
+
+    if (!hadoopConfFiles.isEmpty) {
+      val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
+        new File(Utils.getLocalDir(sparkConf)))
+
+      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
+      try {
+        hadoopConfStream.setLevel(0)
+        hadoopConfFiles.foreach { case (name, file) =>
+          hadoopConfStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, hadoopConfStream)
+          hadoopConfStream.closeEntry()
+        }
+      } finally {
+        hadoopConfStream.close()
+      }
+
+      Some(hadoopConfArchive)
+    } else {
+      None
+    }
+  }
+
   /**
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+    populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
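The two steps of `createConfArchive()` (merge the directories with first-wins precedence, then write an uncompressed zip) can be sketched without any Spark or Guava dependency. This is an illustration under assumptions rather than the patch itself: `ConfArchive`, `collect`, and `create` are hypothetical names, the directories are passed in explicitly instead of being read from `HADOOP_CONF_DIR` and `YARN_CONF_DIR`, `java.nio.file.Files.copy` replaces the Guava call, and, unlike the patch, subdirectories are skipped explicitly.

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable

object ConfArchive {
  // Collects top-level regular files; earlier directories win on name clashes.
  def collect(dirs: Seq[File]): mutable.LinkedHashMap[String, File] = {
    val files = mutable.LinkedHashMap[String, File]()
    for (dir <- dirs if dir.isDirectory; f <- dir.listFiles() if f.isFile) {
      if (!files.contains(f.getName)) files(f.getName) = f
    }
    files
  }

  // Writes the collected files into `target` as a zip with compression level 0.
  // Returns None when there is nothing to archive.
  def create(dirs: Seq[File], target: File): Option[File] = {
    val files = collect(dirs)
    if (files.isEmpty) {
      None
    } else {
      val out = new ZipOutputStream(new FileOutputStream(target))
      try {
        out.setLevel(0)
        files.foreach { case (name, file) =>
          out.putNextEntry(new ZipEntry(name))
          Files.copy(file.toPath, out)
          out.closeEntry()
        }
      } finally {
        out.close()
      }
      Some(target)
    }
  }
}
```

Passing the directories in the order `Seq(hadoopConfDir, yarnConfDir)` reproduces the precedence described in the scaladoc above: a `core-site.xml` present in both directories is taken from the first one.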
@@ -718,6 +792,9 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"

+  // Subdirectory where the user's hadoop config files will be placed.
+  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
    * class if not.
@@ -831,11 +908,19 @@ object Client extends Logging {
       conf: Configuration,
       sparkConf: SparkConf,
       env: HashMap[String, String],
+      isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
     addClasspathEntry(
       YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
     )
+
+    if (isAM) {
+      addClasspathEntry(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+          LOCALIZED_HADOOP_CONF_DIR, env)
+    }
+
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
       val userClassPath =
         if (args != null) {
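The effect of the new `isAM` flag on the classpath can be shown with plain strings. In this rough sketch `AmClasspath` and `workDirEntries` are hypothetical names, `pwd` stands in for whatever `expandEnvironment(Environment.PWD)` yields on the platform, and `/` is hard-coded in place of Hadoop's `Path.SEPARATOR`.

```scala
object AmClasspath {
  // Same value as LOCALIZED_HADOOP_CONF_DIR in the patch: the link name under
  // which YARN unpacks the config archive in the container's working directory.
  val LocalizedHadoopConfDir = "__hadoop_conf__"

  // Returns the working-directory classpath entries. The unpacked config
  // directory is included only for the AM, mirroring the isAM branch above.
  def workDirEntries(pwd: String, isAM: Boolean): Seq[String] = {
    val confDir = if (isAM) Seq(s"$pwd/$LocalizedHadoopConfDir") else Seq.empty
    pwd +: confDir
  }
}
```

Executors get only the working directory itself, which matches the `false` argument passed from `ExecutorRunnable` in this commit.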

yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 1 addition & 1 deletion
@@ -277,7 +277,7 @@ class ExecutorRunnable(
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
-    Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
+    Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)

     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path

yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala

Lines changed: 16 additions & 13 deletions
@@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn
 import java.io.File
 import java.net.URI

+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
@@ -30,11 +35,6 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
 import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.util.Utils

@@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)

-    Client.populateClasspath(args, conf, sparkConf, env)
+    Client.populateClasspath(args, conf, sparkConf, env, true)

     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -104,13 +104,16 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
         cp should not contain (uri.getPath())
       }
     })
-    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-      cp should contain("{{PWD}}")
-    } else if (Utils.isWindows) {
-      cp should contain("%PWD%")
-    } else {
-      cp should contain(Environment.PWD.$())
-    }
+    val pwdVar =
+      if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+        "{{PWD}}"
+      } else if (Utils.isWindows) {
+        "%PWD%"
+      } else {
+        Environment.PWD.$()
+      }
+    cp should contain(pwdVar)
+    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 5 additions & 1 deletion
@@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
+  private var hadoopConfDir: File = _
   private var logConfDir: File = _

   override def beforeAll() {
@@ -120,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")

     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+    assert(hadoopConfDir.mkdir())
+    File.createTempFile("token", ".txt", hadoopConfDir)
   }

   override def afterAll() {
@@ -258,7 +262,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
       appArgs

     Utils.executeAndGetOutput(argv,
-      extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+      extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
   }

   /**
