Skip to content

Commit

Permalink
Link supervisor with forks when registering processes
Browse files Browse the repository at this point in the history
And use this to clean things up (deregister processes and release all
claimed executions) in cases where the forks (workers, scheduler) don't
have a chance to terminate in an orderly way.
  • Loading branch information
rosa committed Nov 2, 2023
1 parent 0305dd1 commit ca24864
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 22 deletions.
14 changes: 6 additions & 8 deletions app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
class SolidQueue::Process < SolidQueue::Record
include Prunable

if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
serialize :metadata, coder: JSON
else
serialize :metadata, JSON
end

belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :forks
has_many :forks, class_name: "SolidQueue::Process", inverse_of: :supervisor, dependent: :destroy
has_many :claimed_executions

store :metadata, accessors: [ :kind, :pid ], coder: JSON

after_destroy -> { claimed_executions.release_all }

def self.register(metadata)
create!(metadata: metadata, last_heartbeat_at: Time.current)
def self.register(**attributes)
create!(attributes.merge(last_heartbeat_at: Time.current))
end

def heartbeat
Expand Down
10 changes: 7 additions & 3 deletions lib/solid_queue/process_registration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ module ProcessRegistration
set_callback :shutdown, :before, :stop_heartbeat
set_callback :shutdown, :after, :deregister

attr_accessor :supervisor_pid
attr_reader :supervisor
end

def inspect
metadata.inspect
end
alias to_s inspect

def supervised_by(process)
@supervisor = process
end

private
attr_accessor :process

def register
@process = SolidQueue::Process.register(metadata)
@process = SolidQueue::Process.register(supervisor: supervisor, metadata: metadata)
end

def deregister
Expand Down Expand Up @@ -61,7 +65,7 @@ def process_pid
end

def metadata
{ kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor_pid }
{ kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor&.pid }
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def finished?
end

def supervisor_went_away?
supervised? && supervisor_pid != ::Process.ppid
supervised? && supervisor&.pid != ::Process.ppid
end

def supervised?
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def prune_dead_processes
end

def start_runner(runner)
runner.supervisor_pid = ::Process.pid
runner.supervised_by process

pid = fork do
runner.start
Expand Down
4 changes: 3 additions & 1 deletion test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2023_10_30_164933) do
ActiveRecord::Schema[7.1].define(version: 2023_11_02_184135) do
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name"
t.string "status"
Expand Down Expand Up @@ -59,7 +59,9 @@
t.text "metadata"
t.datetime "created_at", null: false
t.datetime "last_heartbeat_at", null: false
t.bigint "supervisor_id"
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end

create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
Expand Down
14 changes: 7 additions & 7 deletions test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
assert_completed_job_results("no pause")
assert_job_status(no_pause, :finished)

# This job was left claimed as the worker was shutdown without
# a chance to terminate orderly
assert_started_job_result("pause")
assert_job_status(pause, :claimed)

# Workers were shutdown without a chance to terminate orderly, but
# since they're linked to the supervisor, the supervisor deregistering
# also deregistered them and released claimed jobs
# Processes didn't have a chance to deregister either
assert_registered_workers_for(:background, :default)
assert_clean_termination
end

test "term supervisor while there are jobs in-flight" do
Expand Down Expand Up @@ -179,8 +178,8 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
assert process_exists?(@pid)

terminate_supervisor
# TODO: change this to clean termination when replacing a worker also deregisters its process ID
assert_registered_workers_for(:background)

assert_clean_termination
end

private
Expand All @@ -197,6 +196,7 @@ def terminate_registered_processes
end

def assert_clean_termination
wait_for_registered_processes 0, timeout: 0.2.second
assert_no_registered_processes
assert_no_claimed_jobs
end
Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def run_supervisor_as_fork(**options)

def wait_for_registered_processes(count, timeout: 1.second)
Timeout.timeout(timeout) do
while SolidQueue::Process.count < count do
while SolidQueue::Process.count != count do
sleep 0.05
end
end
Expand Down

0 comments on commit ca24864

Please sign in to comment.