Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 57 additions & 52 deletions core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@
package com.cloudera.spark

import java.lang.management._
import java.math.{RoundingMode, MathContext}
import java.math.{MathContext, RoundingMode}
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean, AtomicReference}
import java.util.concurrent._

import scala.collection.JavaConverters._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

import com.quantifind.sumac.FieldArgs

import org.apache.spark.{TaskContext, SparkContext}
import com.typesafe.scalalogging.Logger
import org.apache.spark.executor.ExecutorPlugin
import org.apache.spark.memory.SparkMemoryManagerHandle
import org.apache.spark.{SparkContext, TaskContext}

import scala.collection.JavaConverters._

class MemoryMonitor(val args: MemoryMonitorArgs) {

private[this] val log = Logger(this.getClass)

val nettyMemoryHandle = SparkNettyMemoryHandle.get()
val sparkMemManagerHandle = SparkMemoryManagerHandle.get()
val memoryBean = ManagementFactory.getMemoryMXBean
Expand Down Expand Up @@ -59,8 +62,8 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
val inShutdown = new AtomicBoolean(false)

def showMetricNames: Unit = {
println(s"${nMetrics} Metrics")
(0 until nMetrics).foreach { idx => println(names(idx))}
log.info(s"${nMetrics} Metrics")
(0 until nMetrics).foreach { idx => log.info(names(idx))}
}

def collectSnapshot: MemorySnapshot = {
Expand All @@ -81,41 +84,39 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
}

def showSnapshot(mem: MemorySnapshot): Unit = {
println(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}")
println("===============")
log.info(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}")
log.info("===============")
// TODO headers for each getter?
(0 until nMetrics).foreach { idx =>
val v = mem.values(idx)
println(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")")
log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")")
}
println()
println()
log.info("")
log.info("")
}

def updateAndMaybeShowPeaks(): Unit = {
val snapshot = collectSnapshot
if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) {
showUpdates(snapshot.time, peakMemoryUsage, peakUpdates)
}
if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) showUpdates(snapshot.time, peakMemoryUsage, peakUpdates)
}

def showUpdates(time: Long, peakMemory: MemoryPeaks, updates: PeakUpdate): Unit = {
println(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}")
log.info(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}")
(0 until updates.nUpdates).foreach { updateIdx =>
val metricIdx = updates.updateIdx(updateIdx)
val name = names(metricIdx)
val currentVal = MemoryMonitor.bytesToString(peakMemoryUsage.values(metricIdx))
val rawDelta = updates.delta(updateIdx)
val delta = (if (rawDelta > 0) "+" else "-") + MemoryMonitor.bytesToString(rawDelta)
println(s"$name\t:\t$currentVal ($delta)")
log.info(s"$name\t:\t$currentVal ($delta)")
}
}

def showPeaks(time: Long): Unit = {
println(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}")
log.info(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}")
// TODO headers for each getter?
(0 until nMetrics).foreach { idx =>
println(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) +
log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) +
"\t\t\t\t" + MemoryMonitor.dateFormat.format(peakMemoryUsage.peakTimes(idx)))
}
}
Expand All @@ -127,7 +128,7 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
def showLastThreadDump: Unit = {
val threads = lastThreadDump.get()
if (threads != null) {
println("last thread dump:")
log.info("last thread dump:")
MemoryMonitor.showThreadDump(threads)
}
}
Expand All @@ -136,14 +137,14 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
Runtime.getRuntime.addShutdownHook(new Thread(){
override def run(): Unit = {
inShutdown.set(true)
println()
println("IN SHUTDOWN")
println("================")
log.info("")
log.info("IN SHUTDOWN")
log.info("================")
val snapshot = collectSnapshot
showSnapshot(snapshot)
peakMemoryUsage.update(snapshot, peakUpdates, reporting)
showPeaks(snapshot.time)
println("Last non-shutdown snapshot:")
log.info("Last non-shutdown snapshot:")
showSnapshot(lastNonShutdownSnapshot.get())

showLastThreadDump
Expand All @@ -154,32 +155,33 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
def beanInfo(): Unit = {

memMgrBeans.foreach { mgr =>
println(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(","))
log.info(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(","))
}

poolBeans.foreach { pool =>
println(pool.getName())
println("============")
println(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(","))
log.info(pool.getName())
log.info("============")
log.info(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(","))
if (pool.isUsageThresholdSupported)
println("supports usage threshold")
log.info("supports usage threshold")
if (pool.isCollectionUsageThresholdSupported)
println("supports collection usage threshold")
log.info("supports collection usage threshold")
pool.getUsage
println()
println()
log.info("")
log.info("")
}

println("BUFFER POOLS")
log.info("BUFFER POOLS")
bufferPoolsBeans.foreach { bp =>
println(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}")
log.info(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}")
}
}
}

object MemoryMonitor {

val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
private[this] val log = Logger(this.getClass)

private var monitor: MemoryMonitor = null
private var shutdownHookInstalled = false
Expand Down Expand Up @@ -222,16 +224,16 @@ object MemoryMonitor {
def listAllMBeans: Unit = {
val server = ManagementFactory.getPlatformMBeanServer
val allBeans = server.queryNames(null, null)
println("ALL BEANS")
println("=============")
allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => println(ob) }
println()
println()
log.info("ALL BEANS")
log.info("=============")
allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => log.info(ob) }
log.info("")
log.info("")
}

def showLimits: Unit = {
println("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory())
println("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory())
log.info("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory())
log.info("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory())
}

def installIfSysProps(): Unit = {
Expand All @@ -240,9 +242,9 @@ object MemoryMonitor {
install(args)
installShutdownHook()
args.freq.foreach { freq =>
println(s"POLLING memory monitor every $freq millis")
log.info(s"POLLING memory monitor every $freq millis")
monitor.showCurrentMemUsage
println("done with initial show")
log.info("done with initial show")
startPolling(args)
}
}
Expand All @@ -252,7 +254,7 @@ object MemoryMonitor {
if (!SparkMemoryManagerHandle.isDynamicAllocation(sc)) {
installOnExecutors(sc)
} else {
println ("********* WARNING ***** not installing on executors because of DA")
log.info ("********* WARNING ***** not installing on executors because of DA")
}
}

Expand All @@ -263,7 +265,7 @@ object MemoryMonitor {
} else {
numTasks
}
println(s"Running $t tasks to install memory monitor on executors")
log.info(s"Running $t tasks to install memory monitor on executors")
sc.parallelize(1 to t, t).foreach { _ =>
Thread.sleep(sleep)
installIfSysProps()
Expand Down Expand Up @@ -317,17 +319,19 @@ object MemoryMonitor {
def showThreadDump(threads: Array[ThreadInfo]): Unit = {
threads.foreach { t =>
if (t == null) {
println("<null thread>")
log.info("<null thread>")
} else {
println(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState)
t.getStackTrace.foreach { elem => println("\t" + elem) }
println()
log.info(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState)
t.getStackTrace.foreach { elem => log.info("\t" + elem) }
log.info("")
}
}
}
}

class MemoryMonitorExecutorExtension extends ExecutorPlugin {
private[this] val log = Logger(this.getClass)

// the "extension class" api just lets you invoke a constructor. We really just want to
// call this static method, so that's good enough.
MemoryMonitor.installIfSysProps()
Expand Down Expand Up @@ -356,7 +360,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin {
val task = scheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = {
val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
println(s"Polled thread dump @ $d")
log.info(s"Polled thread dump @ $d")
MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
}
}, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -397,6 +401,7 @@ class MemoryMonitorArgs extends FieldArgs {
}

object MemoryMonitorArgs {
private[this] val log = Logger(this.getClass)
val prefix = "memory.monitor."
val prefixLen = prefix.length

Expand All @@ -406,7 +411,7 @@ object MemoryMonitorArgs {
k.substring(prefixLen) -> v
})
if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
System.out.println(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}")
log.info(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}")
} else {
args.stagesToPoll = Array()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.spark.memory

import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter}
import org.apache.spark.util.{Utils, ThreadStackTrace}
import com.cloudera.spark.{IncrementBytes, MemoryGetter, Reflector}
import org.apache.spark.{SparkContext, SparkEnv}

class SparkMemoryManagerHandle(
Expand Down
35 changes: 35 additions & 0 deletions core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Log everything to both the file target/unit-tests.log and the console (see test.appender below)
test.appender=file,console
log4j.rootCategory=INFO, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
# Tests that launch java subprocesses can set the "test.appender" system property to
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from Spark, right? I think everything from here down is not relevant to this project (and so is confusing).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I copied it for testing only. So, do you mean I should remove the 'console output' part?

# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark_project.jetty=WARN
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
// (c) Copyright 2018 Cloudera, Inc. All rights reserved.
package com.cloudera.spark

import com.typesafe.scalalogging.Logger
import org.scalatest.FunSuite

class PeakReportingSuite extends FunSuite {

private[this] val log = Logger(this.getClass)

test("increment bytes") {

log.info("================== test IncrementBytes.report start.")

// delta over 1e7, and 5% increase
assert(IncrementBytes.report(1e9.toLong, 1.051e9.toLong))
// delta over 1e7, but less than 5% increase
Expand All @@ -19,5 +25,7 @@ class PeakReportingSuite extends FunSuite {
// increase from small starting point OK
assert(IncrementBytes.report(0, 1.001e7.toLong))

log.info("================== test IncrementBytes.report end.")

}
}
Loading