@@ -19,6 +19,7 @@ package org.apache.spark.ui
19
19
20
20
import java .net .InetSocketAddress
21
21
import java .net .URL
22
+ import javax .servlet .DispatcherType
22
23
import javax .servlet .http .{HttpServlet , HttpServletResponse , HttpServletRequest }
23
24
24
25
import scala .annotation .tailrec
@@ -28,7 +29,7 @@ import scala.xml.Node
28
29
import org .json4s .JValue
29
30
import org .json4s .jackson .JsonMethods .{pretty , render }
30
31
31
- import org .eclipse .jetty .server .{DispatcherType , Server }
32
+ import org .eclipse .jetty .server .{NetworkConnector , Server }
32
33
import org .eclipse .jetty .server .handler .HandlerList
33
34
import org .eclipse .jetty .servlet .{DefaultServlet , FilterHolder , ServletContextHandler , ServletHolder }
34
35
import org .eclipse .jetty .util .thread .QueuedThreadPool
@@ -60,8 +61,7 @@ private[spark] object JettyUtils extends Logging {
60
61
def createServlet [T <% AnyRef ](servletParams : ServletParams [T ],
61
62
securityMgr : SecurityManager ): HttpServlet = {
62
63
new HttpServlet {
63
- override def doGet (request : HttpServletRequest ,
64
- response : HttpServletResponse ) {
64
+ override def doGet (request : HttpServletRequest , response : HttpServletResponse ) {
65
65
if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
66
66
response.setContentType(" %s;charset=utf-8" .format(servletParams.contentType))
67
67
response.setStatus(HttpServletResponse .SC_OK )
@@ -72,7 +72,7 @@ private[spark] object JettyUtils extends Logging {
72
72
response.setStatus(HttpServletResponse .SC_UNAUTHORIZED )
73
73
response.setHeader(" Cache-Control" , " no-cache, no-store, must-revalidate" )
74
74
response.sendError(HttpServletResponse .SC_UNAUTHORIZED ,
75
- " User is not authorized to access this page." );
75
+ " User is not authorized to access this page." )
76
76
}
77
77
}
78
78
}
@@ -120,26 +120,25 @@ private[spark] object JettyUtils extends Logging {
120
120
121
121
private def addFilters (handlers : Seq [ServletContextHandler ], conf : SparkConf ) {
122
122
val filters : Array [String ] = conf.get(" spark.ui.filters" , " " ).split(',' ).map(_.trim())
123
- filters.foreach {
124
- case filter : String =>
125
- if (! filter.isEmpty) {
126
- logInfo(" Adding filter: " + filter)
127
- val holder : FilterHolder = new FilterHolder ()
128
- holder.setClassName(filter)
129
- // get any parameters for each filter
130
- val paramName = " spark." + filter + " .params"
131
- val params = conf.get(paramName, " " ).split(',' ).map(_.trim()).toSet
132
- params.foreach {
133
- case param : String =>
134
- if (! param.isEmpty) {
135
- val parts = param.split(" =" )
136
- if (parts.length == 2 ) holder.setInitParameter(parts(0 ), parts(1 ))
137
- }
138
- }
139
- val enumDispatcher = java.util.EnumSet .of(DispatcherType .ASYNC , DispatcherType .ERROR ,
140
- DispatcherType .FORWARD , DispatcherType .INCLUDE , DispatcherType .REQUEST )
141
- handlers.foreach { case (handler) => handler.addFilter(holder, " /*" , enumDispatcher) }
123
+ filters.foreach { filter =>
124
+ if (! filter.isEmpty) {
125
+ logInfo(" Adding filter: " + filter)
126
+ val holder : FilterHolder = new FilterHolder ()
127
+ holder.setClassName(filter)
128
+ // get any parameters for each filter
129
+ val paramName = " spark." + filter + " .params"
130
+ val params = conf.get(paramName, " " ).split(',' ).map(_.trim()).toSet
131
+ params.foreach {
132
+ case param : String =>
133
+ if (! param.isEmpty) {
134
+ val parts = param.split(" =" )
135
+ if (parts.length == 2 ) holder.setInitParameter(parts(0 ), parts(1 ))
136
+ }
142
137
}
138
+ val enumDispatcher = java.util.EnumSet .of(DispatcherType .ASYNC , DispatcherType .ERROR ,
139
+ DispatcherType .FORWARD , DispatcherType .INCLUDE , DispatcherType .REQUEST )
140
+ handlers.foreach { handler => handler.addFilter(holder, " /*" , enumDispatcher) }
141
+ }
143
142
}
144
143
}
145
144
@@ -150,7 +149,10 @@ private[spark] object JettyUtils extends Logging {
150
149
* If the desired port number is contented, continues incrementing ports until a free port is
151
150
* found. Returns the chosen port and the jetty Server object.
152
151
*/
153
- def startJettyServer (hostName : String , port : Int , handlers : Seq [ServletContextHandler ],
152
+ def startJettyServer (
153
+ hostName : String ,
154
+ port : Int ,
155
+ handlers : Seq [ServletContextHandler ],
154
156
conf : SparkConf ): (Server , Int ) = {
155
157
156
158
addFilters(handlers, conf)
@@ -160,16 +162,18 @@ private[spark] object JettyUtils extends Logging {
160
162
@ tailrec
161
163
def connect (currentPort : Int ): (Server , Int ) = {
162
164
val server = new Server (new InetSocketAddress (hostName, currentPort))
163
- val pool = new QueuedThreadPool
165
+ // Unfortunately Jetty 9 doesn't allow us to set both the thread pool and the port number in
166
+ // constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool.
167
+ val pool = server.getThreadPool.asInstanceOf [QueuedThreadPool ]
164
168
pool.setDaemon(true )
165
- server.setThreadPool(pool)
169
+
166
170
server.setHandler(handlerList)
167
171
168
172
Try {
169
173
server.start()
170
174
} match {
171
175
case s : Success [_] =>
172
- (server, server.getConnectors.head.getLocalPort)
176
+ (server, server.getConnectors.head.asInstanceOf [ NetworkConnector ]. getLocalPort)
173
177
case f : Failure [_] =>
174
178
server.stop()
175
179
logInfo(" Failed to create UI at port, %s. Trying again." .format(currentPort))
0 commit comments