From d01a9b2469013f0dc05810bbbfc5518cbb65672d Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 25 Sep 2024 03:04:38 -0400 Subject: [PATCH] Support nested batches * Parent batches will not complete until all child batches have been completed --- app/models/solid_queue/job_batch.rb | 44 +++++++++++++++++++---------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 61780f38..eb41f7de 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -5,6 +5,7 @@ class JobBatch < Record belongs_to :job, foreign_key: :job_id, optional: true belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true has_many :jobs, foreign_key: :batch_id + has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch" serialize :on_finish_active_job, coder: JSON serialize :on_success_active_job, coder: JSON @@ -21,28 +22,33 @@ def current_batch_id end def enqueue(attributes = {}) - previous_batch_id = current_batch_id.presence || nil - job_batch = nil transaction do job_batch = create!(batch_attributes(attributes)) - ActiveSupport::IsolatedExecutionState[:current_batch_id] = job_batch.id - yield job_batch + wrap_in_batch_context(job_batch.id) do + yield job_batch + end end job_batch - ensure - ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id end def dispatch_finished_batches incomplete.order(:id).pluck(:id).each do |id| transaction do - where(id: id).non_blocking_lock.each(&:finish) + where(id: id).includes(:children, :jobs).non_blocking_lock.each(&:finish) end end end + def wrap_in_batch_context(batch_id) + previous_batch_id = current_batch_id.presence || nil + ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id + yield + ensure + ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id + end + private def batch_attributes(attributes) @@ -62,6 +68,8 @@ def batch_attributes(attributes) attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize end + attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present? + attributes end @@ -74,16 +82,13 @@ def as_active_job(active_job_klass) def enqueue(attributes = {}) raise "You cannot enqueue a batch that is already finished" if finished? - previous_batch_id = self.class.current_batch_id.presence || nil - transaction do - ActiveSupport::IsolatedExecutionState[:current_batch_id] = id - yield self + self.class.wrap_in_batch_context(id) do + yield self + end end self - ensure - ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id end def finished? @@ -110,6 +115,10 @@ def finish return unless status.in?([ :finished, :failed ]) end + children.find_each do |child| + return unless child.finished? + end + if on_finish_active_job.present? perform_completion_job(:on_finish_active_job, attrs) end @@ -118,7 +127,10 @@ def finish perform_completion_job(:on_success_active_job, attrs) end - update!({ finished_at: Time.zone.now }.merge(attrs)) + transaction do + parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present? + update!({ finished_at: Time.zone.now }.merge(attrs)) + end end private @@ -133,7 +145,9 @@ def perform_completion_job(job_field, attrs) active_job = ActiveJob::Base.deserialize(send(job_field)) active_job.send(:deserialize_arguments_if_needed) active_job.arguments = [ self ] + Array.wrap(active_job.arguments) - ActiveJob.perform_all_later([ active_job ]) + self.class.wrap_in_batch_context(id) do + ActiveJob.perform_all_later([ active_job ]) + end active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id attrs[job_field] = active_job.serialize end