Skip to content

Commit

Permalink
Implement new inline mode to ease tests for Mission Control
Browse files Browse the repository at this point in the history
In this mode, the worker/scheduler runs inline (still with the thread pool)
until there's no more work, and then it stops.
  • Loading branch information
rosa committed Sep 15, 2023
1 parent 0c4b1da commit 2b9057d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 35 deletions.
11 changes: 5 additions & 6 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module SolidQueue
class ReadyExecution < Execution
scope :queued_as, ->(queues) { where(queue_name: queues) }
scope :ordered, -> { order(priority: :asc) }

before_create :assume_attributes_from_job
Expand All @@ -19,9 +18,13 @@ def claim(queues, limit)
claimed_executions_for(candidate_job_ids)
end

def queued_as(queues)
QueueParser.new(queues, self).scoped_relation
end

private
def query_candidates(queues, limit)
queue_scope(queues).ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
queued_as(queues).ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
end

def lock(job_ids)
Expand All @@ -35,10 +38,6 @@ def claimed_executions_for(job_ids)

SolidQueue::ClaimedExecution.where(job_id: job_ids)
end

def queue_scope(queues)
QueueParser.new(queues, self).scoped_relation
end
end

def claim
Expand Down
48 changes: 31 additions & 17 deletions lib/solid_queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,8 @@ module Runner
attr_accessor :supervisor_pid
end

def start(mode: :sync)
procline "starting in mode #{mode}"

@stopping = false
register_signal_handlers

SolidQueue.logger.info("[SolidQueue] Starting #{self}")
def start(mode: :supervised)
boot_in mode

run_callbacks(:start) do
if mode == :async
Expand All @@ -33,14 +28,27 @@ def start(mode: :sync)

def stop
@stopping = true
@thread.join if running_in_async_mode?
@thread&.join
end

def running?
!stopping?
end

private
attr_reader :mode

def boot_in(mode)
@mode = mode.to_s.inquiry
@stopping = false

procline "starting in mode #{mode}"

register_signal_handlers

SolidQueue.logger.info("[SolidQueue] Starting #{self}")
end

def register_signal_handlers
%w[ INT TERM ].each do |signal|
trap(signal) do
Expand All @@ -67,10 +75,11 @@ def start_loop
end

def shutting_down?
stopping? || supervisor_went_away?
stopping? || supervisor_went_away? || finished?
end

def run
raise NotImplementedError
end

def shutdown
Expand All @@ -81,19 +90,24 @@ def stopping?
@stopping
end

def shutdown_completed?
def finished?
running_inline? && all_work_completed?
end

def supervisor_went_away?
if running_in_async_mode?
false
else
supervisor_pid != ::Process.ppid
end
supervised? && supervisor_pid != ::Process.ppid
end

def supervised?
mode.supervised?
end

def all_work_completed?
false
end

def running_in_async_mode?
@thread.present?
def running_inline?
mode.inline?
end

def hostname
Expand Down
10 changes: 0 additions & 10 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@ def run
end
end

def shutdown
super

@shutdown_completed = true
end

def shutdown_completed?
@shutdown_completed
end

def metadata
super.merge(batch_size: batch_size, polling_interval: polling_interval)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def shutdown
pool.wait_for_termination(SolidQueue.shutdown_timeout)
end

def shutdown_completed?
pool.shutdown?
def all_work_completed?
SolidQueue::ReadyExecution.queued_as(queues).empty?
end

def metadata
Expand Down
4 changes: 4 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
end

class ActiveSupport::TestCase
setup do
SolidQueue.logger = ActiveSupport::Logger.new(nil)
end

teardown do
JobBuffer.clear
File.delete(SolidQueue.supervisor_pidfile) if File.exist?(SolidQueue.supervisor_pidfile)
Expand Down

0 comments on commit 2b9057d

Please sign in to comment.