diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 57b76556..6b0a1c00 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -4,31 +4,60 @@ class SolidQueue::Semaphore < SolidQueue::Record class << self def wait(job) - if semaphore = find_by(key: job.concurrency_key) - semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_duration) + Proxy.new(job, self).wait + end + + def signal(job) + Proxy.new(job, self).signal + end + end + + class Proxy + def initialize(job, proxied_class) + @job = job + @proxied_class = proxied_class + end + + def wait + if semaphore = proxied_class.find_by(key: key) + semaphore.value > 0 && attempt_decrement else - attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_duration) + attempt_creation end end - def signal(job) - attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_duration) + def signal + attempt_increment end private - def attempt_creation(key, limit, duration) - create!(key: key, value: limit - 1, expires_at: duration.from_now) + attr_reader :job, :proxied_class + + def attempt_creation + proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at) true rescue ActiveRecord::RecordNotUnique - attempt_decrement(key, duration) + attempt_decrement + end + + def attempt_decrement + proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 + end + + def attempt_increment + proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0 + end + + def key + job.concurrency_key end - def attempt_decrement(key, duration) - available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0 + def expires_at + job.concurrency_duration.from_now end - def attempt_increment(key, limit, duration) - where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0 + def limit + job.concurrency_limit end end end