Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,28 +300,50 @@ class RocksDBFileManager(
}
}

/**
 * Collect every immutable file referenced by the given versions, keyed by DFS file name,
 * together with the highest version that references it.
 *
 * A version whose file list is not yet present in `fileMappings.versionToRocksDBFiles` is
 * resolved through `getImmutableFilesFromVersionZip` and the result is stored back into
 * that mapping.
 *
 * @param versions versions whose referenced files should be inspected
 * @return immutable map from DFS file name to the max version in which the file is used
 */
private def filesTrackedInVersions(versions: Seq[Long]): Map[String, Long] = {
  val maxVersionByFile = new mutable.HashMap[String, Long]
  for (version <- versions) {
    val resolvedFiles = Option(fileMappings.versionToRocksDBFiles.get(version)) match {
      case Some(cachedFiles) =>
        cachedFiles
      case None =>
        // Not resolved yet: read the file list for this version and remember it.
        val filesFromZip = getImmutableFilesFromVersionZip(version)
        fileMappings.versionToRocksDBFiles.put(version, filesFromZip)
        filesFromZip
    }
    for (file <- resolvedFiles) {
      // Keep the larger of the current version and whatever was recorded before.
      maxVersionByFile(file.dfsFileName) =
        math.max(version, maxVersionByFile.getOrElse(file.dfsFileName, version))
    }
  }

  maxVersionByFile.toMap
}

/**
* Find orphan files which are not tracked by zip files.
* Both sst files and log files can be orphan files.
* They are uploaded separately before the zip file of that version is uploaded.
* When a version's zip file gets overwritten, the referenced sst and log files become orphans.
* Be careful here since sst and log files of the ongoing version
* also appear to be orphan before their zip file is uploaded.
* Ensure sst and log files of any ongoing versions are not deleted.
*
* @param trackedFiles files tracked by metadata in versioned zip file
* @param allFiles all sst or log files in the directory.
* @param minRetainedZipModified modification time of the min retained zip
* @return filenames of orphan files
*/
def findOrphanFiles(trackedFiles: Seq[String], allFiles: Seq[FileStatus]): Seq[String] = {
private def findOrphanFiles(
trackedFiles: Seq[String],
allFiles: Seq[FileStatus],
minRetainedZipModified: Long): Seq[String] = {
val fileModificationTimes = allFiles.map(file =>
file.getPath.getName -> file.getModificationTime).toMap
if (trackedFiles.nonEmpty && allFiles.size > trackedFiles.size) {
// Some tracked files may not be in the directory when listing.
val oldestTrackedFileModificationTime = trackedFiles.flatMap(fileModificationTimes.get(_)).min
// If this immutable file is older than any tracked file,
// then it can't belong to the ongoing version and it should be safe to clean it up.

// Will not delete files modified after this time to prevent state store corruption.
val orphanModificationTimeThreshold = Seq(
minRetainedZipModified, oldestTrackedFileModificationTime).min

val orphanFiles = fileModificationTimes
.filter(_._2 < oldestTrackedFileModificationTime).keys.toSeq
.filter(_._2 < orphanModificationTimeThreshold).keys.toSeq
if (orphanFiles.nonEmpty) {
logInfo(s"Found ${orphanFiles.size} orphan files: ${orphanFiles.take(20).mkString(", ")}" +
"... (display at most 20 filenames) that should be deleted.")
Expand Down Expand Up @@ -403,30 +425,33 @@ class RocksDBFileManager(
val snapshotVersionsToDelete = sortedSnapshotVersions.filter(_ < minVersionToRetain)
if (snapshotVersionsToDelete.isEmpty) return

// Tracked files with the max version in which they are used.
val trackedFiles: Map[String, Long] = filesTrackedInVersions(sortedSnapshotVersions)

// Resolve RocksDB files for all the versions and find the max version each file is used
val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
sortedSnapshotVersions.foreach { version =>
val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
val newResolvedFiles = getImmutableFilesFromVersionZip(version)
fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
newResolvedFiles
}
files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version)))
}

// Best effort attempt to delete SST files that were last used in to-be-deleted versions
val filesToDelete = fileToMaxUsedVersion.filter {
// Files that can be deleted because they were last used in to-be-deleted versions.
val unretainedFiles = trackedFiles.filter{
case (_, v) => snapshotVersionsToDelete.contains(v)
}

// Modified time of the min version to retain zip file.
val minRetainedZipModified = fs.getFileStatus(dfsBatchZipFile(minVersionToRetain))
.getModificationTime

// All the sst and log files present in the DFS directory.
val sstDir = new Path(dfsRootDir, RocksDBImmutableFile.SST_FILES_DFS_SUBDIR)
val logDir = new Path(dfsRootDir, RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR)
val allSstFiles = if (fm.exists(sstDir)) fm.list(sstDir).toSeq else Seq.empty
val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toSeq else Seq.empty
filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles)
.map(_ -> -1L)

val orphanedFiles = findOrphanFiles(
trackedFiles.keys.toSeq,
allSstFiles ++ allLogFiles,
minRetainedZipModified
).map(_ -> -1L)

val filesToDelete = unretainedFiles ++ orphanedFiles

// Best effort attempt to delete SST files last used in to-be-deleted versions or orphaned
logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
var failedToDelete = 0
filesToDelete.foreach { case (dfsFileName, maxUsedVersion) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,56 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

// Verifies that deleteOldVersions keeps both copies of an SST file when a second file manager
// overwrites the same version: the older copy looks like an orphan to the second manager, but
// it may still belong to a commit from the first manager that has not fully landed.
test("RocksDBFileManager: delete orphan files protect with timestamps") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
val sstDir = s"$dfsRootDir/SSTs"

// Save versions 1, 2, and 3.
// We need 3 versions, otherwise deleteOldVersions(retain=2) will exit early.
// The first two versions must be empty so SST file timestamps do not protect
// orphan files on version 3.
val fileManagerA = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
saveCheckpointFiles(fileManagerA, Seq(), version = 1, numKeys = 101)
saveCheckpointFiles(fileManagerA, Seq(), version = 2, numKeys = 101)
saveCheckpointFiles(fileManagerA, Seq("003.sst" -> 10), version = 3, numKeys = 101)

// Overwrite version 3 with another file manager. This creates the orphan.
val fileManagerB = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
saveCheckpointFiles(fileManagerB, Seq("003.sst" -> 10), version = 3, numKeys = 101)

// Helper function. Counts the number of copies of each SST file, keyed by the part of the
// file name that precedes the first "-".
def sstCounts: Map[String, Int] = listFiles(sstDir)
.map(_.getName.split("-").head)
.groupBy(identity)
.map { case (k, v) => k -> v.size }

// Two copies of 003.sst exist.
assert(sstCounts === Map("003" -> 2), sstCounts)

// Example of what we have at this point according to fileManagerB:
//
// All modification times:
// 003-3b60ee7b-131c-4c4c-8f3d-a299b008f5b0.sst -> 1756860610932
// 003-dfaf1b88-5676-46b9-be32-e7d353c07f97.sst -> 1756860610820
// Tracked files:
// 003-3b60ee7b-131c-4c4c-8f3d-a299b008f5b0.sst -> Some(1756860610932)
// Oldest tracked file modification time:
// 1756860610932

// Deleting with fileManagerB results in 1756860610932 as the oldest tracked file modified
// time. So the other 003 copy is an orphan according to B. This is incorrect. In a race
// between fileManagers A and B it could be that 3.zip from A is last to land. If B deletes
// the orphan then the state will be corrupted.
fileManagerB.deleteOldVersions(2)

// Both files are protected to prevent corruption.
assert(sstCounts === Map("003" -> 2), sstCounts)
}
}

test("RocksDBFileManager: don't delete orphan files when there is only 1 version") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
Expand Down