Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Interruptible #417

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@
# Folder for Visual Studio Code
/.vscode/

# Files for RVM holdouts
.ruby-gemset

# misc
.DS_Store
2 changes: 1 addition & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
inherit_gem: { rubocop-rails-omakase: rubocop.yml }

AllCops:
TargetRubyVersion: 3.0
TargetRubyVersion: 3.3
Exclude:
- "test/dummy/db/schema.rb"
- "test/dummy/db/queue_schema.rb"
12 changes: 11 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ GEM
concurrent-ruby (1.3.4)
connection_pool (2.4.1)
crass (1.0.6)
date (3.4.1)
debug (1.7.1)
irb (>= 1.5.0)
reline (>= 0.3.1)
drb (2.2.1)
erubi (1.13.0)
et-orbi (1.2.11)
tzinfo
fiddle (1.1.2)
fugit (1.11.1)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
Expand All @@ -74,6 +76,7 @@ GEM
reline (>= 0.4.2)
json (2.8.2)
language_server-protocol (3.17.0.3)
logger (1.6.0)
loofah (2.23.1)
crass (~> 1.0.2)
nokogiri (>= 1.12.0)
Expand All @@ -90,12 +93,14 @@ GEM
racc (~> 1.4)
nokogiri (1.16.7-x86_64-linux)
racc (~> 1.4)
ostruct (0.6.0)
parallel (1.26.3)
parser (3.3.6.0)
ast (~> 2.4.1)
racc
pg (1.5.4)
psych (5.2.0)
psych (5.2.1)
date
stringio
puma (6.4.3)
nio4r (~> 2.0)
Expand Down Expand Up @@ -175,16 +180,21 @@ GEM
PLATFORMS
arm64-darwin-22
arm64-darwin-23
arm64-darwin-24
x86_64-darwin-21
x86_64-darwin-23
x86_64-linux

DEPENDENCIES
debug
fiddle
logger
mocha
mysql2
ostruct
pg
puma
rdoc
rubocop-rails-omakase
solid_queue!
sqlite3
Expand Down
30 changes: 12 additions & 18 deletions lib/solid_queue/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,25 @@ def wake_up
end

private
SELF_PIPE_BLOCK_SIZE = 11

def interrupt
self_pipe[:writer].write_nonblock(".")
rescue Errno::EAGAIN, Errno::EINTR
# Ignore writes that would block and retry
# if another signal arrived while writing
retry
queue << true
end

def interruptible_sleep(time)
if time > 0 && self_pipe[:reader].wait_readable(time)
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
end
rescue Errno::EAGAIN, Errno::EINTR
# Since this is invoked on the main thread, using some form of Async
# avoids a 35% slowdown (at least when running the test suite).
#
# Using Futures for architectural consistency with all the other Async in SolidQueue.
hms marked this conversation as resolved.
Show resolved Hide resolved
Concurrent::Promises.future(time) do |timeout|
if timeout > 0 && queue.pop(timeout:)
queue.clear # exiting the poll wait guarantees testing for SHUTDOWN before next poll
end
end.value
end

# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
def self_pipe
@self_pipe ||= create_self_pipe
end

def create_self_pipe
reader, writer = IO.pipe
{ reader: reader, writer: writer }
def queue
@queue ||= Queue.new
end
end
end
4 changes: 4 additions & 0 deletions solid_queue.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "pg"
spec.add_development_dependency "sqlite3"
spec.add_development_dependency "rubocop-rails-omakase"
spec.add_development_dependency "ostruct"
spec.add_development_dependency "logger"
spec.add_development_dependency "fiddle"
hms marked this conversation as resolved.
Show resolved Hide resolved
spec.add_development_dependency "rdoc"
end
12 changes: 12 additions & 0 deletions test/dummy/config/initializers/enable_yjit.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

# Ideally, tests should be configured as close to production settings as
# possible and YJIT is likely to be enabled. While it's highly unlikely
# YJIT would cause issues, enabling it confirms this assertion.
#
# Configured via initializer to align with Rails 7.1 default in gemspec
if defined?(RubyVM::YJIT.enable)
Rails.application.config.after_initialize do
RubyVM::YJIT.enable
end
end
2 changes: 1 addition & 1 deletion test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
test "run several jobs over the same record sequentially, with some of them failing" do
("A".."F").each_with_index do |name, i|
# A, C, E will fail, for i= 0, 2, 4
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?))
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
end

("G".."K").each do |name|
Expand Down
8 changes: 4 additions & 4 deletions test/integration/instrumentation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class InstrumentationTest < ActiveSupport::TestCase

test "errors when deregistering processes are included in deregister_process events" do
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
error = RuntimeError.new("everything is broken")
error = ExpectedTestError.new("everything is broken")
SolidQueue::Process.any_instance.expects(:destroy!).raises(error).at_least_once

events = subscribed("deregister_process.solid_queue") do
Expand All @@ -182,7 +182,7 @@ class InstrumentationTest < ActiveSupport::TestCase
end

test "retrying failed job emits retry event" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(ExpectedTestError, "A")
job = SolidQueue::Job.last

worker = SolidQueue::Worker.new.tap(&:start)
Expand All @@ -198,7 +198,7 @@ class InstrumentationTest < ActiveSupport::TestCase
end

test "retrying failed jobs in bulk emits retry_all" do
3.times { RaisingJob.perform_later(RuntimeError, "A") }
3.times { RaisingJob.perform_later(ExpectedTestError, "A") }
AddToBufferJob.perform_later("A")

jobs = SolidQueue::Job.last(4)
Expand Down Expand Up @@ -392,7 +392,7 @@ class InstrumentationTest < ActiveSupport::TestCase
test "thread errors emit thread_error events" do
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false

error = RuntimeError.new("everything is broken")
error = ExpectedTestError.new("everything is broken")
SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once

AddToBufferJob.perform_later "hey!"
Expand Down
8 changes: 5 additions & 3 deletions test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

class JobsLifecycleTest < ActiveSupport::TestCase
setup do
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ])
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
end

teardown do
SolidQueue.on_thread_error = @on_thread_error
@worker.stop
@dispatcher.stop

Expand All @@ -29,16 +31,16 @@ class JobsLifecycleTest < ActiveSupport::TestCase
end

test "enqueue and run jobs that fail without retries" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(RuntimeError, "B")
RaisingJob.perform_later(ExpectedTestError, "A")
RaisingJob.perform_later(ExpectedTestError, "B")
jobs = SolidQueue::Job.last(2)

@dispatcher.start
@worker.start

wait_for_jobs_to_finish_for(3.seconds)

message = "raised RuntimeError for the 1st time"
message = "raised ExpectedTestError for the 1st time"
assert_equal [ "A: #{message}", "B: #{message}" ], JobBuffer.values.sort

assert_empty SolidQueue::Job.finished
Expand Down
6 changes: 3 additions & 3 deletions test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase
test "process some jobs that raise errors" do
2.times { enqueue_store_result_job("no error", :background) }
2.times { enqueue_store_result_job("no error", :default) }
error1 = enqueue_store_result_job("error", :background, exception: RuntimeError)
error1 = enqueue_store_result_job("error", :background, exception: ExpectedTestError)
enqueue_store_result_job("no error", :background, pause: 0.03)
error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05)
error2 = enqueue_store_result_job("error", :background, exception: ExpectedTestError, pause: 0.05)
2.times { enqueue_store_result_job("no error", :default, pause: 0.01) }
error3 = enqueue_store_result_job("error", :default, exception: RuntimeError)
error3 = enqueue_store_result_job("error", :default, exception: ExpectedTestError)

wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ])

Expand Down
16 changes: 9 additions & 7 deletions test/models/solid_queue/failed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
end

test "run job that fails" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(ExpectedTestError, "A")
@worker.start

assert_equal 1, SolidQueue::FailedExecution.count
assert SolidQueue::Job.last.failed?
end

test "run job that fails with a SystemStackError (stack level too deep)" do
InfiniteRecursionJob.perform_later
@worker.start
silence_on_thread_error_for(SystemStackError) do
InfiniteRecursionJob.perform_later
@worker.start

assert_equal 1, SolidQueue::FailedExecution.count
assert SolidQueue::Job.last.failed?
assert_equal 1, SolidQueue::FailedExecution.count
assert SolidQueue::Job.last.failed?
end
end

test "retry failed job" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(ExpectedTestError, "A")
@worker.start

assert_difference -> { SolidQueue::FailedExecution.count }, -1 do
Expand All @@ -34,7 +36,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
end

test "retry failed jobs in bulk" do
1.upto(5) { |i| RaisingJob.perform_later(RuntimeError, i) }
1.upto(5) { |i| RaisingJob.perform_later(ExpectedTestError, i) }
1.upto(3) { |i| AddToBufferJob.perform_later(i) }

@worker.start
Expand Down
33 changes: 33 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,20 @@ def write(...)
end

Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)
class ExpectedTestError < RuntimeError; end


class ActiveSupport::TestCase
include ProcessesTestHelper, JobsTestHelper

setup do
# Could be cleaner with one several minitest gems, but didn't want to add new dependency
@_on_thread_error = SolidQueue.on_thread_error
SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError)
end

teardown do
SolidQueue.on_thread_error = @_on_thread_error
JobBuffer.clear

if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile)
Expand Down Expand Up @@ -69,4 +78,28 @@ def wait_while_with_timeout!(timeout, &block)
def skip_active_record_query_cache(&block)
SolidQueue::Record.uncached(&block)
end

# Silences specified exceptions during the execution of a block
#
# @param [Exception, Array<Exception>] expected an Exception or an array of Exceptions to ignore
# @yield Executes the provided block with specified exception(s) silenced
def silence_on_thread_error_for(expected, &block)
SolidQueue.with(on_thread_error: silent_on_thread_error_for(expected)) do
block.call
end
end

# Does not call on_thread_error for expected exceptions
# @param [Exception, Array<Exception>] expected an Exception or an array of Exceptions to ignore
def silent_on_thread_error_for(expected)
current_proc = SolidQueue.on_thread_error

->(exception) do
expected_exceptions = Array(expected)

unless expected_exceptions.any? { exception.instance_of?(_1) }
current_proc.call(exception)
end
end
end
end
10 changes: 5 additions & 5 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ class WorkerTest < ActiveSupport::TestCase
original_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { errors << error.message }
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false

SolidQueue::ReadyExecution.expects(:claim).raises(RuntimeError.new("everything is broken")).at_least_once
SolidQueue::ReadyExecution.expects(:claim).raises(ExpectedTestError.new("everything is broken")).at_least_once

AddToBufferJob.perform_later "hey!"

worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2).tap(&:start)
sleep(1)

assert_raises RuntimeError do
assert_raises ExpectedTestError do
worker.stop
end

Expand All @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once
SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once

AddToBufferJob.perform_later "hey!"

Expand All @@ -71,15 +71,15 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

RaisingJob.perform_later(RuntimeError, "B")
RaisingJob.perform_later(ExpectedTestError, "B")

@worker.start

wait_for_jobs_to_finish_for(1.second)
@worker.wake_up

assert_equal 1, subscriber.errors.count
assert_equal "This is a RuntimeError exception", subscriber.messages.first
assert_equal "This is a ExpectedTestError exception", subscriber.messages.first
ensure
Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe)
end
Expand Down
Loading