Skip to content

Commit

Permalink
Improve task gpu config and update unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Jan 27, 2025
1 parent edb306a commit 642e463
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool
import scala.annotation.tailrec

import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.tuning.ClusterProperties
import com.nvidia.spark.rapids.tool.tuning.{ClusterProperties, ProfilingAutoTunerConfigsProvider}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
Expand Down Expand Up @@ -147,7 +147,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
var recommendedClusterInfo: Option[RecommendedClusterInfo] = None

// Default recommendation based on NDS benchmarks (note: this could be platform specific)
def recommendedCoresPerExec = 16
def recommendedCoresPerExec: Int = ProfilingAutoTunerConfigsProvider.DEF_CORES_PER_EXECUTOR
// Default number of GPUs to use, currently we do not support multiple GPUs per node
def recommendedGpusPerNode = 1
def defaultNumGpus: Int = 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -461,15 +461,6 @@ class AutoTuner(
Math.max(1, executorCores)
}

/**
 * Recommendation for 'spark.task.resource.gpu.amount' based on the number of CPU cores.
 * Returns the GPU fraction each task should claim so that all executor cores
 * can run tasks concurrently on a single GPU.
 */
def calcTaskGPUAmount: Double = {
  // calcNumExecutorCores is guaranteed to be at least 1, so the division is safe.
  1.0 / calcNumExecutorCores
}

/**
* Recommendation for 'spark.rapids.sql.concurrentGpuTasks' based on gpu memory.
* Assumption - cluster properties were updated to have a default values if missing.
Expand Down Expand Up @@ -666,7 +657,10 @@ class AutoTuner(
// specific recommendations
if (platform.recommendedClusterInfo.isDefined) {
val execCores = platform.recommendedClusterInfo.map(_.coresPerExecutor).getOrElse(1)
appendRecommendation("spark.task.resource.gpu.amount", calcTaskGPUAmount)
// Set to low value for Spark RAPIDS usage as task parallelism will be honoured
// by `spark.executor.cores`.
appendRecommendation("spark.task.resource.gpu.amount",
autoTunerConfigsProvider.DEF_TASK_GPU_RESOURCE_AMT)
appendRecommendation("spark.rapids.sql.concurrentGpuTasks",
calcGpuConcTasks().toInt)
val availableMemPerExec =
Expand Down Expand Up @@ -1211,8 +1205,12 @@ class AutoTuner(
trait AutoTunerConfigsProvider extends Logging {
// Maximum number of concurrent tasks to run on the GPU
val MAX_CONC_GPU_TASKS = 4L
// Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes
val DEF_SYSTEM_RESERVE_MB: Long = 2 * 1024L
// Default cores per executor to be recommended for Spark RAPIDS
val DEF_CORES_PER_EXECUTOR = 16
// Default amount of a GPU memory allocated for each task.
// This is set to a low value for Spark RAPIDS as task parallelism will be
// honoured by `spark.executor.cores`.
val DEF_TASK_GPU_RESOURCE_AMT = 0.001
// Fraction of the executor JVM heap size that should be additionally reserved
// for JVM off-heap overhead (thread stacks, native libraries, etc.)
val DEF_HEAP_OVERHEAD_FRACTION = 0.1
Expand Down Expand Up @@ -1269,18 +1267,23 @@ trait AutoTunerConfigsProvider extends Logging {
"spark.rapids.memory.pinnedPool.size" ->
s"'spark.rapids.memory.pinnedPool.size' should be set to ${DEF_PINNED_MEMORY_MB}m.")

// scalastyle:off line.size.limit
val commentsForMissingProps: Map[String, String] = Map(
"spark.executor.cores" ->
// TODO: This could be extended later to be platform specific.
s"'spark.executor.cores' should be set to $DEF_CORES_PER_EXECUTOR.",
"spark.executor.instances" ->
"'spark.executor.instances' should be set to (gpuCount * numWorkers).",
"'spark.executor.instances' should be set to (cpuCoresPerNode * numWorkers) / 'spark.executor.cores'.",
"spark.task.resource.gpu.amount" ->
"'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).",
s"'spark.task.resource.gpu.amount' should be set to $DEF_TASK_GPU_RESOURCE_AMT.",
"spark.rapids.sql.concurrentGpuTasks" ->
s"'spark.rapids.sql.concurrentGpuTasks' should be set to Min(4, (gpuMemory / 7.5G)).",
"spark.rapids.sql.enabled" ->
"'spark.rapids.sql.enabled' should be true to enable SQL operations on the GPU.",
"spark.sql.adaptive.enabled" ->
"'spark.sql.adaptive.enabled' should be enabled for better performance."
) ++ commentsForMissingMemoryProps
// scalastyle:on line.size.limit

val recommendationsTarget: Seq[String] = Seq[String](
"spark.executor.instances",
Expand Down
Loading

0 comments on commit 642e463

Please sign in to comment.