Gracefully shutdown workers on timeout or high mem threshold #151

Draft: wants to merge 4 commits into base: main
6 changes: 2 additions & 4 deletions src/ReTestItems.jl
@@ -499,8 +499,7 @@ function manage_worker(
ch = Channel{TestItemResult}(1)
if memory_percent() > memory_threshold_percent
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory."
-terminate!(worker)
-wait(worker)
+close(worker, :high_memory)
worker = robust_start_worker(proj_name, nworker_threads, worker_init_expr, ntestitems)
end
testitem.workerid[] = worker.pid
@@ -554,8 +553,7 @@ function manage_worker(
# Handle the exception
if e isa TimeoutException
@debugv 1 "Test item $(repr(testitem.name)) timed out. Terminating worker $worker"
-terminate!(worker)
-wait(worker)
+close(worker, :timeout)
@error "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \
Recording test error."
record_timeout!(testitem, run_number, timeout)
11 changes: 6 additions & 5 deletions src/workers.jl
@@ -82,12 +82,12 @@ function terminate!(w::Worker, from::Symbol=:manual)
empty!(w.futures)
end
signal = Base.SIGTERM
while true
while !process_exited(w.process)
Collaborator commented:

I forgot that we never swapped this package over to use ConcurrentUtilities workers. We should probably do that and maintain that code in one place.

Collaborator (author) replied:

It sure would be nice to maintain this in only one place... On the other hand, I don't think it's ideal for a test framework to have dependencies (since then you can't use it to test code that requires a different version of that same dependency). So if we were to maintain it in one place (rather than maintain a duplicate codebase here that will slightly diverge over time), we'd want ReTestItems to vendor it some other way.

Member replied:

(For vendoring it some other way, we could include the other repo as a git submodule, or have the build script check out the repo at a committed tag, or something like that.)
But for now I agree it makes sense to keep it duplicated until we invest in a cleanup like that.

@debug "sending signal $signal to worker $(w.pid)"
kill(w.process, signal)
signal = signal == Base.SIGTERM ? Base.SIGINT : Base.SIGKILL
process_exited(w.process) && break
sleep(0.1)
process_exited(w.process) && break
end
if !(w.socket.status == Base.StatusUninit || w.socket.status == Base.StatusInit || w.socket.handle === C_NULL)
close(w.socket)
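
The escalation loop in `terminate!` above can be read in isolation. Below is a minimal standalone sketch, assuming a Unix-like system with a `sleep` binary on the PATH. `escalating_kill` and its `interval` keyword are hypothetical names used only for illustration; they are not part of ReTestItems. The sketch sends SIGTERM first, then SIGINT, then SIGKILL on every later pass, and checks `process_exited` at the top of the loop in the same way as the new `while` condition.

```julia
# Hypothetical helper mirroring the signal escalation in `terminate!`.
# Returns the list of signals that were actually sent.
function escalating_kill(proc::Base.Process; interval::Real=0.1)
    signal = Base.SIGTERM
    sent = Int[]
    while !process_exited(proc)
        kill(proc, signal)
        push!(sent, signal)
        # SIGTERM -> SIGINT -> SIGKILL, then SIGKILL on every further pass
        signal = signal == Base.SIGTERM ? Base.SIGINT : Base.SIGKILL
        sleep(interval)  # yield so the exit notification can be processed
    end
    return sent
end

# Usage: start a long-running child (assumes a Unix `sleep` binary) and stop it.
proc = run(`sleep 60`; wait=false)
sent = escalating_kill(proc)
wait(proc)
```

Because the loop condition is re-checked before each signal, a process that has already exited receives no signal at all, which is the case the `while true` form did not cover.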
@@ -107,8 +107,9 @@ end

# gracefully terminate a worker by sending a shutdown message
# and waiting for the other tasks to perform worker shutdown
-function Base.close(w::Worker)
+function Base.close(w::Worker, from::Symbol=:manual)
if !w.terminated && isopen(w.socket)
+@debug "closing worker $(w.pid) from $from"
req = Request(Symbol(), :(), rand(UInt64), true)
@lock w.lock begin
serialize(w.socket, req)
@@ -211,7 +212,7 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
end
end
catch e
-# @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace())
+@debug "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace())
terminate!(w, :redirect_worker_output)
e isa EOFError || e isa Base.IOError || rethrow()
finally
@@ -246,7 +247,7 @@ function process_responses(w::Worker, ev::Threads.Event)
end
end
catch e
-# @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace())
+@debug "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace())
terminate!(w, :process_responses)
e isa EOFError || e isa Base.IOError || rethrow()
end
2 changes: 2 additions & 0 deletions test/workers.jl
@@ -17,6 +17,8 @@ using Test
@testset "clean shutdown ($w)" begin
close(w)
@test !process_running(w.process)
@test w.process.termsignal == Base.SIGTERM
Member commented:

I'm wondering where the "SIGTERM" comes from? Your PR description says:

> terminate workers with SIGTERM

but i think if it's a graceful close, the process just exits, meaning there is no signal at all.
So maybe this should be:

Suggested change:
-@test w.process.termsignal == Base.SIGTERM
+@test w.process.termsignal == 0

?

That's what i'm seeing when running the tests:

  Expression: w.process.termsignal == Base.SIGTERM
   Evaluated: 0 == 15

@test w.process.exitcode == 0
@test !isopen(w.socket)
@test w.terminated
@test istaskstarted(w.messages) && istaskdone(w.messages)
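
To illustrate the distinction raised in the review comment on `termsignal`, here is a minimal sketch. It assumes a Unix-like system with `sh` and `sleep` available, and it uses plain `Base.Process` handles rather than this package's `Worker` type. A child that exits on its own is expected to report `termsignal == 0`, while one stopped by `kill` reports the signal that ended it.

```julia
# Child that exits by itself: no signal is involved.
graceful = run(`sh -c "exit 0"`; wait=false)
wait(graceful)
println("graceful: exitcode=", graceful.exitcode, " termsignal=", graceful.termsignal)

# Child that is stopped with SIGTERM (assumes `sleep` does not trap the signal).
killed = run(`sleep 60`; wait=false)
kill(killed, Base.SIGTERM)
wait(killed)
println("killed: termsignal=", killed.termsignal)
```

Under these assumptions, a test for the graceful `close(w)` path would compare `termsignal` against `0`, and only a test for the `terminate!` path would compare it against `Base.SIGTERM`.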