From cde6820cd79c025e65f520eb8cd3dbb064b31ac4 Mon Sep 17 00:00:00 2001 From: "jackson.hou" Date: Wed, 23 Jan 2019 22:41:03 +0800 Subject: [PATCH 1/2] add logback support --- .../com/cloudera/spark/MemoryMonitor.scala | 109 +++--- .../memory/SparkMemoryManagerHandle.scala | 3 +- pom.xml | 335 +++++++++++------- 3 files changed, 258 insertions(+), 189 deletions(-) diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala index 9a85c11..deb7268 100644 --- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala +++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala @@ -2,21 +2,24 @@ package com.cloudera.spark import java.lang.management._ -import java.math.{RoundingMode, MathContext} +import java.math.{MathContext, RoundingMode} import java.text.SimpleDateFormat import java.util.Locale -import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean, AtomicReference} import java.util.concurrent._ - -import scala.collection.JavaConverters._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} import com.quantifind.sumac.FieldArgs - -import org.apache.spark.{TaskContext, SparkContext} +import com.typesafe.scalalogging.Logger import org.apache.spark.executor.ExecutorPlugin import org.apache.spark.memory.SparkMemoryManagerHandle +import org.apache.spark.{SparkContext, TaskContext} + +import scala.collection.JavaConverters._ class MemoryMonitor(val args: MemoryMonitorArgs) { + + private[this] val log = Logger(this.getClass) + val nettyMemoryHandle = SparkNettyMemoryHandle.get() val sparkMemManagerHandle = SparkMemoryManagerHandle.get() val memoryBean = ManagementFactory.getMemoryMXBean @@ -59,8 +62,8 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { val inShutdown = new AtomicBoolean(false) def showMetricNames: Unit = { - println(s"${nMetrics} Metrics") - (0 until nMetrics).foreach { idx => println(names(idx))} + log.info(s"${nMetrics} Metrics") + (0 until nMetrics).foreach { idx => log.info(names(idx))} } def collectSnapshot: MemorySnapshot = { @@ -81,41 +84,39 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { } def showSnapshot(mem: MemorySnapshot): Unit = { - println(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}") - println("===============") + log.info(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}") + log.info("===============") // TODO headers for each getter? (0 until nMetrics).foreach { idx => val v = mem.values(idx) - println(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")") + log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")") } - println() - println() + log.info("") + log.info("") } def updateAndMaybeShowPeaks(): Unit = { val snapshot = collectSnapshot - if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) { - showUpdates(snapshot.time, peakMemoryUsage, peakUpdates) - } + if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) showUpdates(snapshot.time, peakMemoryUsage, peakUpdates) } def showUpdates(time: Long, peakMemory: MemoryPeaks, updates: PeakUpdate): Unit = { - println(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}") + log.info(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}") (0 until updates.nUpdates).foreach { updateIdx => val metricIdx = updates.updateIdx(updateIdx) val name = names(metricIdx) val currentVal = MemoryMonitor.bytesToString(peakMemoryUsage.values(metricIdx)) val rawDelta = updates.delta(updateIdx) val delta = (if (rawDelta > 0) "+" else "-") + MemoryMonitor.bytesToString(rawDelta) - println(s"$name\t:\t$currentVal ($delta)") + log.info(s"$name\t:\t$currentVal ($delta)") } } def showPeaks(time: Long): Unit = { - println(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}") + log.info(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}") // TODO headers for each getter? (0 until nMetrics).foreach { idx => - println(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) + + log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) + "\t\t\t\t" + MemoryMonitor.dateFormat.format(peakMemoryUsage.peakTimes(idx))) } } @@ -127,7 +128,7 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { def showLastThreadDump: Unit = { val threads = lastThreadDump.get() if (threads != null) { - println("last thread dump:") + log.info("last thread dump:") MemoryMonitor.showThreadDump(threads) } } @@ -136,14 +137,14 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { Runtime.getRuntime.addShutdownHook(new Thread(){ override def run(): Unit = { inShutdown.set(true) - println() - println("IN SHUTDOWN") - println("================") + log.info("") + log.info("IN SHUTDOWN") + log.info("================") val snapshot = collectSnapshot showSnapshot(snapshot) peakMemoryUsage.update(snapshot, peakUpdates, reporting) showPeaks(snapshot.time) - println("Last non-shutdown snapshot:") + log.info("Last non-shutdown snapshot:") showSnapshot(lastNonShutdownSnapshot.get()) showLastThreadDump @@ -154,25 +155,25 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { def beanInfo(): Unit = { memMgrBeans.foreach { mgr => - println(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(",")) + log.info(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(",")) } poolBeans.foreach { pool => - println(pool.getName()) - println("============") - println(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(",")) + log.info(pool.getName()) + log.info("============") + log.info(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(",")) if (pool.isUsageThresholdSupported) - println("supports usage threshold") + log.info("supports usage threshold") if (pool.isCollectionUsageThresholdSupported) - println("supports collection usage threshold") + log.info("supports collection usage threshold") pool.getUsage - println() - println() + log.info("") + log.info("") } - println("BUFFER POOLS") + log.info("BUFFER POOLS") bufferPoolsBeans.foreach { bp => - println(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}") + log.info(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}") } } } @@ -180,6 +181,7 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { object MemoryMonitor { val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") + private[this] val log = Logger(this.getClass) private var monitor: MemoryMonitor = null private var shutdownHookInstalled = false @@ -222,16 +224,16 @@ object MemoryMonitor { def listAllMBeans: Unit = { val server = ManagementFactory.getPlatformMBeanServer val allBeans = server.queryNames(null, null) - println("ALL BEANS") - println("=============") - allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => println(ob) } - println() - println() + log.info("ALL BEANS") + log.info("=============") + allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => log.info(ob) } + log.info("") + log.info("") } def showLimits: Unit = { - println("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory()) - println("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory()) + log.info("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory()) + log.info("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory()) } def installIfSysProps(): Unit = { @@ -240,9 +242,9 @@ object MemoryMonitor { install(args) installShutdownHook() args.freq.foreach { freq => - println(s"POLLING memory monitor every $freq millis") + log.info(s"POLLING memory monitor every $freq millis") monitor.showCurrentMemUsage - println("done with initial show") + log.info("done with initial show") startPolling(args) } } @@ -252,7 +254,7 @@ object MemoryMonitor { if (!SparkMemoryManagerHandle.isDynamicAllocation(sc)) { installOnExecutors(sc) } else { - println ("********* WARNING ***** not installing on executors because of DA") + log.info ("********* WARNING ***** not installing on executors because of DA") } } @@ -263,7 +265,7 @@ object MemoryMonitor { } else { numTasks } - println(s"Running $t tasks to install memory monitor on executors") + log.info(s"Running $t tasks to install memory monitor on executors") sc.parallelize(1 to t, t).foreach { _ => Thread.sleep(sleep) installIfSysProps() @@ -317,17 +319,19 @@ object MemoryMonitor { def showThreadDump(threads: Array[ThreadInfo]): Unit = { threads.foreach { t => if (t == null) { - println("") + log.info("") } else { - println(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState) - t.getStackTrace.foreach { elem => println("\t" + elem) } - println() + log.info(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState) + t.getStackTrace.foreach { elem => log.info("\t" + elem) } + log.info("") } } } } class MemoryMonitorExecutorExtension extends ExecutorPlugin { + private[this] val log = Logger(this.getClass) + // the "extension class" api just lets you invoke a constructor. We really just want to // call this static method, so that's good enough. MemoryMonitor.installIfSysProps() @@ -356,7 +360,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin { val task = scheduler.scheduleWithFixedDelay(new Runnable { override def run(): Unit = { val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis()) - println(s"Polled thread dump @ $d") + log.info(s"Polled thread dump @ $d") MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo) } }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS) @@ -397,6 +401,7 @@ class MemoryMonitorArgs extends FieldArgs { } object MemoryMonitorArgs { + private[this] val log = Logger(this.getClass) val prefix = "memory.monitor." val prefixLen = prefix.length @@ -406,7 +411,7 @@ object MemoryMonitorArgs { k.substring(prefixLen) -> v }) if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) { - System.out.println(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}") + log.info(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}") } else { args.stagesToPoll = Array() } diff --git a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala index 387d7a8..ecd079e 100644 --- a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala +++ b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala @@ -1,7 +1,6 @@ package org.apache.spark.memory -import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter} -import org.apache.spark.util.{Utils, ThreadStackTrace} +import com.cloudera.spark.{IncrementBytes, MemoryGetter, Reflector} import org.apache.spark.{SparkContext, SparkEnv} class SparkMemoryManagerHandle( diff --git a/pom.xml b/pom.xml index 019475d..a388875 100644 --- a/pom.xml +++ b/pom.xml @@ -1,147 +1,212 @@ - 4.0.0 + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + 4.0.0 + com.cloudera + spark-memory-parent_2.11 + 0.1.0-SNAPSHOT + pom - com.cloudera - spark-memory-parent_2.11 - 0.1.0-SNAPSHOT - pom + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos/ + + - - - cloudera - https://repository.cloudera.com/artifactory/cloudera-repos/ - - + + core + - - core - + + 2.11.8 + 1.8 + 64m + 512m + 512m - - 2.11.8 - 1.8 - 64m - 512m - 512m - + 1.7.25 + 1.2.3 + - - - - com.quantifind - sumac_2.11 - 0.3.0 - + + + + com.quantifind + sumac_2.11 + 0.3.0 + - - org.scala-lang - scala-library - ${scala.version} - provided - - - org.apache.spark - spark-core_2.11 - 2.3.0 - provided - + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.apache.spark + spark-core_2.11 + 2.3.2 + provided + - - org.scalatest - scalatest_2.11 - 3.0.5 - test - - + + org.scalatest + scalatest_2.11 + 3.0.5 + test + + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + provided + + + + ch.qos.logback + logback-classic + ${logback.version} + provided + + + ch.qos.logback + logback-core + ${logback.version} + provided + - - - - - net.alchim31.maven - scala-maven-plugin - 3.2.0 - - ${scala.version} - incremental - true - - -unchecked - -deprecation - -feature - - - -Xms1024m - -Xmx1024m - -XX:PermSize=${PermGen} - -XX:MaxPermSize=${MaxPermGen} - -XX:ReservedCodeCacheSize=${CodeCacheSize} - - - -source - ${java.version} - -target - ${java.version} - - - - - scala-compile-first - process-resources - - compile - - - - scala-test-compile-first - process-test-resources - - testCompile - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.3 - - ${java.version} - ${java.version} - UTF-8 - 1024m - true - - - - - - - net.alchim31.maven - scala-maven-plugin - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - - - + + com.typesafe.scala-logging + scala-logging_2.11 + 3.9.0 + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.0 + + ${scala.version} + incremental + true + + -unchecked + -deprecation + -feature + + + -Xms1024m + -Xmx1024m + -XX:PermSize=${PermGen} + -XX:MaxPermSize=${MaxPermGen} + -XX:ReservedCodeCacheSize=${CodeCacheSize} + + + -source + ${java.version} + -target + ${java.version} + + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + ${java.version} + ${java.version} + UTF-8 + 1024m + true + + + + + + + net.alchim31.maven + scala-maven-plugin + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + add-test-source + generate-sources + + add-test-source + + + + src/test/scala + + + + + + + From ae36c71409a8cbe14e81936c26616dcb806cb9d9 Mon Sep 17 00:00:00 2001 From: "jackson.hou" Date: Thu, 24 Jan 2019 13:38:36 +0800 Subject: [PATCH 2/2] change to use log4j for compatibility --- core/src/test/resources/log4j.properties | 35 ++ .../cloudera/spark/PeakReportingSuite.scala | 8 + pom.xml | 387 +++++++++--------- 3 files changed, 235 insertions(+), 195 deletions(-) create mode 100644 core/src/test/resources/log4j.properties diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties new file mode 100644 index 0000000..57c87e7 --- /dev/null +++ b/core/src/test/resources/log4j.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the file target/unit-tests.log +test.appender=file,console +log4j.rootCategory=INFO, ${test.appender} +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n +# Tests that launch java subprocesses can set the "test.appender" system property to +# "console" to avoid having the child process's logs overwrite the unit test's +# log file. +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=DEBUG +log4j.appender.console.ImmediateFlush=true +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t: %m%n +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark_project.jetty=WARN diff --git a/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala b/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala index 053da98..7b3245a 100644 --- a/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala +++ b/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala @@ -1,11 +1,17 @@ // (c) Copyright 2018 Cloudera, Inc. All rights reserved. package com.cloudera.spark +import com.typesafe.scalalogging.Logger import org.scalatest.FunSuite class PeakReportingSuite extends FunSuite { + private[this] val log = Logger(this.getClass) + test("increment bytes") { + + log.info("================== test IncrementBytes.report start.") + // delta over 1e7, and 5% increase assert(IncrementBytes.report(1e9.toLong, 1.051e9.toLong)) // delta over 1e7, but less than 5% increase @@ -19,5 +25,7 @@ class PeakReportingSuite extends FunSuite { // increase from small starting point OK assert(IncrementBytes.report(0, 1.001e7.toLong)) + log.info("================== test IncrementBytes.report end.") + } } diff --git a/pom.xml b/pom.xml index a388875..57ec029 100644 --- a/pom.xml +++ b/pom.xml @@ -2,211 +2,208 @@ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - 4.0.0 + 4.0.0 - com.cloudera - spark-memory-parent_2.11 - 0.1.0-SNAPSHOT - pom + com.cloudera + spark-memory-parent_2.11 + 0.1.0-SNAPSHOT + pom - - - cloudera - https://repository.cloudera.com/artifactory/cloudera-repos/ - - + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos/ + + - - core - + + core + - - 2.11.8 - 1.8 - 64m - 512m - 512m + + 2.11.8 + 1.8 + 64m + 512m + 512m - 1.7.25 - 1.2.3 - + 3.9.0 + 1.7.25 + 1.2.17 + - - - - com.quantifind - sumac_2.11 - 0.3.0 - + + + + com.quantifind + sumac_2.11 + 0.3.0 + - - org.scala-lang - scala-library - ${scala.version} - provided - - - org.apache.spark - spark-core_2.11 - 2.3.2 - provided - + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.apache.spark + spark-core_2.11 + 2.3.2 + provided + - - org.scalatest - scalatest_2.11 - 3.0.5 - test - + + org.scalatest + scalatest_2.11 + 3.0.5 + test + - - - org.slf4j - slf4j-api - ${slf4j.version} - provided - - - org.slf4j - jcl-over-slf4j - ${slf4j.version} - provided - - - - ch.qos.logback - logback-classic - ${logback.version} - provided - - - ch.qos.logback - logback-core - ${logback.version} - provided - + + + com.typesafe.scala-logging + scala-logging_2.11 + ${scala-logging.version} + - - com.typesafe.scala-logging - scala-logging_2.11 - 3.9.0 - - + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + provided + + + log4j + log4j + ${log4j.version} + provided + - - - - - net.alchim31.maven - scala-maven-plugin - 3.2.0 - - ${scala.version} - incremental - true - - -unchecked - -deprecation - -feature - - - -Xms1024m - -Xmx1024m - -XX:PermSize=${PermGen} - -XX:MaxPermSize=${MaxPermGen} - -XX:ReservedCodeCacheSize=${CodeCacheSize} - - - -source - ${java.version} - -target - ${java.version} - - - - - scala-compile-first - process-resources - - compile - - - - scala-test-compile-first - process-test-resources - - testCompile - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.3 - - ${java.version} - ${java.version} - UTF-8 - 1024m - true - - - - - - - net.alchim31.maven - scala-maven-plugin - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-source - generate-sources - - add-source - - - - src/main/scala - - - - - add-test-source - generate-sources - - add-test-source - - - - src/test/scala - - - - - - - + + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.0 + + ${scala.version} + incremental + true + + -unchecked + -deprecation + -feature + + + -Xms1024m + -Xmx1024m + -XX:PermSize=${PermGen} + -XX:MaxPermSize=${MaxPermGen} + -XX:ReservedCodeCacheSize=${CodeCacheSize} + + + -source + ${java.version} + -target + ${java.version} + + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + ${java.version} + ${java.version} + UTF-8 + 1024m + true + + + + + + + net.alchim31.maven + scala-maven-plugin + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + add-test-source + generate-sources + + add-test-source + + + + src/test/scala + + + + + + +