Skip to content

Commit

Permalink
Use intermediate Proxy object for the Semaphore low-level actions
Browse files Browse the repository at this point in the history
Building on a suggestion from @jorgemanrubia.
  • Loading branch information
rosa committed Nov 22, 2023
1 parent ed30280 commit 61fec0d
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,60 @@ class SolidQueue::Semaphore < SolidQueue::Record

class << self
def wait(job)
if semaphore = find_by(key: job.concurrency_key)
semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_duration)
Proxy.new(job, self).wait
end

def signal(job)
Proxy.new(job, self).signal
end
end

class Proxy
def initialize(job, proxied_class)
@job = job
@proxied_class = proxied_class
end

def wait
if semaphore = proxied_class.find_by(key: key)
semaphore.value > 0 && attempt_decrement
else
attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_duration)
attempt_creation
end
end

def signal(job)
attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_duration)
def signal
attempt_increment
end

private
def attempt_creation(key, limit, duration)
create!(key: key, value: limit - 1, expires_at: duration.from_now)
attr_reader :job, :proxied_class

def attempt_creation
proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at)
true
rescue ActiveRecord::RecordNotUnique
attempt_decrement(key, duration)
attempt_decrement
end

def attempt_decrement
proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
end

def attempt_increment
proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
end

def key
job.concurrency_key
end

def attempt_decrement(key, duration)
available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0
def expires_at
job.concurrency_duration.from_now
end

def attempt_increment(key, limit, duration)
where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0
def limit
job.concurrency_limit
end
end
end

0 comments on commit 61fec0d

Please sign in to comment.