Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialize Dispatcher and Worker looping #444

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def metadata
private
def poll
batch = dispatch_next_batch
batch.size

batch.size.zero? ? polling_interval : 0.seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Concurrent::Promises.future(time) work correctly for time = 0.seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did test it with the queue based version of interruptible_sleep. I could add a test that would help every sleep better "knowing" it works.

I know I wouldn't take this @hms guys word for it...😉

end

def dispatch_next_batch
Expand Down
14 changes: 9 additions & 5 deletions lib/solid_queue/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ def interrupt
queue << true
end

# Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up.
# @param time [Numeric] the time to sleep. 0 returns immediately.
# @return [true, nil]
# * returns `true` if an interrupt was requested via #wake_up between the
# last call to `interruptible_sleep` and now, resulting in an early return.
# * returns `nil` if it slept the full `time` and was not interrupted.
def interruptible_sleep(time)
# Invoking from the main thread can result in a 35% slowdown (at least when running the test suite).
# Using some form of Async (Futures) addresses this performance issue.
# Invoking this from the main thread may result in significant slowdown.
# Utilizing asynchronous execution (Futures) addresses this performance issue.
Concurrent::Promises.future(time) do |timeout|
if timeout > 0 && queue.pop(timeout:)
queue.clear
end
queue.pop(timeout:).tap { queue.clear }
end.value
end

Expand Down
8 changes: 4 additions & 4 deletions lib/solid_queue/processes/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ def start_loop
loop do
break if shutting_down?

wrap_in_app_executor do
unless poll > 0
interruptible_sleep(polling_interval)
end
delay = wrap_in_app_executor do
poll
end

interruptible_sleep(delay)
end
ensure
SolidQueue.instrument(:shutdown_process, process: self) do
Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Worker < Processes::Poller
after_boot :run_start_hooks
before_shutdown :run_stop_hooks


attr_accessor :queues, :pool

def initialize(**options)
Expand All @@ -29,7 +30,7 @@ def poll
pool.post(execution)
end

executions.size
pool.idle? ? polling_interval : 10.minutes
end
end

Expand Down
24 changes: 24 additions & 0 deletions test/unit/dispatcher_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ class DispatcherTest < ActiveSupport::TestCase
another_dispatcher&.stop
end

test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3)
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once

3.times { AddToBufferJob.set(wait: 0.1).perform_later("I'm scheduled") }
assert_equal 3, SolidQueue::ScheduledExecution.count
sleep 0.1

dispatcher.start
wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? }

assert_equal 0, SolidQueue::ScheduledExecution.count
assert_equal 3, SolidQueue::ReadyExecution.count
end

test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
dispatcher.expects(:interruptible_sleep).with(0.seconds).never
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once
dispatcher.start
sleep 0.1
end

private
def with_polling(silence:)
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
Expand Down
20 changes: 20 additions & 0 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ class WorkerTest < ActiveSupport::TestCase
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
end

test "sleeps `10.minutes` if at capacity" do
3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }

@worker.expects(:interruptible_sleep).with(10.minutes).at_least_once
@worker.expects(:interruptible_sleep).with(@worker.polling_interval).never

@worker.start
sleep 1.second
end

test "sleeps `polling_interval` if worker not at capacity" do
2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }

@worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once
@worker.expects(:interruptible_sleep).with(10.minutes).never

@worker.start
sleep 1.second
end

private
def with_polling(silence:)
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
Expand Down
Loading