From 9246dd424b098f8b657fd4deaffa42e4a12c975d Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Thu, 18 Apr 2024 19:31:47 -0600 Subject: [PATCH 1/2] Attempt to fix the graceful close mechanism --- src/workers.jl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/workers.jl b/src/workers.jl index 2b116514..5626bc46 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -59,6 +59,7 @@ mutable struct Worker process_watch::Task futures::Dict{UInt64, Future} # Request.id -> Future @atomic terminated::Bool + gracefully_closed::Bool end # used to close Future.value channels when a worker terminates @@ -81,6 +82,16 @@ function terminate!(w::Worker, from::Symbol=:manual) end empty!(w.futures) end + # Give some time for the worker to exit gracefully. + if w.gracefully_closed + @static if !Sys.isapple() + # NOTE: THIS CAUSES A HANG ON MACOS! We can remove this if-check once + # the julia issue is resolved: + sleep(5) + else + wait(w.process) + end + end signal = Base.SIGTERM while !process_exited(w.process) @debug "sending signal $signal to worker $(w.pid)" @@ -116,6 +127,7 @@ function Base.close(w::Worker, from::Symbol=:manual) flush(w.socket) end end + w.gracefully_closed = true wait(w) return end @@ -173,7 +185,7 @@ function Worker(; return Sockets.connect(parse(Int, split(port_str, ':')[2])) end # create worker - w = Worker(ReentrantLock(), pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false) + w = Worker(ReentrantLock(), pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false, false) ## start a task to watch for worker process termination, notify the event when the task starts e1 = Threads.Event() w.process_watch = Threads.@spawn watch_and_terminate!(w, $e1) From f6d0566e41def8bc7823fed35f92b1856dd7b042 Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Thu, 18 Apr 2024 19:32:19 -0600 Subject: [PATCH 2/2] Fix test --- test/workers.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/workers.jl b/test/workers.jl index 35dac221..aa2f46e0 100644 --- a/test/workers.jl +++ b/test/workers.jl @@ -17,7 +17,7 @@ using Test @testset "clean shutdown ($w)" begin close(w) @test !process_running(w.process) - @test w.process.termsignal == Base.SIGTERM + @test w.process.termsignal == 0 @test w.process.exitcode == 0 @test !isopen(w.socket) @test w.terminated @@ -41,6 +41,7 @@ using Test w = Worker() @testset "remote_eval/remote_fetch ($w)" begin + @info "starting testset remote_eval/remote_fetch ($w)" expr = quote global x x = 101