diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 41c74694..994be6aa 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -2,7 +2,7 @@ module SolidQueue class Dispatcher < Processes::Base - include Processes::Runnable, Processes::Poller + include Processes::Poller attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule @@ -20,13 +20,9 @@ def initialize(**options) end private - def run + def poll batch = dispatch_next_batch - - unless batch.size > 0 - procline "waiting" - interruptible_sleep(polling_interval) - end + batch.size end def dispatch_next_batch @@ -51,6 +47,10 @@ def unload_recurring_schedule recurring_schedule.unload_tasks end + def set_procline + procline "waiting" + end + def metadata super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence ) end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index aa772807..d27274ef 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -4,11 +4,39 @@ module SolidQueue::Processes module Poller extend ActiveSupport::Concern + include Runnable + included do attr_accessor :polling_interval end private + def run + if mode.async? + @thread = Thread.new { start_loop } + else + start_loop + end + end + + def start_loop + loop do + break if shutting_down? + + wrap_in_app_executor do + unless poll > 0 + interruptible_sleep(polling_interval) + end + end + end + ensure + run_callbacks(:shutdown) { shutdown } + end + + def poll + raise NotImplementedError + end + def with_polling_volume if SolidQueue.silence_polling? ActiveRecord::Base.logger.silence { yield } diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 71b9ebf9..da411d46 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -8,9 +8,9 @@ module Runnable def start @stopping = false - run_callbacks(:boot) { boot } - start_loop + + run end def stop @@ -26,28 +26,12 @@ def mode end def boot - register_signal_handlers if supervised? - SolidQueue.logger.info("[SolidQueue] Starting #{self}") - end - - def start_loop - if mode.async? - @thread = Thread.new { do_start_loop } - else - do_start_loop + if supervised? + register_signal_handlers + set_procline end - end - def do_start_loop - loop do - break if shutting_down? - - wrap_in_app_executor do - run - end - end - ensure - run_callbacks(:shutdown) { shutdown } + SolidQueue.logger.info("[SolidQueue] Starting #{self}") end def shutting_down? @@ -70,6 +54,9 @@ def all_work_completed? false end + def set_procline + end + def running_inline? mode.inline? end diff --git a/lib/solid_queue/processes/supervised.rb b/lib/solid_queue/processes/supervised.rb index 0d37d5e5..395da428 100644 --- a/lib/solid_queue/processes/supervised.rb +++ b/lib/solid_queue/processes/supervised.rb @@ -14,6 +14,10 @@ def supervised_by(process) end private + def set_procline + procline "waiting" + end + def supervisor_went_away? supervised? && supervisor&.pid != ::Process.ppid end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 2b3f0a79..88265b3d 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -2,7 +2,7 @@ module SolidQueue class Worker < Processes::Base - include Processes::Runnable, Processes::Poller + include Processes::Poller attr_accessor :queues, :pool @@ -15,22 +15,17 @@ def initialize(**options) end private - def run - polled_executions = poll - - if polled_executions.size > 0 - procline "performing #{polled_executions.count} jobs" - - polled_executions.each do |execution| + def poll + claim_executions.then do |executions| + executions.each do |execution| pool.post(execution) end - else - procline "waiting for jobs in #{queues.join(",")}" - interruptible_sleep(polling_interval) + + executions.size end end - def poll + def claim_executions with_polling_volume do SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) end @@ -47,6 +42,10 @@ def all_work_completed? SolidQueue::ReadyExecution.aggregated_count_across(queues).zero? end + def set_procline + procline "waiting for jobs in #{queues.join(",")}" + end + def metadata super.merge(queues: queues.join(","), thread_pool_size: pool.size) end