diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index a22a82d8..fb988075 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -24,7 +24,8 @@ def metadata private def poll batch = dispatch_next_batch - batch.size + + batch.size.zero? ? polling_interval : 0.seconds end def dispatch_next_batch diff --git a/lib/solid_queue/processes/interruptible.rb b/lib/solid_queue/processes/interruptible.rb index 09c027b6..3bff1dd9 100644 --- a/lib/solid_queue/processes/interruptible.rb +++ b/lib/solid_queue/processes/interruptible.rb @@ -12,13 +12,17 @@ def interrupt queue << true end + # Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up. + # @param time [Numeric] the time to sleep. 0 returns immediately. + # @return [true, nil] + # * returns `true` if an interrupt was requested via #wake_up between the + # last call to `interruptible_sleep` and now, resulting in an early return. + # * returns `nil` if it slept the full `time` and was not interrupted. def interruptible_sleep(time) - # Invoking from the main thread can result in a 35% slowdown (at least when running the test suite). - # Using some form of Async (Futures) addresses this performance issue. + # Invoking this from the main thread may result in significant slowdown. + # Utilizing asynchronous execution (Futures) addresses this performance issue. Concurrent::Promises.future(time) do |timeout| - if timeout > 0 && queue.pop(timeout:) - queue.clear - end + queue.pop(timeout:).tap { queue.clear } end.value end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index bf5a7450..75df6104 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -25,11 +25,11 @@ def start_loop loop do break if shutting_down? - wrap_in_app_executor do - unless poll > 0 - interruptible_sleep(polling_interval) - end + delay = wrap_in_app_executor do + poll end + + interruptible_sleep(delay) end ensure SolidQueue.instrument(:shutdown_process, process: self) do diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index fc203774..f34a14f0 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -7,6 +7,7 @@ class Worker < Processes::Poller after_boot :run_start_hooks before_shutdown :run_stop_hooks + attr_accessor :queues, :pool def initialize(**options) @@ -29,7 +30,7 @@ def poll pool.post(execution) end - executions.size + pool.idle? ? polling_interval : 10.minutes end end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 42d57c92..5bca7743 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -92,6 +92,30 @@ class DispatcherTest < ActiveSupport::TestCase another_dispatcher&.stop end + test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do + dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3) + dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once + + 3.times { AddToBufferJob.set(wait: 0.1).perform_later("I'm scheduled") } + assert_equal 3, SolidQueue::ScheduledExecution.count + sleep 0.1 + + dispatcher.start + wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? } + + assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 3, SolidQueue::ReadyExecution.count + end + + test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do + dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + dispatcher.expects(:interruptible_sleep).with(0.seconds).never + dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once + dispatcher.start + sleep 0.1 + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index d511cf74..52b0d8e8 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -171,6 +171,26 @@ class WorkerTest < ActiveSupport::TestCase SolidQueue.process_heartbeat_interval = old_heartbeat_interval end + test "sleeps `10.minutes` if at capacity" do + 3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + + @worker.expects(:interruptible_sleep).with(10.minutes).at_least_once + @worker.expects(:interruptible_sleep).with(@worker.polling_interval).never + + @worker.start + sleep 1.second + end + + test "sleeps `polling_interval` if worker not at capacity" do + 2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + + @worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once + @worker.expects(:interruptible_sleep).with(10.minutes).never + + @worker.start + sleep 1.second + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence