Skip to content

Commit

Permalink
Fixes #29554 - Katello Applicability host processing queue (Katello#8717
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ianballou authored May 22, 2020
1 parent 7c7f292 commit fa45258
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ module Actions
module Katello
module Applicability
module Hosts
class Generate < Actions::EntryAction
class BulkGenerate < Actions::EntryAction
input_format do
param :host_ids, Array
end

def run
input[:host_ids].each do |host_id|
::Katello::EventQueue.push_event(::Katello::Events::GenerateHostApplicability::EVENT_TYPE, host_id)
content_facet = ::Host.find(host_id).content_facet
content_facet.calculate_and_import_applicability
end
end

def queue
::Katello::HOST_TASKS_QUEUE
end

def resource_locks
:link
end

def humanized_name
_("Bulk generate applicability for hosts")
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ class Regenerate < Actions::EntryAction
end

def run
::Katello::Repository.find(input[:repo_id]).content_facets.each do |facet|
::Katello::EventQueue.push_event(::Katello::Events::GenerateHostApplicability::EVENT_TYPE, facet.host.id)
host_ids = ::Katello::Repository.find(input[:repo_id]).hosts_with_applicability.pluck(:id)
return if host_ids.empty?

host_ids.each do |host_id|
::Katello::ApplicableHostQueue.push_host(host_id)
end

::Katello::EventQueue.push_event(::Katello::Events::GenerateHostApplicability::EVENT_TYPE, 0)
end

def humanized_name
Expand Down
9 changes: 7 additions & 2 deletions app/lib/actions/katello/host/generate_applicability.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def queue

def plan(hosts, use_queue = true)
if SETTINGS[:katello][:katello_applicability]
plan_action(::Actions::Katello::Applicability::Hosts::Generate, host_ids: hosts.pluck(:id))
plan_self(:host_ids => hosts.map(&:id))
else
uuids = hosts.map { |host| host.content_facet.try(:uuid) }.compact
unless uuids.empty?
Expand All @@ -19,7 +19,12 @@ def plan(hosts, use_queue = true)
end

def finalize
unless SETTINGS[:katello][:katello_applicability]
if SETTINGS[:katello][:katello_applicability]
input[:host_ids].each do |host_id|
::Katello::ApplicableHostQueue.push_host(host_id)
end
::Katello::EventQueue.push_event(::Katello::Events::GenerateHostApplicability::EVENT_TYPE, 0)
else
input[:host_ids].each do |host_id|
if input[:use_queue]
::Katello::EventQueue.push_event(::Katello::Events::ImportHostApplicability::EVENT_TYPE, host_id)
Expand Down
3 changes: 1 addition & 2 deletions app/lib/actions/katello/repository/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ def plan(repo, _pulp_sync_task_id = nil, options = {})
incremental = options.fetch(:incremental, false)
validate_contents = options.fetch(:validate_contents, false)
skip_metadata_check = options.fetch(:skip_metadata_check, false) || (validate_contents && repo.yum?)
# TODO: Remove the check for Pulp 3 once Pulp 3 errata is working fully
generate_applicability = repo.yum? && !SmartProxy.pulp_master.pulp3_support?(repo)
generate_applicability = repo.yum?

fail ::Katello::Errors::InvalidActionOptionError, _("Unable to sync repo. This repository does not have a feed url.") if repo.url.blank? && source_url.blank?
fail ::Katello::Errors::InvalidActionOptionError, _("Cannot validate contents on non-yum/deb repositories.") if validate_contents && !repo.yum? && !repo.deb?
Expand Down
11 changes: 6 additions & 5 deletions app/models/katello/events/generate_host_applicability.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ def self.retry_seconds
180
end

def initialize(host_id)
@host = ::Host.find_by_id(host_id)
Rails.logger.warn "Host not found for ID #{object_id}" if @host.nil?
def initialize(object_id)
end

def run
return unless @host
return if ::Katello::ApplicableHostQueue.queue_depth == 0

begin
ForemanTasks.async_task(::Actions::Katello::Applicability::Host::Generate, host_id: @host.id)
while ::Katello::ApplicableHostQueue.queue_depth != 0
hosts = ::Katello::ApplicableHostQueue.pop_hosts
ForemanTasks.async_task(::Actions::Katello::Applicability::Hosts::BulkGenerate, host_ids: hosts.map(&:host_id))
end
rescue => e
self.retry = true if e.is_a?(ForemanTasks::Lock::LockConflict)
raise e
Expand Down
4 changes: 4 additions & 0 deletions app/models/katello/host_queue_element.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module Katello
class HostQueueElement < Katello::Model
end
end
4 changes: 3 additions & 1 deletion app/models/setting/content.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ def self.default_settings
"as long as the registering DMI UUID is not used by another host."),
true, N_('Host Profile Assume')),
self.set('host_tasks_workers_pool_size', N_("Amount of workers in the pool to handle the execution of host-related tasks. When set to 0, the default queue will be used instead. Restart of the dynflowd/foreman-tasks service is required."),
5, N_('Host Tasks Workers Pool Size'))
5, N_('Host Tasks Workers Pool Size')),
self.set('applicability_batch_size', N_("Number of host applicability calculations to process per task."),
50, N_('Applicability Batch Size'))
]
end

Expand Down
21 changes: 21 additions & 0 deletions app/services/katello/applicable_host_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Katello
class ApplicableHostQueue
def self.batch_size
::Setting::Content.find_by(name: "applicability_batch_size").value
end

def self.queue_depth
::Katello::HostQueueElement.all.size
end

def self.push_host(host_id)
HostQueueElement.create!({ host_id: host_id })
end

def self.pop_hosts(amount = self.batch_size)
queue = HostQueueElement.group(:host_id).select("MIN(created_at) as created_at, host_id").limit(amount)
HostQueueElement.where(host_id: queue.map(&:host_id)).delete_all
queue
end
end
end
12 changes: 12 additions & 0 deletions db/migrate/20200511204005_create_katello_host_queue_elements.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class CreateKatelloHostQueueElements < ActiveRecord::Migration[6.0]
def up
create_table :katello_host_queue_elements do |t|
t.integer :host_id
t.column :created_at, :datetime
end
end

def down
drop_table :katello_host_queue_elements
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ class GenerateApplicabilityTest < ActiveSupport::TestCase
User.current = users(:admin)
@host = FactoryBot.build(:host, :with_content, :with_subscription, :content_view => katello_content_views(:library_dev_view),
:lifecycle_environment => katello_environments(:library))
@host.save!
SETTINGS[:katello][:katello_applicability] = true
end

after :all do
SETTINGS[:katello][:katello_applicability] = false
::Setting::Content.find_by(name: "applicability_batch_size").update(value: 50)
end

describe 'Host Generate Applicability using Katello Applicability' do
let(:action_class) { ::Actions::Katello::Host::GenerateApplicability }

it 'plans' do
action = create_action action_class
it 'runs' do
Katello::ApplicableHostQueue.expects(:push_host).with(@host.id).times(5)
Katello::EventQueue.expects(:push_event).with(::Katello::Events::GenerateHostApplicability::EVENT_TYPE, 0)

plan_action action, [@host]

assert_action_planed_with(action, Actions::Katello::Applicability::Hosts::Generate,
:host_ids => [@host.id])
ForemanTasks.sync_task(action_class, [@host, @host, @host, @host, @host])
end
end
end
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require 'katello_test_helper'

module ::Actions::Katello::Applicability::Repository
class RepositoryRegenerateApplicabilityTest < ActiveSupport::TestCase
include Dynflow::Testing
include Support::Actions::Fixtures
include FactoryBot::Syntax::Methods

before :all do
User.current = users(:admin)
@host = FactoryBot.build(:host, :with_content, :with_subscription, :content_view => katello_content_views(:library_dev_view),
:lifecycle_environment => katello_environments(:library))
@host.save!
@repo = katello_repositories(:fedora_17_x86_64_duplicate)
SETTINGS[:katello][:katello_applicability] = true
end

after :all do
SETTINGS[:katello][:katello_applicability] = false
end

describe 'Repository Regenerate Applicability using Katello Applicability' do
let(:action_class) { ::Actions::Katello::Applicability::Repository::Regenerate }

it 'runs' do
Katello::Repository.any_instance.stubs(:hosts_with_applicability).returns([@host])
Katello::ApplicableHostQueue.expects(:push_host).with(@host.id)
Katello::EventQueue.expects(:push_event).with(::Katello::Events::GenerateHostApplicability::EVENT_TYPE, 0)

ForemanTasks.sync_task(action_class, :repo_id => @repo.id)
end
end
end
end
49 changes: 49 additions & 0 deletions test/services/katello/applicable_host_queue_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
require 'katello_test_helper'

module Katello
class ApplicableHostQueueTest < ActiveSupport::TestCase
def setup
::Setting::Content.find_by(name: "applicability_batch_size").update(value: 50)
end

def test_pop_nothing
assert_equal [], ApplicableHostQueue.pop_hosts
end

def test_pop_1_host
ApplicableHostQueue.push_host(999)
popped_hosts = ApplicableHostQueue.pop_hosts

assert_equal [999], popped_hosts.map(&:host_id).sort
end

def test_pop_5_hosts
5.times { |i| ApplicableHostQueue.push_host(i) }
popped_hosts = ApplicableHostQueue.pop_hosts

assert_equal [0, 1, 2, 3, 4], popped_hosts.map(&:host_id).sort
end

def test_pop_batch_size_only
::Setting::Content.find_by(name: "applicability_batch_size").update(value: 3)
5.times { |i| ApplicableHostQueue.push_host(i) }
popped_hosts = ApplicableHostQueue.pop_hosts

assert_equal 3, popped_hosts.to_a.size
end

def test_pop_duplicate_hosts
5.times { |i| ApplicableHostQueue.push_host(i) }
5.times { |i| ApplicableHostQueue.push_host(i) }
popped_hosts = ApplicableHostQueue.pop_hosts

assert_equal [0, 1, 2, 3, 4], popped_hosts.map(&:host_id).sort
end

def test_queue_depth
3.times { |i| ApplicableHostQueue.push_host(i) }

assert_equal 3, ApplicableHostQueue.queue_depth
end
end
end

0 comments on commit fa45258

Please sign in to comment.