Skip to content

Commit

Permalink
Add shuffle partition conf to limited logic recommendation (#1479)
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa authored Dec 31, 2024
1 parent 5380342 commit 891b3b5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class AutoTuner(
// Note that the recommendations will be computed anyway to avoid breaking dependencies.
private val skippedRecommendations: mutable.HashSet[String] = mutable.HashSet[String]()
// list of recommendations having the calculations disabled, and only depend on default values
private val limitedLogicRecommendations: mutable.HashSet[String] = mutable.HashSet[String]()
protected val limitedLogicRecommendations: mutable.HashSet[String] = mutable.HashSet[String]()
// When enabled, the profiler recommendations should only include updated settings.
private var filterByUpdatedPropertiesEnabled: Boolean = true

Expand Down Expand Up @@ -1032,10 +1032,10 @@ class AutoTuner(
val lookup = "spark.sql.shuffle.partitions"
var shufflePartitions =
getPropertyValue(lookup).getOrElse(autoTunerConfigsProvider.DEF_SHUFFLE_PARTITIONS).toInt
val shuffleStagesWithPosSpilling = appInfoProvider.getShuffleStagesWithPosSpilling

// TODO: Need to look at other metrics for GPU spills (DEBUG mode), and batch sizes metric
if (isCalculationEnabled(lookup)) {
val shuffleStagesWithPosSpilling = appInfoProvider.getShuffleStagesWithPosSpilling
if (shuffleStagesWithPosSpilling.nonEmpty) {
val shuffleSkewStages = appInfoProvider.getShuffleSkewStages
if (shuffleSkewStages.exists(id => shuffleStagesWithPosSpilling.contains(id))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.nvidia.spark.rapids.tool.tuning

import scala.collection.mutable

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform}
import com.nvidia.spark.rapids.tool.profiling.DriverLogInfoProvider

Expand All @@ -29,7 +31,16 @@ class QualificationAutoTuner(
platform: Platform,
driverInfoProvider: DriverLogInfoProvider)
extends AutoTuner(clusterProps, appInfoProvider, platform, driverInfoProvider,
QualificationAutoTunerConfigsProvider)
QualificationAutoTunerConfigsProvider) {

/**
 * Recommendations whose calculation logic the Qualification AutoTuner skips,
 * falling back to the default values only.
 */
override protected val limitedLogicRecommendations: mutable.HashSet[String] =
  mutable.HashSet("spark.sql.shuffle.partitions")
}

/**
* Provides configuration settings for the Qualification Tool's AutoTuner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ import com.nvidia.spark.rapids.tool.profiling.Profiler
*/
class QualificationAutoTunerSuite extends BaseAutoTunerSuite {

/**
* Default Spark properties to be used when building the Qualification AutoTuner
*/
/**
 * Default Spark properties to be used when building the Qualification AutoTuner.
 * Note: each key appears exactly once — a duplicate
 * "spark.executor.instances" entry previously shadowed the first.
 */
private val defaultSparkProps: mutable.Map[String, String] = {
  mutable.LinkedHashMap[String, String](
    "spark.executor.cores" -> "32",
    "spark.executor.instances" -> "1",
    "spark.executor.memory" -> "80g"
  )
}

/**
* Helper method to build a worker info string with CPU properties
*/
Expand All @@ -37,24 +49,24 @@ class QualificationAutoTunerSuite extends BaseAutoTunerSuite {
buildWorkerInfoAsString(customProps, numCores, systemMemory, numWorkers)
}

test("test AutoTuner for Qualification sets batch size to 1GB") {
// mock the properties loaded from eventLog
val logEventsProps: mutable.Map[String, String] =
mutable.LinkedHashMap[String, String](
"spark.executor.cores" -> "32",
"spark.executor.instances" -> "1",
"spark.executor.memory" -> "80g",
"spark.executor.instances" -> "1"
)
/**
* Helper method to return an instance of the Qualification AutoTuner with default properties
*/
private def buildDefaultAutoTuner(
logEventsProps: mutable.Map[String, String] = defaultSparkProps): AutoTuner = {
val workerInfo = buildCpuWorkerInfoAsString(None, Some(32),
Some("212992MiB"), Some(5))
val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
logEventsProps, Some(defaultSparkVersion))
val clusterPropsOpt = QualificationAutoTunerConfigsProvider
.loadClusterPropertiesFromContent(workerInfo)
val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt)
val autoTuner = QualificationAutoTunerConfigsProvider.buildAutoTunerFromProps(
QualificationAutoTunerConfigsProvider.buildAutoTunerFromProps(
workerInfo, infoProvider, platform)
}

test("test AutoTuner for Qualification sets batch size to 1GB") {
val autoTuner = buildDefaultAutoTuner()
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
val expectedResults = Seq(
Expand All @@ -63,4 +75,15 @@ class QualificationAutoTunerSuite extends BaseAutoTunerSuite {
)
assert(expectedResults.forall(autoTunerOutput.contains))
}

test("test AutoTuner for Qualification sets shuffle partitions to 200") {
  val tuner = buildDefaultAutoTuner()
  val (recommendations, notes) = tuner.getRecommendedProperties()
  val tunerOutput = Profiler.getAutoTunerResultsAsString(recommendations, notes)
  // Both the recommended conf line and the explanatory comment must appear in the output.
  val requiredLines = Seq(
    "--conf spark.sql.shuffle.partitions=200",
    "- 'spark.sql.shuffle.partitions' was not set."
  )
  requiredLines.foreach(line => assert(tunerOutput.contains(line)))
}
}

0 comments on commit 891b3b5

Please sign in to comment.