-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Perform separate queries per queue assigned to worker
We do this in the order specified in the worker configuration, and by priority within each queue. It's still possible to specify "*", which means all queues, and in that case the order will be only by priority. Paused queues aren't queried. The way it works when there are more than one queue in the list is as follows. Imagine the worker is configured to run ``` queue1, queue2, queue3 ``` In that order. Then: - Query as most N (N is the worker's capacity, or its number of idle threads) jobs from queue1. If we got at least N jobs, stop and assign these to the pool, and continue querying jobs from queue1. - If we got fewer jobs than N, query jobs from queue2. If we reached N, stop and post those to the pool, and go back to queue1. - If we still have space because we haven't filled N, go on to queue3 and do the same. Then go back to queue1.
- Loading branch information
Showing
6 changed files
with
105 additions
and
75 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# frozen_string_literal: true | ||
|
||
module SolidQueue | ||
class QueueSelector | ||
attr_reader :raw_queues, :relation | ||
|
||
def initialize(queue_list, relation) | ||
@raw_queues = Array(queue_list).map { |queue| queue.to_s.strip }.presence || [ "*" ] | ||
@relation = relation | ||
end | ||
|
||
def scoped_relations | ||
if queue_names.empty? then [ relation.all ] | ||
else | ||
queue_names.map { |queue_name| relation.queued_as(queue_name) } | ||
end | ||
end | ||
|
||
private | ||
def queue_names | ||
if all? then filter_paused_queues | ||
else | ||
filter_paused_queues(exact_names) | ||
end | ||
end | ||
|
||
def all? | ||
"*".in? raw_queues | ||
end | ||
|
||
def filter_paused_queues(queues = []) | ||
paused_queues = Pause.all_queue_names | ||
if paused_queues.empty? then queues | ||
else | ||
queues = queues.presence || Queue.all.map(&:name) | ||
queues - paused_queues | ||
end | ||
end | ||
|
||
def exact_names | ||
@exact_names ||= raw_queues.select { |queue| !queue.include?("*") } | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters