Skip to content

Commit

Permalink
KAFKA-18597: Fix test issue
Browse files Browse the repository at this point in the history
  • Loading branch information
LoganZhuZzz committed Jan 20, 2025
1 parent fc7aa2d commit a399709
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
7 changes: 3 additions & 4 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,11 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()

/**
* scala 2.12 does not support maxOption so we handle the empty manually.
* @param f to compute the result
* @return the max value (int value) or 0 if there is no cleaner
* @return the max value or 0 if there is no cleaner
*/
private def maxOverCleanerThreads(f: CleanerThread => Double): Double =
cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread)))
private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
cleaners.map(f).maxOption.getOrElse(0.0d)

/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
Expand Down
16 changes: 5 additions & 11 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -532,21 +532,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
def maintainUncleanablePartitions(): Unit = {
// Remove deleted partitions from uncleanablePartitions
inLock(lock) {
// Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
// scala 2.13 while filterInPlace is not available in scala 2.12.

// Remove deleted partitions
uncleanablePartitions.values.foreach {
partitions =>
val partitionsToRemove = partitions.filterNot(logs.contains).toList
partitionsToRemove.foreach { partitions.remove }
uncleanablePartitions.values.foreach { partitions =>
partitions.filterInPlace(logs.contains)
}

// Remove entries with empty partition set.
val logDirsToRemove = uncleanablePartitions.filter {
case (_, partitions) => partitions.isEmpty
}.keys.toList
logDirsToRemove.foreach { uncleanablePartitions.remove }
uncleanablePartitions.filterInPlace {
case (_, partitions) => partitions.nonEmpty
}
}
}

Expand Down
28 changes: 28 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,34 @@ class LogCleanerTest extends Logging {
}
}

@Test
def testMaxOverCleanerThreads(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)

val cleaners = logCleaner.cleaners

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75
cleaners += cleaner1

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85
cleaners += cleaner2

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3

assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
Expand Down

0 comments on commit a399709

Please sign in to comment.