From 4971e8625ad6463130049dd054229bb8c3c85087 Mon Sep 17 00:00:00 2001 From: Pedro Duarte Date: Tue, 2 Sep 2025 21:15:19 -0400 Subject: [PATCH 1/5] Add new testcase demonstrating the problem with orphan files on a race between two file managers --- .../streaming/state/RocksDBSuite.scala | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 973c1e0cb3b0..f7290c0da923 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -631,6 +631,56 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + test("RocksDBFileManager: delete orphan files protect with timestamps") { + withTempDir { dir => + val dfsRootDir = dir.getAbsolutePath + val sstDir = s"$dfsRootDir/SSTs" + + // Save versions 1, 2, and 3. + // We need 3 versions, otherwise deleteOldVersions(retain=2) will exit early. + // The first two versions must be empty so SST file timestamps do not protect + // orphan files on version 3. + val fileManagerA = new RocksDBFileManager( + dfsRootDir, Utils.createTempDir(), new Configuration) + saveCheckpointFiles(fileManagerA, Seq(), version = 1, numKeys = 101) + saveCheckpointFiles(fileManagerA, Seq(), version = 2, numKeys = 101) + saveCheckpointFiles(fileManagerA, Seq("003.sst" -> 10), version = 3, numKeys = 101) + + // Overwrite version 3 with another file manager. This creates the orphan. + val fileManagerB = new RocksDBFileManager( + dfsRootDir, Utils.createTempDir(), new Configuration) + saveCheckpointFiles(fileManagerB, Seq("003.sst" -> 10), version = 3, numKeys = 101) + + // Helper function. Counts number of copies of each SST file. 
+ def sstCounts: Map[String, Int] = listFiles(sstDir) + .map(_.getName.split("-").head) + .groupBy(identity) + .mapValues(_.size) + + // Two copies of 003.sst exist. + assert(sstCounts === Map("003" -> 2), sstCounts) + + // Example of what we have at this point according to fileManagerB: + // + // All modification times: + // 003-3b60ee7b-131c-4c4c-8f3d-a299b008f5b0.sst -> 1756860610932 + // 003-dfaf1b88-5676-46b9-be32-e7d353c07f97.sst -> 1756860610820 + // Tracked files: + // 003-3b60ee7b-131c-4c4c-8f3d-a299b008f5b0.sst -> Some(1756860610932) + // Oldest tracked file modification time: + // 1756860610932 + + // Deleting with fileManagerB results in 1756860610932 as the oldest tracked file modified + // time. So the other 003 copy is an orphan according to B. This is incorrect. In a race + // between fileManagers A and B it could be that 3.zip from A is last to land. If B deletes + // the orphan then the state will be corrupted. + fileManagerB.deleteOldVersions(2) + + // Passes, but behavior is incorrect. Both files should be protected to prevent corruption. 
+ assert(sstCounts === Map("003" -> 1), sstCounts) + } + } + test("RocksDBFileManager: don't delete orphan files when there is only 1 version") { withTempDir { dir => val dfsRootDir = dir.getAbsolutePath From abef67b64f4b56be56e7e28708406aaad1b1be39 Mon Sep 17 00:00:00 2001 From: Pedro Duarte Date: Tue, 2 Sep 2025 22:01:26 -0400 Subject: [PATCH 2/5] Refactor to avoid using mutable map (++=) --- .../streaming/state/RocksDBFileManager.scala | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index b4fe3e22e888..897ae19699da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -300,6 +300,21 @@ class RocksDBFileManager( } } + /** Find files referenced in versions and store the max version each file is used in. */ + private def filesTrackedInVersions(versions: Seq[Long]): Map[String, Long] = { + val fileToMaxUsedVersion = new mutable.HashMap[String, Long] + versions.foreach { version => + val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse { + val newResolvedFiles = getImmutableFilesFromVersionZip(version) + fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles) + newResolvedFiles + } + files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) = + math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version))) + } + + fileToMaxUsedVersion.toMap + } /** * Find orphan files which are not tracked by zip files. * Both sst files and log files can be orphan files. 
@@ -403,21 +418,11 @@ class RocksDBFileManager( val snapshotVersionsToDelete = sortedSnapshotVersions.filter(_ < minVersionToRetain) if (snapshotVersionsToDelete.isEmpty) return + // Tracked files with the max version in which they are used. + val trackedFiles: Map[String, Long] = filesTrackedInVersions(sortedSnapshotVersions) - // Resolve RocksDB files for all the versions and find the max version each file is used - val fileToMaxUsedVersion = new mutable.HashMap[String, Long] - sortedSnapshotVersions.foreach { version => - val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse { - val newResolvedFiles = getImmutableFilesFromVersionZip(version) - fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles) - newResolvedFiles - } - files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) = - math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version))) - } - - // Best effort attempt to delete SST files that were last used in to-be-deleted versions - val filesToDelete = fileToMaxUsedVersion.filter { + // Files that can be deleted because they were last used in to-be-deleted versions. 
+ val unretainedFiles = trackedFiles.filter{ case (_, v) => snapshotVersionsToDelete.contains(v) } @@ -425,8 +430,9 @@ class RocksDBFileManager( val logDir = new Path(dfsRootDir, RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR) val allSstFiles = if (fm.exists(sstDir)) fm.list(sstDir).toSeq else Seq.empty val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toSeq else Seq.empty - filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles) + val orphanedFiles = findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles) .map(_ -> -1L) + val filesToDelete = unretainedFiles ++ orphanedFiles logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain") var failedToDelete = 0 filesToDelete.foreach { case (dfsFileName, maxUsedVersion) => From b16bae0e3e4d7fc8df235cadc1378a7ee4135ca0 Mon Sep 17 00:00:00 2001 From: Pedro Duarte Date: Tue, 2 Sep 2025 22:03:33 -0400 Subject: [PATCH 3/5] Add minRetainedZipModified to findOrphanFiles --- .../streaming/state/RocksDBFileManager.scala | 38 ++++++++++++++----- .../streaming/state/RocksDBSuite.scala | 4 +- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 897ae19699da..61734f5be5ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -315,28 +315,36 @@ class RocksDBFileManager( fileToMaxUsedVersion.toMap } + /** * Find orphan files which are not tracked by zip files. + * * Both sst files and log files can be orphan files. * They are uploaded separately before the zip file of that version is uploaded. 
- * When the zip file of a version get overwritten, the referenced sst and log files become orphan. - * Be careful here since sst and log files of the ongoing version - * also appear to be orphan before their zip file is uploaded. + * When a version's zip file is overwritten the referenced sst and log files become orphan. + * Ensure sst and log files of any ongoing versions are not deleted. * * @param trackedFiles files tracked by metadata in versioned zip file * @param allFiles all sst or log files in the directory. + * @param minRetainedZipModified modification time of the min retained zip * @return filenames of orphan files */ - def findOrphanFiles(trackedFiles: Seq[String], allFiles: Seq[FileStatus]): Seq[String] = { + private def findOrphanFiles( + trackedFiles: Seq[String], + allFiles: Seq[FileStatus], + minRetainedZipModified: Long): Seq[String] = { val fileModificationTimes = allFiles.map(file => file.getPath.getName -> file.getModificationTime).toMap if (trackedFiles.nonEmpty && allFiles.size > trackedFiles.size) { // Some tracked files may not be in the directory when listing. val oldestTrackedFileModificationTime = trackedFiles.flatMap(fileModificationTimes.get(_)).min - // If this immutable file is older than any tracked file, - // then it can't belong to the ongoing version and it should be safe to clean it up. + + // Will not delete files modified after this time to prevent state store corruption. + val orphanModificationTimeThreshold = Seq( + minRetainedZipModified, oldestTrackedFileModificationTime).min + val orphanFiles = fileModificationTimes - .filter(_._2 < oldestTrackedFileModificationTime).keys.toSeq + .filter(_._2 < orphanModificationTimeThreshold).keys.toSeq if (orphanFiles.nonEmpty) { logInfo(s"Found ${orphanFiles.size} orphan files: ${orphanFiles.take(20).mkString(", ")}" + "... 
(display at most 20 filenames) that should be deleted.") @@ -426,13 +434,25 @@ class RocksDBFileManager( case (_, v) => snapshotVersionsToDelete.contains(v) } + // Modified time of the min version to retain zip file. + val minRetainedZipModified = fs.getFileStatus(dfsBatchZipFile(minVersionToRetain)) + .getModificationTime + + // All the sst and log files present in the DFS directory. val sstDir = new Path(dfsRootDir, RocksDBImmutableFile.SST_FILES_DFS_SUBDIR) val logDir = new Path(dfsRootDir, RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR) val allSstFiles = if (fm.exists(sstDir)) fm.list(sstDir).toSeq else Seq.empty val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toSeq else Seq.empty - val orphanedFiles = findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles) - .map(_ -> -1L) + + val orphanedFiles = findOrphanFiles( + trackedFiles.keys.toSeq, + allSstFiles ++ allLogFiles, + minRetainedZipModified + ).map(_ -> -1L) + val filesToDelete = unretainedFiles ++ orphanedFiles + + // Best effort attempt to delete SST files last used in to-be-deleted versions or orphaned logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain") var failedToDelete = 0 filesToDelete.foreach { case (dfsFileName, maxUsedVersion) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index f7290c0da923..d62721b6d4ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -676,8 +676,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared // the orphan then the state will be corrupted. fileManagerB.deleteOldVersions(2) - // Passes, but behavior is incorrect. Both files should be protected to prevent corruption. 
- assert(sstCounts === Map("003" -> 1), sstCounts) + // Both files are protected to prevent corruption. + assert(sstCounts === Map("003" -> 2), sstCounts) } } From 564abd67faebb31787f4515edba1b578e4f701af Mon Sep 17 00:00:00 2001 From: Pedro Duarte Date: Tue, 2 Sep 2025 22:11:15 -0400 Subject: [PATCH 4/5] Undo unnecessary changes --- .../sql/execution/streaming/state/RocksDBFileManager.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 61734f5be5ab..5e7ec0032149 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -318,10 +318,9 @@ class RocksDBFileManager( /** * Find orphan files which are not tracked by zip files. - * * Both sst files and log files can be orphan files. * They are uploaded separately before the zip file of that version is uploaded. - * When a version's zip file is overwritten the referenced sst and log files become orphan. + * When the zip file of a version get overwritten, the referenced sst and log files become orphan. * Ensure sst and log files of any ongoing versions are not deleted. 
* * @param trackedFiles files tracked by metadata in versioned zip file From 808bfd1681e1c736b5bc5517088c66816e8f576b Mon Sep 17 00:00:00 2001 From: Pedro Duarte Date: Wed, 3 Sep 2025 08:41:13 -0400 Subject: [PATCH 5/5] Fix 2.13 compatibility --- .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index d62721b6d4ee..77e6a979bb3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -655,7 +655,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared def sstCounts: Map[String, Int] = listFiles(sstDir) .map(_.getName.split("-").head) .groupBy(identity) - .mapValues(_.size) + .map { case (k, v) => k -> v.size } // Two copies of 003.sst exist. assert(sstCounts === Map("003" -> 2), sstCounts)