Skip to content

Commit 0c2d0d0

Browse files
committed
Make halting work for execution plans in almost any state
It is the caller's responsibility to not try to halt an already stopped execution plan.
1 parent 475a027 commit 0c2d0d0

File tree

7 files changed

+160
-5
lines changed

7 files changed

+160
-5
lines changed

lib/dynflow/coordinator.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,19 @@ def self.valid_classes
266266
end
267267
end
268268

269+
class ExecutionInhibitionLock < Lock
270+
def initialize(execution_plan_id)
271+
super
272+
@data[:owner_id] = "execution-plan:#{execution_plan_id}"
273+
@data[:execution_plan_id] = execution_plan_id
274+
@data[:id] = self.class.lock_id(execution_plan_id)
275+
end
276+
277+
def self.lock_id(execution_plan_id)
278+
"execution-plan:#{execution_plan_id}"
279+
end
280+
end
281+
269282
class ExecutionLock < LockByWorld
270283
def initialize(world, execution_plan_id, client_world_id, request_id)
271284
super(world)

lib/dynflow/director.rb

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,15 +244,26 @@ def terminate
244244
end
245245

246246
def halt(event)
247-
manager = @execution_plan_managers[event.execution_plan_id]
248-
return unless manager
247+
halt_execution(event.execution_plan_id)
248+
end
249+
250+
private
251+
252+
def halt_execution(execution_plan_id)
253+
manager = @execution_plan_managers[execution_plan_id]
254+
@logger.warn "Halting execution plan #{execution_plan_id}"
255+
return halt_inactive(execution_plan_id) unless manager
249256

250-
@logger.warn "Halting execution plan #{event.execution_plan_id}"
251257
manager.halt
252258
finish_manager manager
253259
end
254260

255-
private
261+
def halt_inactive(execution_plan_id)
262+
plan = @world.persistence.load_execution_plan(execution_plan_id)
263+
plan.update_state(:stopped)
264+
rescue => e
265+
@logger.error e
266+
end
256267

257268
def unless_done(manager, work_items)
258269
return [] unless manager
@@ -316,6 +327,14 @@ def track_execution_plan(execution_plan_id, finished)
316327
"cannot execute execution_plan_id:#{execution_plan_id} it's stopped"
317328
end
318329

330+
lock_class = Coordinator::ExecutionInhibitionLock
331+
filters = { class: lock_class.to_s, owner_id: lock_class.lock_id(execution_plan_id) }
332+
if @world.coordinator.find_records(filters).any?
333+
halt_execution(execution_plan_id)
334+
raise Dynflow::Error,
335+
"cannot execute execution_plan_id:#{execution_plan_id} it's execution is inhibited"
336+
end
337+
319338
@execution_plan_managers[execution_plan_id] =
320339
ExecutionPlanManager.new(@world, execution_plan, finished)
321340
rescue Dynflow::Error => e

lib/dynflow/dispatcher/client_dispatcher.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,14 @@ def dispatch_request(request, client_world_id, request_id)
137137
(on ~Execution | ~Planning do |execution|
138138
AnyExecutor
139139
end),
140-
(on ~Event | ~Halt do |event|
140+
(on ~Event do |event|
141141
ignore_unknown = event.optional
142142
find_executor(event.execution_plan_id)
143143
end),
144+
(on ~Halt do |event|
145+
executor = find_executor(event.execution_plan_id)
146+
executor == Dispatcher::UnknownWorld ? AnyExecutor : executor
147+
end),
144148
(on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _|
145149
receiver_id
146150
end)

lib/dynflow/execution_plan.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ def update_state(state, history_notice: :auto)
133133
telemetry_common_options.merge(:result => key.to_s))
134134
end
135135
hooks_to_run << key
136+
world.persistence.delete_delayed_plans(:execution_plan_uuid => id) if delay_record && original == :scheduled
136137
unlock_all_singleton_locks!
138+
unlock_execution_inhibition_lock!
137139
when :paused
138140
unlock_all_singleton_locks!
139141
else
@@ -566,6 +568,14 @@ def unlock_all_singleton_locks!
566568
end
567569
end
568570

571+
def unlock_execution_inhibition_lock!
572+
filter = { :owner_id => 'execution-plan:' + self.id,
573+
:class => Dynflow::Coordinator::ExecutionInhibitionLock.to_s }
574+
world.coordinator.find_locks(filter).each do |lock|
575+
world.coordinator.release(lock)
576+
end
577+
end
578+
569579
def toggle_telemetry_state(original, new)
570580
return if original == new
571581
@label = root_plan_step.action_class if @label.nil?

lib/dynflow/world.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent
252252
end
253253

254254
def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future)
255+
coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(execution_plan_id))
255256
publish_request(Dispatcher::Halt[execution_plan_id], accepted, false)
256257
end
257258

lib/dynflow/world/invalidation.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,27 @@ def invalidate(world)
2828
end
2929
end
3030

31+
prune_execution_inhibition_locks!
32+
3133
pruned = persistence.prune_envelopes(world.id)
3234
logger.error("Pruned #{pruned} envelopes for invalidated world #{world.id}") unless pruned.zero?
3335
coordinator.delete_world(world)
3436
end
3537
end
3638

39+
# Prunes execution inhibition locks which got somehow left behind.
40+
# Any execution inhibition locks, which have their corresponding execution
41+
# plan in stopped state, will be removed.
42+
def prune_execution_inhibition_locks!
43+
locks = coordinator.find_locks(class: Coordinator::ExecutionInhibitionLock.name)
44+
uuids = locks.map { |lock| lock.data[:execution_plan_id] }
45+
plan_uuids = persistence.find_execution_plans(filters: { uuid: uuids, state: 'stopped' }).map(&:id)
46+
47+
locks.select { |lock| plan_uuids.include? lock.data[:execution_plan_id] }.each do |lock|
48+
coordinator.release(lock)
49+
end
50+
end
51+
3752
def invalidate_planning_lock(planning_lock)
3853
with_valid_execution_plan_for_lock(planning_lock) do |plan|
3954
plan.steps.values.each { |step| invalidate_step step }

test/executor_test.rb

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,99 @@ def assert_next_steps(expected_next_step_ids, finished_step_id = nil, success =
735735
assert [world.terminate, world.terminate].map(&:value).all?
736736
end
737737
end
738+
739+
describe 'halting' do
740+
include TestHelpers
741+
let(:world) { WorldFactory.create_world }
742+
743+
it 'halts an execution plan with a suspended step' do
744+
triggered = world.trigger(Support::DummyExample::PlanEventsAction, ping_time: 1)
745+
plan = world.persistence.load_execution_plan(triggered.id)
746+
wait_for do
747+
plan = world.persistence.load_execution_plan(triggered.id)
748+
plan.state == :running
749+
end
750+
world.halt(triggered.id)
751+
wait_for('the execution plan to halt') do
752+
plan = world.persistence.load_execution_plan(triggered.id)
753+
plan.state == :stopped
754+
end
755+
_(plan.steps[2].state).must_equal :suspended
756+
end
757+
758+
it 'halts a paused execution plan' do
759+
triggered = world.trigger(Support::DummyExample::FailingDummy)
760+
plan = world.persistence.load_execution_plan(triggered.id)
761+
wait_for do
762+
plan = world.persistence.load_execution_plan(triggered.id)
763+
plan.state == :paused
764+
end
765+
world.halt(plan.id)
766+
wait_for('the execution plan to halt') do
767+
plan = world.persistence.load_execution_plan(triggered.id)
768+
plan.state == :stopped
769+
end
770+
_(plan.steps[2].state).must_equal :error
771+
end
772+
773+
it 'halts a planned execution plan' do
774+
plan = world.plan(Support::DummyExample::Dummy)
775+
wait_for do
776+
plan = world.persistence.load_execution_plan(plan.id)
777+
plan.state == :planned
778+
end
779+
world.halt(plan.id)
780+
wait_for('the execution plan to halt') do
781+
plan = world.persistence.load_execution_plan(plan.id)
782+
plan.state == :stopped
783+
end
784+
_(plan.steps[2].state).must_equal :pending
785+
end
786+
787+
it 'halts a scheduled execution plan' do
788+
plan = world.delay(Support::DummyExample::Dummy, {start_at: Time.now + 120})
789+
wait_for do
790+
plan = world.persistence.load_execution_plan(plan.id)
791+
plan.state == :scheduled
792+
end
793+
world.halt(plan.id)
794+
wait_for('the execution plan to halt') do
795+
plan = world.persistence.load_execution_plan(plan.id)
796+
plan.state == :stopped
797+
end
798+
_(plan.delay_record).must_be :nil?
799+
_(plan.steps[1].state).must_equal :pending
800+
end
801+
802+
it 'halts a pending execution plan' do
803+
plan = ExecutionPlan.new(world, nil)
804+
plan.save
805+
world.halt(plan.id)
806+
wait_for('the execution plan to halt') do
807+
plan = world.persistence.load_execution_plan(plan.id)
808+
plan.state == :stopped
809+
end
810+
end
811+
end
812+
813+
describe 'execution inhibition locks' do
814+
include TestHelpers
815+
let(:world) { WorldFactory.create_world }
816+
817+
it 'inhibits execution' do
818+
plan = world.plan(Support::DummyExample::Dummy)
819+
world.coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(plan.id))
820+
triggered = world.execute(plan.id)
821+
triggered.wait
822+
_(triggered).must_be :rejected?
823+
824+
plan = world.persistence.load_execution_plan(plan.id)
825+
_(plan.state).must_equal :stopped
826+
827+
locks = world.coordinator.find_locks({ class: Coordinator::ExecutionInhibitionLock.to_s, owner_id: "execution-plan:#{plan.id}" })
828+
_(locks).must_be :empty?
829+
end
830+
end
738831
end
739832
end
740833
end

0 commit comments

Comments
 (0)