Add metricsFactory; fix tests
jihoonson committed Jan 8, 2025
1 parent 842c0f3 commit 377bdfc
Showing 6 changed files with 102 additions and 94 deletions.
14 changes: 9 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -44,6 +44,7 @@ object GpuExec {

trait GpuExec extends SparkPlan {
import GpuMetric._

def sparkSession: SparkSession = {
SparkShimImpl.sessionFromPlan(this)
}
@@ -77,20 +78,23 @@ trait GpuExec extends SparkPlan {
*/
def outputBatching: CoalesceGoal = null

@transient private [this] lazy val metricFactory = new GpuMetricFactory(
MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf)), sparkContext)

def createMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.create(level, name)
metricFactory.create(level, name)

def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createNanoTiming(level, name)
metricFactory.createNanoTiming(level, name)

def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createSize(level, name)
metricFactory.createSize(level, name)

def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createAverage(level, name)
metricFactory.createAverage(level, name)

def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createTiming(level, name)
metricFactory.createTiming(level, name)

protected def createFileCacheMetrics(): Map[String, GpuMetric] = {
if (FileCacheConf.FILECACHE_ENABLED.get(conf)) {
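For orientation, a hedged sketch of how an operator mixing in GpuExec might consume the helpers above once the lazy metricFactory is in place; the trait, field names, and metric names are illustrative and not part of this commit:

import com.nvidia.spark.rapids.{GpuExec, GpuMetric}

// Illustrative only: a GpuExec mixin declaring its metrics through the helpers above.
// The factory-backed helpers hand back NoopMetric when the configured metrics level
// filters a metric out, so callers can update these unconditionally.
trait ExampleGpuExec extends GpuExec {
  lazy val exampleOpTime: GpuMetric =
    createNanoTimingMetric(GpuMetric.DEBUG_LEVEL, "example op time")
  lazy val exampleOutputRows: GpuMetric =
    createMetric(GpuMetric.ESSENTIAL_LEVEL, "example output rows")
}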
74 changes: 30 additions & 44 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMetrics.scala
@@ -14,18 +14,17 @@
* limitations under the License.
*/

package com.nvidia.spark.rapids;
package com.nvidia.spark.rapids

import scala.collection.immutable.TreeMap

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuTaskMetrics

sealed class MetricsLevel(val num: Integer) extends Serializable {
@@ -41,7 +40,34 @@ object MetricsLevel {
}
}

object GpuMetric extends Logging with SQLConfHelper {
class GpuMetricFactory(metricsConf: MetricsLevel, context: SparkContext) {

private [this] def createInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
if (level >= metricsConf) {
// only enable companion metrics (excluding semaphore wait time) for DEBUG_LEVEL
WrappedGpuMetric(f, withMetricsExclSemWait = GpuMetric.DEBUG_LEVEL >= metricsConf)
} else {
NoopMetric
}
}

def create(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createMetric(context, name))

def createNanoTiming(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createNanoTimingMetric(context, name))

def createSize(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createSizeMetric(context, name))

def createAverage(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createAverageMetric(context, name))

def createTiming(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createTimingMetric(context, name))
}

object GpuMetric extends Logging {
// Metric names.
val BUFFER_TIME = "bufferTime"
val COPY_BUFFER_TIME = "copyBufferTime"
@@ -126,46 +152,6 @@ object GpuMetric extends Logging with SQLConfHelper {
val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"

private final val session = Option(SparkSession.getActiveSession.orNull)

private def sparkContext = {
if (session.isDefined) {
session.get.sparkContext
} else {
throw new IllegalStateException("No active SparkSession")
}
}

override def conf: SQLConf = {
session.map(_.sessionState.conf).getOrElse(super.conf)
}

private [this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))

private [this] def createInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
if (level >= metricsConf) {
// only enable companion metrics (excluding semaphore wait time) for DEBUG_LEVEL
WrappedGpuMetric(f, withMetricsExclSemWait = GpuMetric.DEBUG_LEVEL >= metricsConf)
} else {
NoopMetric
}
}

def create(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createMetric(sparkContext, name))

def createNanoTiming(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createNanoTimingMetric(sparkContext, name))

def createSize(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createSizeMetric(sparkContext, name))

def createAverage(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createAverageMetric(sparkContext, name))

def createTiming(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createTimingMetric(sparkContext, name))

def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
case i => throw new IllegalArgumentException(s"found unsupported GpuMetric ${i.getClass}")
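A minimal, hedged sketch of driving the new GpuMetricFactory directly; the local-mode SparkContext setup and the metric name are illustrative, while the level lookup mirrors the pattern used elsewhere in this commit:

import com.nvidia.spark.rapids.{GpuMetric, GpuMetricFactory, MetricsLevel, RapidsConf}
import org.apache.spark.{SparkConf, SparkContext}

// The factory takes an explicit metrics level and SparkContext, so callers no
// longer depend on SparkSession.getActiveSession being set.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local").setAppName("gpu-metric-factory-sketch"))
val level = MetricsLevel(
  sc.getConf.get(RapidsConf.METRICS_LEVEL.key, RapidsConf.METRICS_LEVEL.defaultValue))
val factory = new GpuMetricFactory(level, sc)

// Returns a WrappedGpuMetric backed by a SQLMetric when the level is enabled,
// and NoopMetric otherwise.
val rows: GpuMetric = factory.create(GpuMetric.ESSENTIAL_LEVEL, "example rows")
rows += 1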
@@ -21,11 +21,11 @@ import java.nio.charset.StandardCharsets

import scala.collection.mutable

import com.nvidia.spark.rapids.GpuMetric
import com.nvidia.spark.rapids.{GpuMetric, GpuMetricFactory, MetricsLevel, RapidsConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkContext, TaskContext}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.WriteTaskStats
@@ -238,18 +238,22 @@ object BasicColumnarWriteJobStatsTracker {
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"

def metrics: Map[String, GpuMetric] = {
val sparkContext = SparkContext.getActive.get
val metricsConf = MetricsLevel(sparkContext.conf.get(RapidsConf.METRICS_LEVEL.key,
RapidsConf.METRICS_LEVEL.defaultValue))
val metricFactory = new GpuMetricFactory(metricsConf, sparkContext)
Map(
NUM_FILES_KEY -> GpuMetric.create(GpuMetric.ESSENTIAL_LEVEL,
NUM_FILES_KEY -> metricFactory.create(GpuMetric.ESSENTIAL_LEVEL,
"number of written files"),
NUM_OUTPUT_BYTES_KEY -> GpuMetric.createSize(GpuMetric.ESSENTIAL_LEVEL,
NUM_OUTPUT_BYTES_KEY -> metricFactory.createSize(GpuMetric.ESSENTIAL_LEVEL,
"written output"),
NUM_OUTPUT_ROWS_KEY -> GpuMetric.create(GpuMetric.ESSENTIAL_LEVEL,
NUM_OUTPUT_ROWS_KEY -> metricFactory.create(GpuMetric.ESSENTIAL_LEVEL,
"number of output rows"),
NUM_PARTS_KEY -> GpuMetric.create(GpuMetric.ESSENTIAL_LEVEL,
NUM_PARTS_KEY -> metricFactory.create(GpuMetric.ESSENTIAL_LEVEL,
"number of dynamic part"),
TASK_COMMIT_TIME -> GpuMetric.createTiming(GpuMetric.ESSENTIAL_LEVEL,
TASK_COMMIT_TIME -> metricFactory.createTiming(GpuMetric.ESSENTIAL_LEVEL,
"task commit time"),
JOB_COMMIT_TIME -> GpuMetric.createTiming(GpuMetric.ESSENTIAL_LEVEL,
JOB_COMMIT_TIME -> metricFactory.createTiming(GpuMetric.ESSENTIAL_LEVEL,
"job commit time")
)
}
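A hedged sketch of reading the job-level metric map built above; it assumes an active Spark application on the driver (the factory is now built from SparkContext.getActive.get), and the printing loop is purely illustrative:

import com.nvidia.spark.rapids.GpuMetric
import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker

val jobMetrics: Map[String, GpuMetric] = BasicColumnarWriteJobStatsTracker.metrics
jobMetrics.foreach { case (key, metric) =>
  // Metrics filtered out by the configured level come back as NoopMetric and record nothing.
  println(s"$key -> ${metric.value}")
}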
@@ -16,8 +16,9 @@

package org.apache.spark.sql.rapids

import com.nvidia.spark.rapids.{GpuDataWritingCommand, GpuMetric}
import com.nvidia.spark.rapids.{GpuDataWritingCommand, GpuMetric, GpuMetricFactory, MetricsLevel, RapidsConf}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext

import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker.TASK_COMMIT_TIME
import org.apache.spark.util.SerializableConfiguration
@@ -80,18 +81,22 @@ object GpuWriteJobStatsTracker {
def basicMetrics: Map[String, GpuMetric] = BasicColumnarWriteJobStatsTracker.metrics

def taskMetrics: Map[String, GpuMetric] = {
val sparkContext = SparkContext.getActive.get
val metricsConf = MetricsLevel(sparkContext.conf.get(RapidsConf.METRICS_LEVEL.key,
RapidsConf.METRICS_LEVEL.defaultValue))
val metricFactory = new GpuMetricFactory(metricsConf, sparkContext)
Map(
GPU_TIME_KEY -> GpuMetric.createNanoTiming(GpuMetric.ESSENTIAL_LEVEL, "GPU time"),
WRITE_TIME_KEY -> GpuMetric.createNanoTiming(GpuMetric.ESSENTIAL_LEVEL,
GPU_TIME_KEY -> metricFactory.createNanoTiming(GpuMetric.ESSENTIAL_LEVEL, "GPU time"),
WRITE_TIME_KEY -> metricFactory.createNanoTiming(GpuMetric.ESSENTIAL_LEVEL,
"write time"),
TASK_COMMIT_TIME -> basicMetrics(TASK_COMMIT_TIME),
ASYNC_WRITE_TOTAL_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
ASYNC_WRITE_TOTAL_THROTTLE_TIME_KEY -> metricFactory.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "total throttle time"),
ASYNC_WRITE_AVG_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
ASYNC_WRITE_AVG_THROTTLE_TIME_KEY -> metricFactory.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "avg throttle time per async write"),
ASYNC_WRITE_MIN_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
ASYNC_WRITE_MIN_THROTTLE_TIME_KEY -> metricFactory.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "min throttle time per async write"),
ASYNC_WRITE_MAX_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
ASYNC_WRITE_MAX_THROTTLE_TIME_KEY -> metricFactory.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "max throttle time per async write")
)
}
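And a hedged sketch of timing work against the per-task map above; it assumes GPU_TIME_KEY is a constant on the same object, as its unqualified use in the hunk suggests, and the timed block is illustrative:

import com.nvidia.spark.rapids.GpuMetric
import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker

// Like the job-level metrics, taskMetrics now resolves its level from
// SparkContext.getActive.get, so an active context is assumed here.
val taskMetrics: Map[String, GpuMetric] = GpuWriteJobStatsTracker.taskMetrics
val gpuTime = taskMetrics(GpuWriteJobStatsTracker.GPU_TIME_KEY)

val start = System.nanoTime()
// ... the GPU work being measured would run here ...
gpuTime += System.nanoTime() - start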
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.io.async

import java.util.concurrent.{Callable, CountDownLatch, ExecutionException, Executors, Future, RejectedExecutionException, TimeUnit}

import com.nvidia.spark.rapids.GpuMetric
import com.nvidia.spark.rapids.{GpuMetric, RapidsConf}
import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
@@ -40,7 +40,7 @@ class ThrottlingExecutorSuite extends AnyFunSuite with BeforeAndAfterEach {
SparkSession.builder
.master("local")
.appName("ThrottlingExecutorSuite")
.config("spark.rapids.sql.metrics.level", "DEBUG")
.config(RapidsConf.METRICS_LEVEL.key, "DEBUG")
.getOrCreate()

val taskMetrics: Map[String, GpuMetric] = GpuWriteJobStatsTracker.taskMetrics
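The same configuration pattern in isolation, as a hedged sketch; the app name is made up, while the key constant and the "DEBUG" value come straight from the hunk above:

import com.nvidia.spark.rapids.RapidsConf
import org.apache.spark.sql.SparkSession

// Using RapidsConf.METRICS_LEVEL.key instead of the raw string
// "spark.rapids.sql.metrics.level" keeps the test in sync with the plugin's config definitions.
val spark = SparkSession.builder
  .master("local")
  .appName("metrics-level-sketch")
  .config(RapidsConf.METRICS_LEVEL.key, "DEBUG")
  .getOrCreate()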
@@ -60,7 +60,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
dataSchema,
rangeName,
includeRetry,
Seq.empty,
mockJobDescription.statsTrackers.map(_.newTaskInstance()),
None) {

// this writer (for tests) doesn't do anything and passes through the
@@ -558,22 +558,26 @@
resetMocksWithAndWithoutRetry {
val cb = buildBatchWithPartitionedCol(1, 1, 1, 1, 1, 1, 1, 1, 1)
val cbs = Seq(spy(cb))
withColumnarBatchesVerifyClosed(cbs) {
// I would like to not flush on the first iteration of the `write` method
when(mockJobDescription.concurrentWriterPartitionFlushSize).thenReturn(1000)
when(mockJobDescription.maxRecordsPerFile).thenReturn(9)

val statsTracker = mock[ColumnarWriteTaskStatsTracker]
val jobTracker = new ColumnarWriteJobStatsTracker {
override def newTaskInstance(): ColumnarWriteTaskStatsTracker = {
statsTracker
}
override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {}
// Mock jobDescription before it is passed to the mockOutputWriter in
// withColumnarBatchesVerifyClosed().

// I would like to not flush on the first iteration of the `write` method
when(mockJobDescription.concurrentWriterPartitionFlushSize).thenReturn(1000)
when(mockJobDescription.maxRecordsPerFile).thenReturn(9)

val statsTracker = mock[ColumnarWriteTaskStatsTracker]
val jobTracker = new ColumnarWriteJobStatsTracker {
override def newTaskInstance(): ColumnarWriteTaskStatsTracker = {
statsTracker
}
when(mockJobDescription.statsTrackers)
.thenReturn(Seq(jobTracker))
override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {}
}
when(mockJobDescription.statsTrackers)
.thenReturn(Seq(jobTracker))

// throw once from bufferBatchAndClose to simulate an exception after we call the
withColumnarBatchesVerifyClosed(cbs) {
// throw once from bufferBatchAndClose to simulate an exception after we call the
// stats tracker
mockOutputWriter.throwOnNextBufferBatchAndClose(
new GpuSplitAndRetryOOM("mocking a split and retry"))
@@ -615,23 +619,28 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
resetMocksWithAndWithoutRetry {
val cb = buildBatchWithPartitionedCol(1, 1, 1, 1, 1, 1, 1, 1, 1)
val cbs = Seq(spy(cb))
withColumnarBatchesVerifyClosed(cbs) {
// I would like to not flush on the first iteration of the `write` method
when(mockJobDescription.concurrentWriterPartitionFlushSize).thenReturn(1000)
when(mockJobDescription.maxRecordsPerFile).thenReturn(9)

val statsTracker = mock[ColumnarWriteTaskStatsTracker]
val jobTracker = new ColumnarWriteJobStatsTracker {
override def newTaskInstance(): ColumnarWriteTaskStatsTracker = {
statsTracker
}
// Mock jobDescription before mockOutputWriter is initialized in
// withColumnarBatchesVerifyClosed().

// I would like to not flush on the first iteration of the `write` method
when(mockJobDescription.concurrentWriterPartitionFlushSize).thenReturn(1000)
when(mockJobDescription.maxRecordsPerFile).thenReturn(9)

override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {}
val statsTracker = mock[ColumnarWriteTaskStatsTracker]
val jobTracker = new ColumnarWriteJobStatsTracker {
override def newTaskInstance(): ColumnarWriteTaskStatsTracker = {
statsTracker
}
when(mockJobDescription.statsTrackers)
.thenReturn(Seq(jobTracker))
when(statsTracker.newBatch(any(), any()))
.thenThrow(new GpuRetryOOM("mocking a retry"))

override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {}
}
when(mockJobDescription.statsTrackers)
.thenReturn(Seq(jobTracker))
when(statsTracker.newBatch(any(), any()))
.thenThrow(new GpuRetryOOM("mocking a retry"))

withColumnarBatchesVerifyClosed(cbs) {
val dynamicConcurrentWriter =
prepareDynamicPartitionConcurrentWriter(maxWriters = 5, batchSize = 1)

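Finally, a hedged recap of the ordering these test hunks establish, written as it would sit inside the suite (so mock, when, mockJobDescription, and the tracker types are the ones the suite already has in scope):

// 1) Stub the job description before the writer is constructed: the writer now
//    snapshots mockJobDescription.statsTrackers.map(_.newTaskInstance()) at
//    construction time instead of receiving Seq.empty.
when(mockJobDescription.concurrentWriterPartitionFlushSize).thenReturn(1000)
when(mockJobDescription.maxRecordsPerFile).thenReturn(9)

val statsTracker = mock[ColumnarWriteTaskStatsTracker]
val jobTracker = new ColumnarWriteJobStatsTracker {
  override def newTaskInstance(): ColumnarWriteTaskStatsTracker = statsTracker
  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {}
}
when(mockJobDescription.statsTrackers).thenReturn(Seq(jobTracker))

// 2) Only then enter withColumnarBatchesVerifyClosed(cbs) { ... }, which initializes
//    the mockOutputWriter and verifies the batches get closed.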
