Commit aa9a7f5

SPARK-1606: Infer user application arguments instead of requiring --arg.
This modifies spark-submit to do something more like the Hadoop `jar` command. Now we have the following syntax:

    ./bin/spark-submit [options] user.jar [user options]

Author: Patrick Wendell <[email protected]>

Closes mesos#563 from pwendell/spark-submit and squashes the following commits:

32241fc [Patrick Wendell] Review feedback
3adfb69 [Patrick Wendell] Small fix
bc48139 [Patrick Wendell] SPARK-1606: Infer user application arguments instead of requiring --arg.
1 parent 762af4e commit aa9a7f5
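For illustration, a before/after sketch of the calling convention this commit introduces (the jar name, main class, and argument below are assumptions for the example, not taken from the commit):

    # Before: the application jar came first, and each user argument needed --arg
    ./bin/spark-submit app.jar --master local[2] --class com.example.MyApp --arg 100

    # After: Spark options first, then the jar, then user arguments verbatim
    ./bin/spark-submit --master local[2] --class com.example.MyApp app.jar 100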

6 files changed: 181 additions & 162 deletions


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 0 additions & 1 deletion

@@ -185,7 +185,6 @@ object SparkSubmit {
     if (clusterManager == STANDALONE) {
       val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
-      println("SPARK JARS" + sysProps.get("spark.jars"))
     }
 
     if (deployOnCluster && clusterManager == STANDALONE) {

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 114 additions & 112 deletions

@@ -21,14 +21,15 @@ import java.io.{File, FileInputStream, IOException}
 import java.util.Properties
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ArrayBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkException
+import org.apache.spark.util.Utils
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
-private[spark] class SparkSubmitArguments(args: Array[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -118,8 +119,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
-      val testing = sys.env.contains("SPARK_TESTING")
-      if (!hasHadoopEnv && !testing) {
+      if (!hasHadoopEnv && !Utils.isTesting) {
        throw new Exception(s"When running with master '$master' " +
          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
      }
@@ -156,119 +156,121 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     """.stripMargin
   }
 
-  private def parseOpts(opts: List[String]): Unit = opts match {
-    case ("--name") :: value :: tail =>
-      name = value
-      parseOpts(tail)
+  /** Fill in values by parsing user options. */
+  private def parseOpts(opts: Seq[String]): Unit = {
+    // Delineates parsing of Spark options from parsing of user options.
+    var inSparkOpts = true
+    parse(opts)
 
-    case ("--master") :: value :: tail =>
-      master = value
-      parseOpts(tail)
+    def parse(opts: Seq[String]): Unit = opts match {
+      case ("--name") :: value :: tail =>
+        name = value
+        parse(tail)
 
-    case ("--class") :: value :: tail =>
-      mainClass = value
-      parseOpts(tail)
+      case ("--master") :: value :: tail =>
+        master = value
+        parse(tail)
 
-    case ("--deploy-mode") :: value :: tail =>
-      if (value != "client" && value != "cluster") {
-        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-      }
-      deployMode = value
-      parseOpts(tail)
-
-    case ("--num-executors") :: value :: tail =>
-      numExecutors = value
-      parseOpts(tail)
-
-    case ("--total-executor-cores") :: value :: tail =>
-      totalExecutorCores = value
-      parseOpts(tail)
-
-    case ("--executor-cores") :: value :: tail =>
-      executorCores = value
-      parseOpts(tail)
-
-    case ("--executor-memory") :: value :: tail =>
-      executorMemory = value
-      parseOpts(tail)
-
-    case ("--driver-memory") :: value :: tail =>
-      driverMemory = value
-      parseOpts(tail)
-
-    case ("--driver-cores") :: value :: tail =>
-      driverCores = value
-      parseOpts(tail)
-
-    case ("--driver-class-path") :: value :: tail =>
-      driverExtraClassPath = value
-      parseOpts(tail)
-
-    case ("--driver-java-options") :: value :: tail =>
-      driverExtraJavaOptions = value
-      parseOpts(tail)
-
-    case ("--driver-library-path") :: value :: tail =>
-      driverExtraLibraryPath = value
-      parseOpts(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parseOpts(tail)
-
-    case ("--supervise") :: tail =>
-      supervise = true
-      parseOpts(tail)
-
-    case ("--queue") :: value :: tail =>
-      queue = value
-      parseOpts(tail)
-
-    case ("--files") :: value :: tail =>
-      files = value
-      parseOpts(tail)
-
-    case ("--archives") :: value :: tail =>
-      archives = value
-      parseOpts(tail)
-
-    case ("--arg") :: value :: tail =>
-      childArgs += value
-      parseOpts(tail)
-
-    case ("--jars") :: value :: tail =>
-      jars = value
-      parseOpts(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      verbose = true
-      parseOpts(tail)
-
-    case value :: tail =>
-      if (value.startsWith("-")) {
-        val errMessage = s"Unrecognized option '$value'."
-        val suggestion: Option[String] = value match {
-          case v if v.startsWith("--") && v.contains("=") =>
-            val parts = v.split("=")
-            Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
-          case _ =>
-            None
+      case ("--class") :: value :: tail =>
+        mainClass = value
+        parse(tail)
+
+      case ("--deploy-mode") :: value :: tail =>
+        if (value != "client" && value != "cluster") {
+          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+        }
+        deployMode = value
+        parse(tail)
+
+      case ("--num-executors") :: value :: tail =>
+        numExecutors = value
+        parse(tail)
+
+      case ("--total-executor-cores") :: value :: tail =>
+        totalExecutorCores = value
+        parse(tail)
+
+      case ("--executor-cores") :: value :: tail =>
+        executorCores = value
+        parse(tail)
+
+      case ("--executor-memory") :: value :: tail =>
+        executorMemory = value
+        parse(tail)
+
+      case ("--driver-memory") :: value :: tail =>
+        driverMemory = value
+        parse(tail)
+
+      case ("--driver-cores") :: value :: tail =>
+        driverCores = value
+        parse(tail)
+
+      case ("--driver-class-path") :: value :: tail =>
+        driverExtraClassPath = value
+        parse(tail)
+
+      case ("--driver-java-options") :: value :: tail =>
+        driverExtraJavaOptions = value
+        parse(tail)
+
+      case ("--driver-library-path") :: value :: tail =>
+        driverExtraLibraryPath = value
+        parse(tail)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case ("--supervise") :: tail =>
+        supervise = true
+        parse(tail)
+
+      case ("--queue") :: value :: tail =>
+        queue = value
+        parse(tail)
+
+      case ("--files") :: value :: tail =>
+        files = value
+        parse(tail)
+
+      case ("--archives") :: value :: tail =>
+        archives = value
+        parse(tail)
+
+      case ("--jars") :: value :: tail =>
+        jars = value
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--verbose" | "-v") :: tail =>
+        verbose = true
+        parse(tail)
+
+      case value :: tail =>
+        if (inSparkOpts) {
+          value match {
+            // convert --foo=bar to --foo bar
+            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
+              val parts = v.split("=")
+              parse(Seq(parts(0), parts(1)) ++ tail)
+            case v if v.startsWith("-") =>
+              val errMessage = s"Unrecognized option '$value'."
+              SparkSubmit.printErrorAndExit(errMessage)
+            case v =>
+              primaryResource = v
+              inSparkOpts = false
+              parse(tail)
+          }
+        } else {
+          childArgs += value
+          parse(tail)
         }
-        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
-      }
 
-      if (primaryResource != null) {
-        val error = s"Found two conflicting resources, $value and $primaryResource." +
-          " Expecting only one resource."
-        SparkSubmit.printErrorAndExit(error)
+      case Nil =>
     }
-      primaryResource = value
-      parseOpts(tail)
-
-    case Nil =>
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -277,7 +279,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit <app jar> [options]
+      """Usage: spark-submit [options] <app jar> [app options]
        |Options:
        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
        |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
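The heart of the change is the inSparkOpts switch in the new parseOpts above: Spark's own options are consumed until the first bare (non-option) token, which becomes primaryResource, and every token after it is passed through untouched as a user application argument. A minimal standalone sketch of that behavior (this is not the Spark class itself; the object name and the reduced option set are assumptions for illustration):

    // ParseSketch: a hypothetical, self-contained sketch of the two-phase parsing.
    // Spark options are consumed until the first non-option token, which becomes
    // the primary resource; everything after it is collected verbatim.
    object ParseSketch {
      import scala.collection.mutable.ArrayBuffer

      var master: String = null
      var primaryResource: String = null
      val childArgs = ArrayBuffer[String]()
      var inSparkOpts = true  // flips to false once the application jar is seen

      def parse(opts: List[String]): Unit = opts match {
        case "--master" :: value :: tail =>
          master = value
          parse(tail)

        case value :: tail =>
          if (inSparkOpts) {
            value match {
              // convert --foo=bar to --foo bar, as the real parser does
              case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
                val parts = v.split("=")
                parse(parts(0) :: parts(1) :: tail)
              case v if v.startsWith("-") =>
                sys.error(s"Unrecognized option '$v'.")
              case v =>
                primaryResource = v  // first bare token is the application jar
                inSparkOpts = false  // all remaining tokens belong to the user app
                parse(tail)
            }
          } else {
            childArgs += value  // user options pass through, even "-" flags
            parse(tail)
          }

        case Nil =>
      }

      def main(args: Array[String]): Unit = {
        parse(List("--master=local[2]", "user.jar", "--arg1", "value1"))
        println(s"master=$master resource=$primaryResource childArgs=$childArgs")
      }
    }

Running it prints master=local[2] resource=user.jar childArgs=ArrayBuffer(--arg1, value1), which is why the --arg flag is no longer needed.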

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 0 deletions

@@ -1056,4 +1056,11 @@ private[spark] object Utils extends Logging {
   def getHadoopFileSystem(path: String): FileSystem = {
     getHadoopFileSystem(new URI(path))
   }
+
+  /**
+   * Indicates whether Spark is currently running unit tests.
+   */
+  private[spark] def isTesting = {
+    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  }
 }
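Because isTesting also honors a JVM system property, tests no longer need to mutate the process environment. A brief sketch of exercising it (the setup here is an assumption for illustration; the helper is private[spark], so the call only compiles from Spark-internal code):

    // Either signal satisfies the check: the SPARK_TESTING environment variable
    // or the spark.testing system property.
    sys.props("spark.testing") = "true"  // e.g. passed as -Dspark.testing=true to a test runner
    assert(Utils.isTesting)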
