Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error When an Invalid Recurring Task is Configured #427

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ def configured_processes
end
end

def valid?
configured_processes.any? && (skip_recurring_tasks? || invalid_tasks.none?)
end

def error_messages
if configured_processes.none?
"No workers or processed configured. Exiting..."
else
error_messages = invalid_tasks.map do |task|
all_messages = task.errors.full_messages.map { |msg| "\t#{msg}" }.join("\n")
"#{task.key}:\n#{all_messages}"
end
.join("\n")

"Invalid processes configured:\n#{error_messages}"
end
end

def max_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
Expand All @@ -54,6 +72,10 @@ def default_options
}
end

def invalid_tasks
recurring_tasks.select(&:invalid?)
end

def only_work?
options[:only_work]
end
Expand Down Expand Up @@ -100,7 +122,7 @@ def dispatchers_options
def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options)
end.select(&:valid?)
end
end

def processes_config
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ def start(**options)
SolidQueue.supervisor = true
configuration = Configuration.new(**options)

if configuration.configured_processes.any?
if configuration.valid?
new(configuration).tap(&:start)
else
abort "No workers or processed configured. Exiting..."
abort configuration.error_messages
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions test/dummy/config/recurring_with_invalid.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
periodic_store_result:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
2 changes: 1 addition & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def write(...)
Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)

class ActiveSupport::TestCase
include ProcessesTestHelper, JobsTestHelper
include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper

teardown do
JobBuffer.clear
Expand Down
7 changes: 7 additions & 0 deletions test/test_helpers/configuration_test_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

module ConfigurationTestHelper
def config_file_path(name)
Rails.root.join("config/#{name}.yml")
end
end
10 changes: 10 additions & 0 deletions test/test_helpers/processes_test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ def run_supervisor_as_fork(**options)
end
end

def run_supervisor_as_fork_with_captured_io(**options)
pid = nil
out, err = capture_subprocess_io do
pid = run_supervisor_as_fork(**options)
wait_for_registered_processes(4)
end

[ pid, out, err ]
end

def wait_for_registered_processes(count, timeout: 1.second)
wait_while_with_timeout(timeout) { SolidQueue::Process.count != count }
end
Expand Down
22 changes: 18 additions & 4 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil
end

test "detects when there are invalid recurring tasks" do
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid))

assert_not configuration.valid?
end

test "is valid when there are no recurring tasks" do
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring))

assert configuration.valid?
end

test "is valid when recurring tasks are skipped" do
configuration = SolidQueue::Configuration.new(skip_recurring: true)

assert configuration.valid?
end

private
def assert_processes(configuration, kind, count, **attributes)
processes = configuration.configured_processes.select { |p| p.kind == kind }
Expand Down Expand Up @@ -121,8 +139,4 @@ def assert_equal_value(expected_value, value)
assert_equal expected_value, value
end
end

def config_file_path(name)
Rails.root.join("config/#{name}.yml")
end
end
16 changes: 14 additions & 2 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,22 @@ class SupervisorTest < ActiveSupport::TestCase
end

test "start with empty configuration" do
pid = run_supervisor_as_fork(workers: [], dispatchers: [])
pid, _out, err = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
sleep(0.5)
assert_no_registered_processes

assert_not process_exists?(pid)
assert_match %r{No workers or processed configured. Exiting...}, err
end

test "start with invalid configuration" do
jherdman marked this conversation as resolved.
Show resolved Hide resolved
pid, _out, err = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)

sleep(0.5)
assert_no_registered_processes

assert_not process_exists?(pid)
assert_match %r{Invalid processes configured}, err
end

test "create and delete pidfile" do
Expand All @@ -66,11 +77,12 @@ class SupervisorTest < ActiveSupport::TestCase
FileUtils.mkdir_p(File.dirname(@pidfile))
File.write(@pidfile, ::Process.pid.to_s)

pid = run_supervisor_as_fork
pid, _out, err = run_supervisor_as_fork_with_captured_io
wait_for_registered_processes(4)

assert File.exist?(@pidfile)
assert_not_equal pid, File.read(@pidfile).strip.to_i
assert_match %r{A Solid Queue supervisor is already running}, err

wait_for_process_termination_with_timeout(pid, exitstatus: 1)
end
Expand Down
Loading