diff --git a/README.md b/README.md index d4c68bed..d24c2267 100644 --- a/README.md +++ b/README.md @@ -493,6 +493,67 @@ class ApplicationMailer < ActionMailer::Base Rails.error.report(exception) raise exception end +``` + +## Batch jobs + +SolidQueue offers support for batching jobs. This allows you to track progress of a set of jobs, +and optionally trigger callbacks based on their status. It supports the following: + +- Relating jobs to a batch, to track their status +- Three available callbacks to fire: + - `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed. + - `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on` + - `on_failure`: Fired the _first_ time a job fails, after all retries are exhausted. +- If a job is part of a batch, it can enqueue more jobs for that batch using `batch#enqueue` +- Batches can be nested within other batches, creating a hierarchy. Outer batches will not finish until all nested batches have finished. + +```rb +class SleepyJob < ApplicationJob + def perform(seconds_to_sleep) + Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..." + sleep seconds_to_sleep + end +end + +class MultiStepJob < ApplicationJob + def perform + batch.enqueue do + SleepyJob.perform_later(5) + # Because of this nested batch, the top-level batch won't finish until the inner, + # 10 second job finishes + # Both jobs will still run simultaneously + SolidQueue::JobBatch.enqueue do + SleepyJob.perform_later(10) + end + end + end +end + +class BatchFinishJob < ApplicationJob + def perform(batch) # batch is always the default first argument + Rails.logger.info "Good job finishing all jobs" + end +end + +class BatchSuccessJob < ApplicationJob + def perform(batch) # batch is always the default first argument + Rails.logger.info "Good job finishing all jobs, and all of them worked!" + end +end + +class BatchFailureJob < ApplicationJob + def perform(batch) # batch is always the default first argument + Rails.logger.info "At least one job failed, sorry!" + end +end + +SolidQueue::JobBatch.enqueue( + on_finish: BatchFinishJob, + on_success: BatchSuccessJob, + on_failure: BatchFailureJob +) do + 5.times.map { |i| SleepyJob.perform_later(i) } end ``` diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c2b13909..4d776af7 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -66,6 +66,8 @@ def perform failed_with(result.error) raise result.error end + + job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present? ensure job.unblock_next_blocked_job end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..f21d313c 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -8,6 +8,8 @@ class EnqueueError < StandardError; end serialize :arguments, coder: JSON + belongs_to :job_batch, foreign_key: :batch_id, optional: true + class << self def enqueue_all(active_jobs) active_jobs_by_job_id = active_jobs.index_by(&:job_id) @@ -53,6 +55,7 @@ def create_all_from_active_jobs(active_jobs) end def attributes_from_active_job(active_job) + active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id { queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME, active_job_id: active_job.job_id, @@ -60,7 +63,8 @@ def attributes_from_active_job(active_job) scheduled_at: active_job.scheduled_at, class_name: active_job.class.name, arguments: active_job.serialize, - concurrency_key: active_job.concurrency_key + concurrency_key: active_job.concurrency_key, + batch_id: active_job.batch_id } end end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..2222f95e 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -76,7 +76,7 @@ def dispatch_bypassing_concurrency_limits end def finished! - if SolidQueue.preserve_finished_jobs? + if SolidQueue.preserve_finished_jobs? || batch_id.present? touch(:finished_at) else destroy! diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb new file mode 100644 index 00000000..40e183b5 --- /dev/null +++ b/app/models/solid_queue/job_batch.rb @@ -0,0 +1,166 @@ +# frozen_string_literal: true + +module SolidQueue + class JobBatch < Record + belongs_to :job, foreign_key: :job_id, optional: true + belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true + has_many :jobs, foreign_key: :batch_id + has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch" + + serialize :on_finish_active_job, coder: JSON + serialize :on_success_active_job, coder: JSON + serialize :on_failure_active_job, coder: JSON + + scope :incomplete, -> { + where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago) + } + scope :finished, -> { where.not(finished_at: nil) } + + class << self + def current_batch_id + ActiveSupport::IsolatedExecutionState[:current_batch_id] + end + + def enqueue(attributes = {}) + job_batch = nil + transaction do + job_batch = create!(batch_attributes(attributes)) + wrap_in_batch_context(job_batch.id) do + yield job_batch + end + end + + job_batch + end + + def dispatch_finished_batches + incomplete.order(:id).pluck(:id).each do |id| + transaction do + where(id: id).includes(:children, :jobs).non_blocking_lock.each(&:finish) + end + end + end + + def wrap_in_batch_context(batch_id) + previous_batch_id = current_batch_id.presence || nil + ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id + yield + ensure + ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id + end + + private + + def batch_attributes(attributes) + on_finish_klass = attributes.delete(:on_finish) + on_success_klass = attributes.delete(:on_success) + on_failure_klass = attributes.delete(:on_failure) + + if on_finish_klass.present? + attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize + end + + if on_success_klass.present? + attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize + end + + if on_failure_klass.present? + attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize + end + + attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present? + # Set it initially, so we check the batch even if there are no jobs + attributes[:changed_at] = Time.zone.now + attributes[:last_changed_at] = Time.zone.now + + attributes + end + + def as_active_job(active_job_klass) + active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new + end + end + + # Instance-level enqueue + def enqueue(attributes = {}) + raise "You cannot enqueue a batch that is already finished" if finished? + + transaction do + self.class.wrap_in_batch_context(id) do + yield self + end + end + + self + end + + def finished? + finished_at.present? + end + + def finish + return if finished? + reset_changed_at + + all_jobs_succeeded = true + attrs = {} + jobs.find_each do |next_job| + # SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished, + # and there is no record of the failure. + # GoodJob would report a discard as an error. It's possible we should do that in the future? + if fire_failure_job?(next_job) + perform_completion_job(:on_failure_active_job, attrs) + update!(attrs) + end + + status = next_job.status + all_jobs_succeeded = all_jobs_succeeded && status != :failed + return unless status.in?([ :finished, :failed ]) + end + + children.find_each do |child| + return unless child.finished? + end + + if on_finish_active_job.present? + perform_completion_job(:on_finish_active_job, attrs) + end + + if on_success_active_job.present? && all_jobs_succeeded + perform_completion_job(:on_success_active_job, attrs) + end + + transaction do + parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present? + update!({ finished_at: Time.zone.now }.merge(attrs)) + end + end + + private + + def fire_failure_job?(job) + return false if on_failure_active_job.blank? || job.failed_execution.blank? + job = ActiveJob::Base.deserialize(on_failure_active_job) + job.provider_job_id.blank? + end + + def perform_completion_job(job_field, attrs) + active_job = ActiveJob::Base.deserialize(send(job_field)) + active_job.send(:deserialize_arguments_if_needed) + active_job.arguments = [ self ] + Array.wrap(active_job.arguments) + self.class.wrap_in_batch_context(parent_job_batch_id || self.class.current_batch_id) do + ActiveJob.perform_all_later([ active_job ]) + end + active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id + attrs[job_field] = active_job.serialize + end + + def reset_changed_at + if changed_at.blank? && last_changed_at.present? + update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again + else + update_columns(changed_at: nil) # clear out changed_at so we ignore this until the next job finishes + end + end + end +end diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb new file mode 100644 index 00000000..91b76ee8 --- /dev/null +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -0,0 +1,21 @@ +class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_job_batches do |t| + t.references :parent_job_batch, index: true # FIXME: foreign key + t.text :on_finish_active_job + t.text :on_success_active_job + t.text :on_failure_active_job + t.datetime :finished_at + t.datetime :changed_at + t.datetime :last_changed_at + t.timestamps + + t.index [ :finished_at ] + t.index [ :changed_at ] + t.index [ :last_changed_at ] + end + + add_reference :solid_queue_jobs, :batch, index: true + add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade + end +end diff --git a/lib/active_job/job_batch_id.rb b/lib/active_job/job_batch_id.rb new file mode 100644 index 00000000..494e197f --- /dev/null +++ b/lib/active_job/job_batch_id.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +# Inspired by active_job/core.rb docs +# https://github.com/rails/rails/blob/1c2529b9a6ba5a1eff58be0d0373d7d9d401015b/activejob/lib/active_job/core.rb#L136 +module ActiveJob + module JobBatchId + extend ActiveSupport::Concern + + included do + attr_accessor :batch_id + end + + def serialize + super.merge("batch_id" => batch_id) + end + + def deserialize(job_data) + super + self.batch_id = job_data["batch_id"] + end + + def batch + @batch ||= SolidQueue::JobBatch.find_by(id: batch_id) + end + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e7070d26..4e7bc6dd 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -5,6 +5,7 @@ require "active_job" require "active_job/queue_adapters" +require "active_job/job_batch_id" require "active_support" require "active_support/core_ext/numeric/time" diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index fb988075..e19f22cd 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -31,6 +31,7 @@ def poll def dispatch_next_batch with_polling_volume do ScheduledExecution.dispatch_next_batch(batch_size) + SolidQueue::JobBatch.dispatch_finished_batches end end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..452ae445 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -35,6 +35,7 @@ class Engine < ::Rails::Engine initializer "solid_queue.active_job.extensions" do ActiveSupport.on_load :active_job do include ActiveJob::ConcurrencyControls + include ActiveJob::JobBatchId end end end diff --git a/test/dummy/app/jobs/batch_completion_job.rb b/test/dummy/app/jobs/batch_completion_job.rb new file mode 100644 index 00000000..0fb17284 --- /dev/null +++ b/test/dummy/app/jobs/batch_completion_job.rb @@ -0,0 +1,7 @@ +class BatchCompletionJob < ApplicationJob + queue_as :background + + def perform(batch) + Rails.logger.info "#{batch.jobs.size} jobs completed!" + end +end diff --git a/test/dummy/app/jobs/sleepy_job.rb b/test/dummy/app/jobs/sleepy_job.rb new file mode 100644 index 00000000..dd105cdc --- /dev/null +++ b/test/dummy/app/jobs/sleepy_job.rb @@ -0,0 +1,10 @@ +class SleepyJob < ApplicationJob + queue_as :background + + retry_on Exception, wait: 30.seconds, attempts: 5 + + def perform(seconds_to_sleep) + Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..." + sleep seconds_to_sleep + end +end diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 697c2e92..64de0e82 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -38,6 +38,22 @@ t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true end + create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "parent_job_batch_id" + t.text "on_finish_active_job" + t.text "on_success_active_job" + t.text "on_failure_active_job" + t.datetime "finished_at" + t.datetime "changed_at" + t.datetime "last_changed_at" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at" + t.index ["finished_at"], name: "index_solid_queue_job_batches_on_finished_at" + t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at" + t.index ["parent_job_batch_id"], name: "index_solid_queue_job_batches_on_parent_job_batch_id" + end + create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name", null: false t.string "class_name", null: false @@ -49,7 +65,9 @@ t.string "concurrency_key" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.bigint "batch_id" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" @@ -135,6 +153,7 @@ add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade diff --git a/test/integration/batch_lifecycle_test.rb b/test/integration/batch_lifecycle_test.rb new file mode 100644 index 00000000..22714315 --- /dev/null +++ b/test/integration/batch_lifecycle_test.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require "test_helper" + +class BatchLifecycleTest < ActiveSupport::TestCase + setup do + @worker = SolidQueue::Worker.new(queues: "background", threads: 3) + @dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2) + end + + teardown do + @worker.stop + @dispatcher.stop + + JobBuffer.clear + + SolidQueue::Job.destroy_all + SolidQueue::JobBatch.destroy_all + end + + class BatchOnSuccessJob < ApplicationJob + queue_as :background + + def perform(batch, custom_message = "") + JobBuffer.add "#{custom_message}: #{batch.jobs.size} jobs succeeded!" + end + end + + class AddsMoreJobsJob < ApplicationJob + queue_as :background + + def perform + batch.enqueue do + AddToBufferJob.perform_later "added from inside 1" + AddToBufferJob.perform_later "added from inside 2" + SolidQueue::JobBatch.enqueue do + AddToBufferJob.perform_later "added from inside 3" + end + end + end + end + + test "nested batches finish from the inside out" do + batch2 = batch3 = batch4 = nil + batch1 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("3")) do + batch2 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("2")) do + batch3 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("1")) { } + batch4 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) { } + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal [ "1: 0 jobs succeeded!", "1.1: 0 jobs succeeded!", "2: 2 jobs succeeded!", "3: 1 jobs succeeded!" ], JobBuffer.values + assert_equal 4, SolidQueue::JobBatch.finished.count + assert_equal batch1.reload.finished_at > batch2.reload.finished_at, true + assert_equal batch2.finished_at > batch3.reload.finished_at, true + assert_equal batch2.finished_at > batch4.reload.finished_at, true + end + + test "all jobs are run, including jobs enqueued inside of other jobs" do + SolidQueue::JobBatch.enqueue do + AddToBufferJob.perform_later "hey" + SolidQueue::JobBatch.enqueue do + AddToBufferJob.perform_later "ho" + AddsMoreJobsJob.perform_later + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal [ "added from inside 1", "added from inside 2", "added from inside 3", "hey", "ho" ], JobBuffer.values.sort + assert_equal 3, SolidQueue::JobBatch.finished.count + end +end diff --git a/test/models/solid_queue/job_batch_test.rb b/test/models/solid_queue/job_batch_test.rb new file mode 100644 index 00000000..e49f59c2 --- /dev/null +++ b/test/models/solid_queue/job_batch_test.rb @@ -0,0 +1,66 @@ +require "test_helper" + +class SolidQueue::JobBatchTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + teardown do + SolidQueue::Job.destroy_all + SolidQueue::JobBatch.destroy_all + end + + class BatchWithArgumentsJob < ApplicationJob + def perform(batch, arg1, arg2) + Rails.logger.info "Hi #{batch.id}, #{arg1}, #{arg2}!" + end + end + + class NiceJob < ApplicationJob + retry_on Exception, wait: 1.second + + def perform(arg) + Rails.logger.info "Hi #{arg}!" + end + end + + test "batch will be completed on success" do + batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) { } + assert_not_nil batch.on_finish_active_job + assert_equal BatchCompletionJob.name, batch.on_finish_active_job["job_class"] + end + + test "batch will be completed on finish" do + batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) { } + assert_not_nil batch.on_success_active_job + assert_equal BatchCompletionJob.name, batch.on_success_active_job["job_class"] + end + + test "sets the batch_id on jobs created inside of the enqueue block" do + batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do + NiceJob.perform_later("world") + NiceJob.perform_later("people") + end + + assert_equal 2, SolidQueue::Job.count + assert_equal [ batch.id ] * 2, SolidQueue::Job.last(2).map(&:batch_id) + end + + test "batch id is present inside the block" do + assert_nil SolidQueue::JobBatch.current_batch_id + SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do + assert_not_nil SolidQueue::JobBatch.current_batch_id + end + assert_nil SolidQueue::JobBatch.current_batch_id + end + + test "allow arguments and options for callbacks" do + SolidQueue::JobBatch.enqueue( + on_finish: BatchWithArgumentsJob.new(1, 2).set(queue: :batch), + ) do + NiceJob.perform_later("world") + end + + assert_not_nil SolidQueue::JobBatch.last.on_finish_active_job["arguments"] + assert_equal SolidQueue::JobBatch.last.on_finish_active_job["arguments"], [ 1, 2 ] + assert_equal SolidQueue::JobBatch.last.on_finish_active_job["queue_name"], "batch" + end +end diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index d0833fcf..f73458f0 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -9,6 +9,14 @@ def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) end end + def wait_for_job_batches_to_finish_for(timeout = 1.second) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::JobBatch.where(finished_at: nil).any? + end + end + end + def assert_no_unfinished_jobs skip_active_record_query_cache do assert SolidQueue::Job.where(finished_at: nil).none?