diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
index 9a85c11..deb7268 100644
--- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
+++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
@@ -2,21 +2,24 @@ package com.cloudera.spark
 
 import java.lang.management._
-import java.math.{RoundingMode, MathContext}
+import java.math.{MathContext, RoundingMode}
 import java.text.SimpleDateFormat
 import java.util.Locale
-import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean, AtomicReference}
 import java.util.concurrent._
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 
 import com.quantifind.sumac.FieldArgs
-
-import org.apache.spark.{TaskContext, SparkContext}
+import com.typesafe.scalalogging.Logger
 import org.apache.spark.executor.ExecutorPlugin
 import org.apache.spark.memory.SparkMemoryManagerHandle
+import org.apache.spark.{SparkContext, TaskContext}
+
+import scala.collection.JavaConverters._
 
 class MemoryMonitor(val args: MemoryMonitorArgs) {
+
+  private[this] val log = Logger(this.getClass)
+
   val nettyMemoryHandle = SparkNettyMemoryHandle.get()
   val sparkMemManagerHandle = SparkMemoryManagerHandle.get()
   val memoryBean = ManagementFactory.getMemoryMXBean
@@ -59,8 +62,8 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
   val inShutdown = new AtomicBoolean(false)
 
   def showMetricNames: Unit = {
-    println(s"${nMetrics} Metrics")
-    (0 until nMetrics).foreach { idx => println(names(idx))}
+    log.info(s"${nMetrics} Metrics")
+    (0 until nMetrics).foreach { idx => log.info(names(idx)) }
   }
 
   def collectSnapshot: MemorySnapshot = {
@@ -81,41 +84,39 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
   }
 
   def showSnapshot(mem: MemorySnapshot): Unit = {
-    println(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}")
-    println("===============")
+    log.info(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}")
+    log.info("===============")
     // TODO headers for each getter?
     (0 until nMetrics).foreach { idx =>
       val v = mem.values(idx)
-      println(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")")
+      log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")")
     }
-    println()
-    println()
+    log.info("")
+    log.info("")
   }
 
   def updateAndMaybeShowPeaks(): Unit = {
     val snapshot = collectSnapshot
-    if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) {
-      showUpdates(snapshot.time, peakMemoryUsage, peakUpdates)
-    }
+    if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) showUpdates(snapshot.time, peakMemoryUsage, peakUpdates)
   }
 
   def showUpdates(time: Long, peakMemory: MemoryPeaks, updates: PeakUpdate): Unit = {
-    println(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}")
+    log.info(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}")
     (0 until updates.nUpdates).foreach { updateIdx =>
       val metricIdx = updates.updateIdx(updateIdx)
       val name = names(metricIdx)
       val currentVal = MemoryMonitor.bytesToString(peakMemoryUsage.values(metricIdx))
       val rawDelta = updates.delta(updateIdx)
       val delta = (if (rawDelta > 0) "+" else "-") + MemoryMonitor.bytesToString(rawDelta)
-      println(s"$name\t:\t$currentVal ($delta)")
+      log.info(s"$name\t:\t$currentVal ($delta)")
     }
   }
 
   def showPeaks(time: Long): Unit = {
-    println(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}")
+    log.info(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}")
     // TODO headers for each getter?
     (0 until nMetrics).foreach { idx =>
-      println(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) +
+      log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) +
         "\t\t\t\t" + MemoryMonitor.dateFormat.format(peakMemoryUsage.peakTimes(idx)))
     }
   }
@@ -127,7 +128,7 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
   def showLastThreadDump: Unit = {
     val threads = lastThreadDump.get()
     if (threads != null) {
-      println("last thread dump:")
+      log.info("last thread dump:")
       MemoryMonitor.showThreadDump(threads)
     }
   }
@@ -136,14 +137,14 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
     Runtime.getRuntime.addShutdownHook(new Thread(){
       override def run(): Unit = {
         inShutdown.set(true)
-        println()
-        println("IN SHUTDOWN")
-        println("================")
+        log.info("")
+        log.info("IN SHUTDOWN")
+        log.info("================")
         val snapshot = collectSnapshot
         showSnapshot(snapshot)
         peakMemoryUsage.update(snapshot, peakUpdates, reporting)
         showPeaks(snapshot.time)
-        println("Last non-shutdown snapshot:")
+        log.info("Last non-shutdown snapshot:")
         showSnapshot(lastNonShutdownSnapshot.get())
         showLastThreadDump
@@ -154,25 +155,25 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
 
   def beanInfo(): Unit = {
     memMgrBeans.foreach { mgr =>
-      println(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(","))
+      log.info(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(","))
     }
     poolBeans.foreach { pool =>
-      println(pool.getName())
-      println("============")
-      println(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(","))
+      log.info(pool.getName())
+      log.info("============")
+      log.info(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(","))
       if (pool.isUsageThresholdSupported)
-        println("supports usage threshold")
+        log.info("supports usage threshold")
       if (pool.isCollectionUsageThresholdSupported)
-        println("supports collection usage threshold")
+        log.info("supports collection usage threshold")
       pool.getUsage
-      println()
-      println()
log.info("") + log.info("") } - println("BUFFER POOLS") + log.info("BUFFER POOLS") bufferPoolsBeans.foreach { bp => - println(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}") + log.info(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}") } } } @@ -180,6 +181,7 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { object MemoryMonitor { val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") + private[this] val log = Logger(this.getClass) private var monitor: MemoryMonitor = null private var shutdownHookInstalled = false @@ -222,16 +224,16 @@ object MemoryMonitor { def listAllMBeans: Unit = { val server = ManagementFactory.getPlatformMBeanServer val allBeans = server.queryNames(null, null) - println("ALL BEANS") - println("=============") - allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => println(ob) } - println() - println() + log.info("ALL BEANS") + log.info("=============") + allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => log.info(ob) } + log.info("") + log.info("") } def showLimits: Unit = { - println("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory()) - println("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory()) + log.info("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory()) + log.info("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory()) } def installIfSysProps(): Unit = { @@ -240,9 +242,9 @@ object MemoryMonitor { install(args) installShutdownHook() args.freq.foreach { freq => - println(s"POLLING memory monitor every $freq millis") + log.info(s"POLLING memory monitor every $freq millis") monitor.showCurrentMemUsage - println("done with initial show") + log.info("done with initial show") startPolling(args) } } @@ -252,7 +254,7 @@ object MemoryMonitor { if (!SparkMemoryManagerHandle.isDynamicAllocation(sc)) { installOnExecutors(sc) } else { - println ("********* WARNING ***** not installing on executors because of DA") + log.info ("********* WARNING ***** not installing on executors because of DA") } } @@ -263,7 +265,7 @@ object MemoryMonitor { } else { numTasks } - println(s"Running $t tasks to install memory monitor on executors") + log.info(s"Running $t tasks to install memory monitor on executors") sc.parallelize(1 to t, t).foreach { _ => Thread.sleep(sleep) installIfSysProps() @@ -317,17 +319,19 @@ object MemoryMonitor { def showThreadDump(threads: Array[ThreadInfo]): Unit = { threads.foreach { t => if (t == null) { - println("") + log.info("") } else { - println(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState) - t.getStackTrace.foreach { elem => println("\t" + elem) } - println() + log.info(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState) + t.getStackTrace.foreach { elem => log.info("\t" + elem) } + log.info("") } } } } class MemoryMonitorExecutorExtension extends ExecutorPlugin { + private[this] val log = Logger(this.getClass) + // the "extension class" api just lets you invoke a constructor. We really just want to // call this static method, so that's good enough. 
   MemoryMonitor.installIfSysProps()
@@ -356,7 +360,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin {
       val task = scheduler.scheduleWithFixedDelay(new Runnable {
         override def run(): Unit = {
           val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
-          println(s"Polled thread dump @ $d")
+          log.info(s"Polled thread dump @ $d")
           MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
         }
       }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
@@ -397,6 +401,7 @@ class MemoryMonitorArgs extends FieldArgs {
 }
 
 object MemoryMonitorArgs {
+  private[this] val log = Logger(this.getClass)
 
   val prefix = "memory.monitor."
   val prefixLen = prefix.length
@@ -406,7 +411,7 @@ object MemoryMonitorArgs {
       k.substring(prefixLen) -> v
     })
     if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
-      System.out.println(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}")
+      log.info(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}")
     } else {
       args.stagesToPoll = Array()
     }
diff --git a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala
index 387d7a8..ecd079e 100644
--- a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala
+++ b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala
@@ -1,7 +1,6 @@
 package org.apache.spark.memory
 
-import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter}
-import org.apache.spark.util.{Utils, ThreadStackTrace}
+import com.cloudera.spark.{IncrementBytes, MemoryGetter, Reflector}
 import org.apache.spark.{SparkContext, SparkEnv}
 
 class SparkMemoryManagerHandle(
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..57c87e7
--- /dev/null
+++ b/core/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+test.appender=file,console
+log4j.rootCategory=INFO, ${test.appender}
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=DEBUG
+log4j.appender.console.ImmediateFlush=true
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark_project.jetty=WARN
diff --git a/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala b/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala
index 053da98..7b3245a 100644
--- a/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala
+++ b/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala
@@ -1,11 +1,17 @@
 // (c) Copyright 2018 Cloudera, Inc. All rights reserved.
 package com.cloudera.spark
 
+import com.typesafe.scalalogging.Logger
 import org.scalatest.FunSuite
 
 class PeakReportingSuite extends FunSuite {
+  private[this] val log = Logger(this.getClass)
+
   test("increment bytes") {
+
+    log.info("================== test IncrementBytes.report start.")
+
     // delta over 1e7, and 5% increase
     assert(IncrementBytes.report(1e9.toLong, 1.051e9.toLong))
     // delta over 1e7, but less than 5% increase
@@ -19,5 +25,7 @@ class PeakReportingSuite extends FunSuite {
     // increase from small starting point OK
     assert(IncrementBytes.report(0, 1.001e7.toLong))
 
+    log.info("================== test IncrementBytes.report end.")
+
   }
 }
diff --git a/pom.xml b/pom.xml
index 019475d..57ec029 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,10 +1,9 @@
 <project
-  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-
   <groupId>com.cloudera</groupId>
   <artifactId>spark-memory-parent_2.11</artifactId>
   <version>0.1.0-SNAPSHOT</version>
@@ -27,6 +26,10 @@
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>
     <CodeCacheSize>512m</CodeCacheSize>
+
+    <scala-logging.version>3.9.0</scala-logging.version>
+    <slf4j.version>1.7.25</slf4j.version>
+    <log4j.version>1.2.17</log4j.version>
   </properties>
@@ -46,7 +49,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
-      <version>2.3.0</version>
+      <version>2.3.2</version>
       <scope>provided</scope>
     </dependency>
@@ -56,6 +59,35 @@
       <version>3.0.5</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.scala-logging</groupId>
+      <artifactId>scala-logging_2.11</artifactId>
+      <version>${scala-logging.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
@@ -89,7 +121,7 @@
             <arg>${java.version}</arg>
-
+
           <execution>
             <id>scala-compile-first</id>
            <phase>process-resources</phase>
             <goals>
               <goal>compile</goal>
               <goal>testCompile</goal>
             </goals>
           </execution>
-
-
+
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
           <source>${java.version}</source>
         </configuration>
@@ -126,21 +158,51 @@
         <artifactId>scala-maven-plugin</artifactId>
       </plugin>
       <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/test/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
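
Reviewer note, for anyone unfamiliar with the scala-logging API this patch adopts: below is a minimal, self-contained sketch of the pattern used throughout the diff. The PeakTracker class is illustrative only and is not part of this patch; Logger(this.getClass) and log.info are the actual scala-logging 3.9.0 APIs wired up by the pom changes above.

    import com.typesafe.scalalogging.Logger

    // Illustrative class (not in this patch) showing the logging pattern the
    // diff introduces in MemoryMonitor, MemoryMonitorExecutorExtension, etc.
    class PeakTracker {
      // Logger.apply(Class[_]) wraps an SLF4J logger named after the class;
      // output is routed through whichever SLF4J binding is on the classpath
      // (slf4j-log4j12 + log4j in this build, matching the pom above).
      private[this] val log = Logger(this.getClass)

      def report(before: Long, after: Long): Unit = {
        // Unlike the println calls this patch replaces, log.info is a macro
        // that only builds the message string when INFO is enabled.
        log.info(s"peak moved from $before to $after (delta ${after - before} bytes)")
      }
    }

    object PeakTracker {
      def main(args: Array[String]): Unit = new PeakTracker().report(1000L, 2500L)
    }

With the test log4j.properties added above, these messages land in target/unit-tests.log and on the console instead of raw stdout.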