Skip to content

Commit

Permalink
Refactor a bit the Poller vs. Runnable modules
Browse files Browse the repository at this point in the history
Make the loop be part of Poller. Allow to have other other
Runnable processes that don't need an infinite loop.

I'm still not super happy with these concerns. This needs more
work that will come when I properly implement async mode. Right now
this is all interleaved in the modules and it shouldn't be.
  • Loading branch information
rosa committed Mar 14, 2024
1 parent e104a5f commit 8d1e27c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 41 deletions.
14 changes: 7 additions & 7 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module SolidQueue
class Dispatcher < Processes::Base
include Processes::Runnable, Processes::Poller
include Processes::Poller

attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule

Expand All @@ -20,13 +20,9 @@ def initialize(**options)
end

private
def run
def poll
batch = dispatch_next_batch

unless batch.size > 0
procline "waiting"
interruptible_sleep(polling_interval)
end
batch.size
end

def dispatch_next_batch
Expand All @@ -51,6 +47,10 @@ def unload_recurring_schedule
recurring_schedule.unload_tasks
end

def set_procline
procline "waiting"
end

def metadata
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence )
end
Expand Down
28 changes: 28 additions & 0 deletions lib/solid_queue/processes/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,39 @@ module SolidQueue::Processes
module Poller
extend ActiveSupport::Concern

include Runnable

included do
attr_accessor :polling_interval
end

private
def run
if mode.async?
@thread = Thread.new { start_loop }
else
start_loop
end
end

def start_loop
loop do
break if shutting_down?

wrap_in_app_executor do
unless poll > 0
interruptible_sleep(polling_interval)
end
end
end
ensure
run_callbacks(:shutdown) { shutdown }
end

def poll
raise NotImplementedError
end

def with_polling_volume
if SolidQueue.silence_polling?
ActiveRecord::Base.logger.silence { yield }
Expand Down
31 changes: 9 additions & 22 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ module Runnable

def start
@stopping = false

run_callbacks(:boot) { boot }
start_loop

run
end

def stop
Expand All @@ -26,28 +26,12 @@ def mode
end

def boot
register_signal_handlers if supervised?
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
end

def start_loop
if mode.async?
@thread = Thread.new { do_start_loop }
else
do_start_loop
if supervised?
register_signal_handlers
set_procline
end
end

def do_start_loop
loop do
break if shutting_down?

wrap_in_app_executor do
run
end
end
ensure
run_callbacks(:shutdown) { shutdown }
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
end

def shutting_down?
Expand All @@ -70,6 +54,9 @@ def all_work_completed?
false
end

def set_procline
end

def running_inline?
mode.inline?
end
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_queue/processes/supervised.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ def supervised_by(process)
end

private
def set_procline
procline "waiting"
end

def supervisor_went_away?
supervised? && supervisor&.pid != ::Process.ppid
end
Expand Down
23 changes: 11 additions & 12 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module SolidQueue
class Worker < Processes::Base
include Processes::Runnable, Processes::Poller
include Processes::Poller

attr_accessor :queues, :pool

Expand All @@ -15,22 +15,17 @@ def initialize(**options)
end

private
def run
polled_executions = poll

if polled_executions.size > 0
procline "performing #{polled_executions.count} jobs"

polled_executions.each do |execution|
def poll
claim_executions.then do |executions|
executions.each do |execution|
pool.post(execution)
end
else
procline "waiting for jobs in #{queues.join(",")}"
interruptible_sleep(polling_interval)

executions.size
end
end

def poll
def claim_executions
with_polling_volume do
SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id)
end
Expand All @@ -47,6 +42,10 @@ def all_work_completed?
SolidQueue::ReadyExecution.aggregated_count_across(queues).zero?
end

def set_procline
procline "waiting for jobs in #{queues.join(",")}"
end

def metadata
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
end
Expand Down

0 comments on commit 8d1e27c

Please sign in to comment.