[SPARK-50708][CONNECT] Delete Artifact resources on GC of ArtifactManager instance #49341
base: master
Conversation
@xupefei PTAL!

// The Cleaner and the associated cleanup task
private val cleaner: Cleaner = Cleaner.create()
private val cleanable = cleaner.register(this, new Runnable {
This might be an issue... The Runnable you create here points to the class you are trying to clean up. This will keep it alive. AFAIK we will have to separate the state from the ArtifactManager to make this work.
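(To illustrate the pitfall with a minimal, hypothetical sketch — Resource and releaseState are made-up names: the anonymous Runnable implicitly captures the enclosing instance, so the registered object can never become phantom-reachable and the Cleaner never runs the action.)

import java.lang.ref.Cleaner

class Resource {
  private val cleaner: Cleaner = Cleaner.create()

  // BUG (sketch): the anonymous Runnable calls releaseState(), which implicitly
  // captures Resource.this. The cleaning action therefore strongly references
  // the very object it is registered for, so that object never becomes
  // phantom-reachable and run() is never invoked by the Cleaner.
  private val cleanable = cleaner.register(this, new Runnable {
    override def run(): Unit = releaseState()
  })

  private def releaseState(): Unit = println("releasing state")
}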
Ah oof, good point...
@hvanhovell What about the following approach:
- Create a new method in the companion object:
  def cleanupFiles(artifactPath: File) = { ... }
- The cleanable will now be defined as:
  private val cleanable = cleaner.register(this, new Runnable {
    override def run(): Unit = ArtifactManager.cleanupFiles(artifactPath)
  })
Then, files will at least be deleted, and we could depend on GC to eventually clean out the internal buffers (jarsList, pythonIncludeList, etc.).
WDYT?
Yeah, that should be the only way. One suggestion might be to explicitly scope it to cleaning shared resources (e.g., SparkContext, BlockManager, etc.), e.g.,
@@ -377,30 +376,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
* Cleans up all resources specific to this `session`.
*/
private[sql] def cleanUpResources(): Unit = {
- logDebug(
- s"Cleaning up resources for session with sessionUUID ${session.sessionUUID}")
-
- // Clean up added files
- val fileserver = SparkEnv.get.rpcEnv.fileServer
- val sparkContext = session.sparkContext
- if (state != null) {
- val shouldUpdateEnv = sparkContext.addedFiles.contains(state.uuid) ||
- sparkContext.addedArchives.contains(state.uuid) ||
- sparkContext.addedJars.contains(state.uuid)
- if (shouldUpdateEnv) {
- sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
- sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
- sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))
- sparkContext.postEnvironmentUpdate()
- }
- }
-
- // Clean up cached relations
- val blockManager = sparkContext.env.blockManager
- blockManager.removeCache(session.sessionUUID)
-
- // Clean up artifacts folder
- FileUtils.deleteDirectory(artifactPath.toFile)
+ cleanUpSharedResources(session, state, artifactPath)
// Clean up internal trackers
jarsList.clear()
@@ -447,6 +423,10 @@ class ArtifactManager(session: SparkSession) extends Logging {
}
fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
}
+
+ private val cleaner: Cleaner = Cleaner.create()
+ private val s = (session, state, artifactPath)
+ cleaner.register(s, () => (cleanUpSharedResources _).tupled(s))
}
object ArtifactManager extends Logging {
@@ -481,4 +461,35 @@ object ArtifactManager extends Logging {
throw SparkException.internalError(s"Block $fromId not found in the block manager.")
}
}
+
+ /**
+ * Cleans up all resources specific to this `session`.
+ */
+ private def cleanUpSharedResources(
+ session: SparkSession, state: JobArtifactState, artifactPath: Path): Unit = {
+ logDebug(
+ s"Cleaning up resources for session with sessionUUID ${session.sessionUUID}")
+
+ // Clean up added files
+ val fileserver = SparkEnv.get.rpcEnv.fileServer
+ val sparkContext = session.sparkContext
+ if (state != null) {
+ val shouldUpdateEnv = sparkContext.addedFiles.contains(state.uuid) ||
+ sparkContext.addedArchives.contains(state.uuid) ||
+ sparkContext.addedJars.contains(state.uuid)
+ if (shouldUpdateEnv) {
+ sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+ sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+ sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))
+ sparkContext.postEnvironmentUpdate()
+ }
+ }
+
+ // Clean up cached relations
+ val blockManager = sparkContext.env.blockManager
+ blockManager.removeCache(session.sessionUUID)
+
+ // Clean up artifacts folder
+ FileUtils.deleteDirectory(artifactPath.toFile)
+ }
}
If we want to be absolutely sure, it would be good to just follow the example written in https://docs.oracle.com/en/java/javase/11/docs//api/java.base/java/lang/ref/Cleaner.html
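(For reference, a rough Scala transcription of that javadoc pattern — all names here are illustrative: the cleanup state lives in the companion object, so it cannot capture the instance being tracked.)

import java.lang.ref.Cleaner
import java.nio.file.Path

object CleaningExample {
  private val cleaner: Cleaner = Cleaner.create()

  // The State lives in the companion object: it holds only what cleanup needs
  // and cannot capture the CleaningExample instance being tracked.
  private class State(resourcePath: Path) extends Runnable {
    override def run(): Unit = {
      // Release the resources held by State, e.g. delete resourcePath.
      println(s"cleaning up $resourcePath")
    }
  }
}

class CleaningExample(resourcePath: Path) extends AutoCloseable {
  import CleaningExample._

  private val state = new State(resourcePath)
  private val cleanable = cleaner.register(this, state)

  // Explicit close runs the cleanup eagerly; Cleanable.clean() runs the action
  // at most once, so a later GC-triggered attempt becomes a no-op.
  override def close(): Unit = cleanable.clean()
}

Registering a dedicated State object rather than a closure makes the no-capture property easy to audit.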
Or cleanUpGlobalResources? I am fine with your existing approach too, just thinking out loud.
Thanks for the suggestions, @HyukjinKwon!
I've updated it based on the API example + your suggestions, PTAL again :)
try {
  FileUtils.deleteDirectory(artifactPath.toFile)
} catch {
  case e: Exception =>
Catch NonFatal? Or can we catch something more IO-related?
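(A minimal sketch of the NonFatal variant — CleanupUtil and deleteDirBestEffort are hypothetical names, not the PR's code: NonFatal swallows IOExceptions and other recoverable failures so the remaining cleanup can proceed, while still letting fatal VM errors propagate.)

import java.io.File
import scala.util.control.NonFatal
import org.apache.commons.io.FileUtils

object CleanupUtil {
  // Best-effort directory deletion: NonFatal matches IOException and other
  // recoverable failures but rethrows fatal errors like OutOfMemoryError.
  def deleteDirBestEffort(dir: File): Unit = {
    try {
      FileUtils.deleteDirectory(dir)
    } catch {
      case NonFatal(e) => println(s"Failed to delete $dir: $e")
    }
  }
}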
Fixed!
let's fix up the linter failure tho.
What changes were proposed in this pull request?
Registers a java.lang.ref.Cleaner for ArtifactManager. ArtifactManager#cleanUpResources is further made thread-safe and swallows FileSystem exceptions in order to let the remaining cleanup go through.
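(A hypothetical sketch of the thread-safety aspect, not the actual patch: an atomic guard lets an explicit close and a Cleaner-triggered run race safely, executing the cleanup body at most once.)

import java.util.concurrent.atomic.AtomicBoolean

class CleanupGuard(doCleanup: () => Unit) {
  private val cleanedUp = new AtomicBoolean(false)

  // compareAndSet lets only the first caller run doCleanup, whether the call
  // comes from an explicit close() or from the Cleaner's thread.
  def cleanUpOnce(): Unit = {
    if (cleanedUp.compareAndSet(false, true)) {
      doCleanup()
    }
  }
}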
Why are the changes needed?
Currently, an ArtifactManager instance's resources are not cleaned up automatically when the instance is GCed. Previously, the ArtifactManager was only used in Spark Connect, where its lifetime was controlled by the Spark Connect session, which would manually call ArtifactManager#cleanUpResources when it was itself closed.
With the recent changes allowing the ArtifactManager to be used in 'Classic Spark', we should clean up the related resources when the SparkSession gets GCed (and thus, the ArtifactManager instance is no longer reachable).
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.
Was this patch authored or co-authored using generative AI tooling?
No.