Skip to content

Commit

Permalink
Merge pull request #272 from rails/persist-recurring-tasks
Browse files Browse the repository at this point in the history
Store canonical recurring tasks in the DB
  • Loading branch information
rosa authored Aug 7, 2024
2 parents addd870 + 953349c commit eebdd77
Show file tree
Hide file tree
Showing 22 changed files with 300 additions and 129 deletions.
17 changes: 16 additions & 1 deletion UPGRADING.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# Upgrading to version 0.5.x
This version includes a new migration to improve recurring tasks. To install it, just run:

```bash
$ bin/rails solid_queue:install:migrations
```

Or, if you're using a different database for Solid Queue:

```bash
$ bin/rails solid_queue:install:migrations DATABASE=<the_name_of_your_solid_queue_db>
```

And then run the migrations.


# Upgrading to version 0.4.x
This version introduced an _async_ mode to run the supervisor and have all workers and dispatchers run as part of the same process as the supervisor, instead of separate, forked, processes. Together with this, we introduced some changes in how the supervisor is started. Prior this change, you could choose whether you wanted to run workers, dispatchers or both, by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is:

Expand Down Expand Up @@ -26,7 +42,6 @@ the supervisor will run 1 dispatcher and no workers.
# Upgrading to version 0.3.x
This version introduced support for [recurring (cron-style) jobs](https://github.com/rails/solid_queue/blob/main/README.md#recurring-tasks), and it needs a new DB migration for it. To install it, just run:
```bash
Expand Down
19 changes: 16 additions & 3 deletions app/models/solid_queue/recurring_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,29 @@ class AlreadyRecorded < StandardError; end
scope :clearable, -> { where.missing(:job) }

class << self
def create_or_insert!(**attributes)
if connection.supports_insert_conflict_target?
# PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict
# during two concurrent INSERTs for the same value of an unique index. We need to explicitly
# indicate unique_by to ignore duplicate rows by this value when inserting
unless insert(attributes, unique_by: [ :task_key, :run_at ]).any?
raise AlreadyRecorded
end
else
create!(**attributes)
end
rescue ActiveRecord::RecordNotUnique
raise AlreadyRecorded
end

def record(task_key, run_at, &block)
transaction do
block.call.tap do |active_job|
if active_job
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
create_or_insert!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
end
end
end
rescue ActiveRecord::RecordNotUnique => e
raise AlreadyRecorded
end

def clear_in_batches(batch_size: 500)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
# frozen_string_literal: true

require "fugit"

module SolidQueue
class Dispatcher::RecurringTask
class RecurringTask < Record
serialize :arguments, coder: Arguments, default: []

validate :supported_schedule
validate :existing_job_class

scope :static, -> { where(static: true) }

class << self
def wrap(args)
args.is_a?(self) ? args : from_configuration(args.first, **args.second)
end

def from_configuration(key, **options)
new(key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args])
new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args])
end
end

attr_reader :key, :schedule, :class_name, :arguments

def initialize(key, class_name:, schedule:, arguments: nil)
@key = key
@class_name = class_name
@schedule = schedule
@arguments = Array(arguments)
def create_or_update_all(tasks)
if connection.supports_insert_conflict_target?
# PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict
# during two concurrent INSERTs for the same value of an unique index. We need to explicitly
# indicate unique_by to ignore duplicate rows by this value when inserting
upsert_all tasks.map(&:attributes_for_upsert), unique_by: :key
else
upsert_all tasks.map(&:attributes_for_upsert)
end
end
end

def delay_from_now
Expand Down Expand Up @@ -51,23 +62,27 @@ def enqueue(at:)
end
end

def valid?
parsed_schedule.instance_of?(Fugit::Cron)
end

def to_s
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule.original} ]"
end

def to_h
{
schedule: schedule,
class_name: class_name,
arguments: arguments
}
def attributes_for_upsert
attributes.without("id", "created_at", "updated_at")
end

private
def supported_schedule
unless parsed_schedule.instance_of?(Fugit::Cron)
errors.add :schedule, :unsupported, message: "is not a supported recurring schedule"
end
end

def existing_job_class
unless job_class.present?
errors.add :class_name, :undefined, message: "doesn't correspond to an existing class"
end
end

def using_solid_queue_adapter?
job_class.queue_adapter_name.inquiry.solid_queue?
end
Expand All @@ -88,12 +103,13 @@ def arguments_with_kwargs
end
end


def parsed_schedule
@parsed_schedule ||= Fugit.parse(schedule)
end

def job_class
@job_class ||= class_name.safe_constantize
@job_class ||= class_name&.safe_constantize
end
end
end
17 changes: 17 additions & 0 deletions app/models/solid_queue/recurring_task/arguments.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require "active_job/arguments"

module SolidQueue
class RecurringTask::Arguments
class << self
def load(data)
data.nil? ? [] : ActiveJob::Arguments.deserialize(ActiveSupport::JSON.load(data))
end

def dump(data)
ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(Array(data)))
end
end
end
end
5 changes: 3 additions & 2 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def signal
end

private

attr_accessor :job

def attempt_creation
Expand All @@ -63,7 +62,9 @@ def attempt_creation
end
end

def check_limit_or_decrement = limit == 1 ? false : attempt_decrement
def check_limit_or_decrement
limit == 1 ? false : attempt_decrement
end

def attempt_decrement
Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
Expand Down
20 changes: 20 additions & 0 deletions db/migrate/20240719134516_create_recurring_tasks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class CreateRecurringTasks < ActiveRecord::Migration[7.1]
def change
create_table :solid_queue_recurring_tasks do |t|
t.string :key, null: false, index: { unique: true }
t.string :schedule, null: false
t.string :command, limit: 2048
t.string :class_name
t.text :arguments

t.string :queue_name
t.integer :priority, default: 0

t.boolean :static, default: true, index: true

t.text :description

t.timestamps
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def options_from_raw_config(key, defaults)

def parse_recurring_tasks(tasks)
Array(tasks).map do |id, options|
Dispatcher::RecurringTask.from_configuration(id, **options)
RecurringTask.from_configuration(id, **options)
end.select(&:valid?)
end

Expand Down
14 changes: 7 additions & 7 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ module SolidQueue
class Dispatcher < Processes::Poller
attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule

after_boot :start_concurrency_maintenance, :load_recurring_schedule
before_shutdown :stop_concurrency_maintenance, :unload_recurring_schedule
after_boot :start_concurrency_maintenance, :schedule_recurring_tasks
before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
Expand All @@ -19,7 +19,7 @@ def initialize(**options)
end

def metadata
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.task_keys.presence)
end

private
Expand All @@ -38,16 +38,16 @@ def start_concurrency_maintenance
concurrency_maintenance&.start
end

def load_recurring_schedule
recurring_schedule.load_tasks
def schedule_recurring_tasks
recurring_schedule.schedule_tasks
end

def stop_concurrency_maintenance
concurrency_maintenance&.stop
end

def unload_recurring_schedule
recurring_schedule.unload_tasks
def unschedule_recurring_tasks
recurring_schedule.unschedule_tasks
end

def all_work_completed?
Expand Down
33 changes: 21 additions & 12 deletions lib/solid_queue/dispatcher/recurring_schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,50 @@ class Dispatcher::RecurringSchedule
attr_reader :configured_tasks, :scheduled_tasks

def initialize(tasks)
@configured_tasks = Array(tasks).map { |task| Dispatcher::RecurringTask.wrap(task) }
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@scheduled_tasks = Concurrent::Hash.new
end

def empty?
configured_tasks.empty?
end

def load_tasks
def schedule_tasks
wrap_in_app_executor do
persist_tasks
reload_tasks
end

configured_tasks.each do |task|
load_task(task)
schedule_task(task)
end
end

def load_task(task)
def schedule_task(task)
scheduled_tasks[task.key] = schedule(task)
end

def unload_tasks
def unschedule_tasks
scheduled_tasks.values.each(&:cancel)
scheduled_tasks.clear
end

def tasks
configured_tasks.each_with_object({}) { |task, hsh| hsh[task.key] = task.to_h }
end

def inspect
configured_tasks.map(&:to_s).join(" | ")
def task_keys
configured_tasks.map(&:key)
end

private
def persist_tasks
SolidQueue::RecurringTask.create_or_update_all configured_tasks
end

def reload_tasks
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys)
end

def schedule(task)
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
thread_schedule.load_task(thread_task)
thread_schedule.schedule_task(thread_task)

wrap_in_app_executor do
thread_task.enqueue(at: thread_task_run_at)
Expand Down
18 changes: 17 additions & 1 deletion test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2024_02_18_110712) do
ActiveRecord::Schema[7.1].define(version: 2024_07_19_134516) do
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name"
t.string "status"
Expand Down Expand Up @@ -101,6 +101,22 @@
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end

create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "key", null: false
t.string "schedule", null: false
t.string "command", limit: 2048
t.string "class_name"
t.text "arguments"
t.string "queue_name"
t.integer "priority", default: 0
t.boolean "static", default: true
t.text "description"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
end

create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
Expand Down
Loading

0 comments on commit eebdd77

Please sign in to comment.