From 8d1e27c620766918fba03f573b9836bcffbc35d5 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 14 Mar 2024 22:25:55 +0100 Subject: [PATCH] Refactor a bit the Poller vs. Runnable modules Make the loop be part of Poller. Allow to have other other Runnable processes that don't need an infinite loop. I'm still not super happy with these concerns. This needs more work that will come when I properly implement async mode. Right now this is all interleaved in the modules and it shouldn't be. --- lib/solid_queue/dispatcher.rb | 14 +++++------ lib/solid_queue/processes/poller.rb | 28 ++++++++++++++++++++++ lib/solid_queue/processes/runnable.rb | 31 +++++++------------------ lib/solid_queue/processes/supervised.rb | 4 ++++ lib/solid_queue/worker.rb | 23 +++++++++--------- 5 files changed, 59 insertions(+), 41 deletions(-) diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 41c74694..994be6aa 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -2,7 +2,7 @@ module SolidQueue class Dispatcher < Processes::Base - include Processes::Runnable, Processes::Poller + include Processes::Poller attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule @@ -20,13 +20,9 @@ def initialize(**options) end private - def run + def poll batch = dispatch_next_batch - - unless batch.size > 0 - procline "waiting" - interruptible_sleep(polling_interval) - end + batch.size end def dispatch_next_batch @@ -51,6 +47,10 @@ def unload_recurring_schedule recurring_schedule.unload_tasks end + def set_procline + procline "waiting" + end + def metadata super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence ) end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index aa772807..d27274ef 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -4,11 +4,39 @@ module SolidQueue::Processes module Poller extend ActiveSupport::Concern + include Runnable + included do attr_accessor :polling_interval end private + def run + if mode.async? + @thread = Thread.new { start_loop } + else + start_loop + end + end + + def start_loop + loop do + break if shutting_down? + + wrap_in_app_executor do + unless poll > 0 + interruptible_sleep(polling_interval) + end + end + end + ensure + run_callbacks(:shutdown) { shutdown } + end + + def poll + raise NotImplementedError + end + def with_polling_volume if SolidQueue.silence_polling? ActiveRecord::Base.logger.silence { yield } diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 71b9ebf9..da411d46 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -8,9 +8,9 @@ module Runnable def start @stopping = false - run_callbacks(:boot) { boot } - start_loop + + run end def stop @@ -26,28 +26,12 @@ def mode end def boot - register_signal_handlers if supervised? - SolidQueue.logger.info("[SolidQueue] Starting #{self}") - end - - def start_loop - if mode.async? - @thread = Thread.new { do_start_loop } - else - do_start_loop + if supervised? + register_signal_handlers + set_procline end - end - def do_start_loop - loop do - break if shutting_down? - - wrap_in_app_executor do - run - end - end - ensure - run_callbacks(:shutdown) { shutdown } + SolidQueue.logger.info("[SolidQueue] Starting #{self}") end def shutting_down? @@ -70,6 +54,9 @@ def all_work_completed? false end + def set_procline + end + def running_inline? mode.inline? end diff --git a/lib/solid_queue/processes/supervised.rb b/lib/solid_queue/processes/supervised.rb index 0d37d5e5..395da428 100644 --- a/lib/solid_queue/processes/supervised.rb +++ b/lib/solid_queue/processes/supervised.rb @@ -14,6 +14,10 @@ def supervised_by(process) end private + def set_procline + procline "waiting" + end + def supervisor_went_away? supervised? && supervisor&.pid != ::Process.ppid end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 2b3f0a79..88265b3d 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -2,7 +2,7 @@ module SolidQueue class Worker < Processes::Base - include Processes::Runnable, Processes::Poller + include Processes::Poller attr_accessor :queues, :pool @@ -15,22 +15,17 @@ def initialize(**options) end private - def run - polled_executions = poll - - if polled_executions.size > 0 - procline "performing #{polled_executions.count} jobs" - - polled_executions.each do |execution| + def poll + claim_executions.then do |executions| + executions.each do |execution| pool.post(execution) end - else - procline "waiting for jobs in #{queues.join(",")}" - interruptible_sleep(polling_interval) + + executions.size end end - def poll + def claim_executions with_polling_volume do SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) end @@ -47,6 +42,10 @@ def all_work_completed? SolidQueue::ReadyExecution.aggregated_count_across(queues).zero? end + def set_procline + procline "waiting for jobs in #{queues.join(",")}" + end + def metadata super.merge(queues: queues.join(","), thread_pool_size: pool.size) end