Skip to content

Commit 4d632e1

Browse files
committed
Add support for halting execution plans
Up until now, Dynflow only had support for clean cancellation. Upon cancellation a cancel event was sent to all cancellable suspended and running steps and it was the action's responsibility to cancel itself. However, there were actions which just weren't cancellable at all. This commit adds support for halting execution plans. The actions are completely unaware of this, on halt dynflow just destroys its internal state about a given execution plan and turns it to stopped state. It does not remove running steps from workers as we currently don't have any real means of doing so. Any pending events will be rejected. In other words, halting an execution plan ensure it will not run any further.
1 parent e538686 commit 4d632e1

File tree

10 files changed

+123
-5
lines changed

10 files changed

+123
-5
lines changed

examples/halt.rb

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
require_relative 'example_helper'
5+
6+
example_description = <<DESC
7+
8+
Halting example
9+
===================
10+
11+
This example shows, how halting works in Dynflow. It spawns a single action,
12+
which in turn spawns a few evented actions and a single action which occupies
13+
the executor for a long time.
14+
15+
Once the halt event is sent, the execution plan is halted, suspended steps
16+
stay suspended forever, running steps stay running until they actually finish
17+
the current run and the execution state is flipped over to stopped state.
18+
19+
You can see the details at #{ExampleHelper::DYNFLOW_URL}
20+
21+
DESC
22+
23+
class EventedCounter < Dynflow::Action
24+
def run(event = nil)
25+
output[:counter] ||= 0
26+
output[:counter] += 1
27+
action_logger.info "Iteration #{output[:counter]}"
28+
29+
if output[:counter] < input[:count]
30+
plan_event(:tick, 5)
31+
suspend
32+
end
33+
action_logger.info "Done"
34+
end
35+
end
36+
37+
class Sleeper < Dynflow::Action
38+
def run
39+
sleep input[:time]
40+
end
41+
end
42+
43+
class Wrapper < Dynflow::Action
44+
def plan
45+
sequence do
46+
concurrence do
47+
5.times { |i| plan_action(EventedCounter, :count => i + 1) }
48+
plan_action Sleeper, :time => 20
49+
end
50+
plan_self
51+
end
52+
end
53+
54+
def run
55+
# Noop
56+
end
57+
end
58+
59+
if $0 == __FILE__
60+
puts example_description
61+
62+
ExampleHelper.world.action_logger.level = Logger::DEBUG
63+
ExampleHelper.world
64+
t = ExampleHelper.world.trigger(Wrapper)
65+
Thread.new do
66+
sleep 8
67+
ExampleHelper.world.halt(t.id)
68+
end
69+
70+
ExampleHelper.run_web_console
71+
end

lib/dynflow/director.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,15 @@ def terminate
243243
end
244244
end
245245

246+
def halt(event)
247+
manager = @execution_plan_managers[event.execution_plan_id]
248+
return unless manager
249+
250+
@logger.warn "Halting execution plan #{event.execution_plan_id}"
251+
manager.halt
252+
finish_manager manager
253+
end
254+
246255
private
247256

248257
def unless_done(manager, work_items)

lib/dynflow/director/execution_plan_manager.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def initialize(world, execution_plan, future)
1212
@execution_plan = Type! execution_plan, ExecutionPlan
1313
@future = Type! future, Concurrent::Promises::ResolvableFuture
1414
@running_steps_manager = RunningStepsManager.new(world)
15+
@halted = false
1516

1617
unless [:planned, :paused].include? execution_plan.state
1718
raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}"
@@ -24,6 +25,11 @@ def start
2425
start_run or start_finalize or finish
2526
end
2627

28+
def halt
29+
@halted = true
30+
@running_steps_manager.terminate
31+
end
32+
2733
def restart
2834
@run_manager = nil
2935
@finalize_manager = nil
@@ -71,7 +77,7 @@ def event(event)
7177
end
7278

7379
def done?
74-
(!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?)
80+
@halted || (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?)
7581
end
7682

7783
def terminate
@@ -87,6 +93,7 @@ def update_steps(steps)
8793
def compute_next_from_step(step)
8894
raise "run manager not set" unless @run_manager
8995
raise "run manager already done" if @run_manager.done?
96+
return [] if @halted
9097

9198
next_steps = @run_manager.what_is_next(step)
9299
if @run_manager.done?

lib/dynflow/director/running_steps_manager.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def initialize(world)
1515
# to handle potential updates of the step object (that is part of the event)
1616
@events = QueueHash.new(Integer, Director::Event)
1717
@events_by_request_id = {}
18+
@halted = false
1819
end
1920

2021
def terminate
@@ -26,6 +27,10 @@ def terminate
2627
end
2728
end
2829

30+
def halt
31+
@halted = true
32+
end
33+
2934
def add(step, work)
3035
Type! step, ExecutionPlan::Steps::RunStep
3136
@running_steps[step.id] = step
@@ -83,6 +88,10 @@ def event(event)
8388
event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events')
8489
return []
8590
end
91+
if @halted
92+
event.result.reject UnprocessableEvent.new('execution plan is halted, it cannot receive events')
93+
return []
94+
end
8695

8796
can_run_event = @work_items.empty?(step.id)
8897
@events_by_request_id[event.request_id] = event

lib/dynflow/dispatcher.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ module Dispatcher
2828
execution_plan_id: type { variants String, NilClass }
2929
end
3030

31-
variants Event, Execution, Ping, Status, Planning
31+
Halt = type do
32+
fields! execution_plan_id: String, optional: Algebrick::Types::Boolean
33+
end
34+
35+
variants Event, Execution, Ping, Status, Planning, Halt
3236
end
3337

3438
Response = Algebrick.type do

lib/dynflow/dispatcher/client_dispatcher.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def dispatch_request(request, client_world_id, request_id)
137137
(on ~Execution | ~Planning do |execution|
138138
AnyExecutor
139139
end),
140-
(on ~Event do |event|
140+
(on ~Event | ~Halt do |event|
141141
ignore_unknown = event.optional
142142
find_executor(event.execution_plan_id)
143143
end),
@@ -236,7 +236,7 @@ def resolve_tracked_request(id, error = nil)
236236
(on Execution.(execution_plan_id: ~any) do |uuid|
237237
@world.persistence.load_execution_plan(uuid)
238238
end),
239-
(on Event | Ping do
239+
(on Event | Ping | Halt do
240240
true
241241
end)
242242
@tracked_requests.delete(id).success! resolve_to

lib/dynflow/dispatcher/executor_dispatcher.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ def handle_request(envelope)
1212
on(Planning) { perform_planning(envelope, envelope.message)},
1313
on(Execution) { perform_execution(envelope, envelope.message) },
1414
on(Event) { perform_event(envelope, envelope.message) },
15-
on(Status) { get_execution_status(envelope, envelope.message) })
15+
on(Status) { get_execution_status(envelope, envelope.message) },
16+
on(Halt) { halt_execution_plan(envelope, envelope.message) })
1617
end
1718

1819
protected
@@ -51,6 +52,11 @@ def when_done(plan, envelope, execution, execution_lock)
5152
end
5253
end
5354

55+
def halt_execution_plan(envelope, execution_plan_id)
56+
@world.executor.halt execution_plan_id
57+
respond(envelope, Done)
58+
end
59+
5460
def perform_event(envelope, event_request)
5561
future = on_finish do |f|
5662
f.then do

lib/dynflow/executors/abstract/core.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def handle_persistence_error(error, work = nil)
6565
end
6666
end
6767

68+
def halt(execution_plan_id)
69+
@director.halt execution_plan_id
70+
end
71+
6872
def start_termination(*args)
6973
logger.info 'shutting down Core ...'
7074
super

lib/dynflow/executors/parallel.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ def execution_status(execution_plan_id = nil)
5656
@core.ask!([:execution_status, execution_plan_id])
5757
end
5858

59+
def halt(execution_plan_id)
60+
@core.tell([:halt, execution_plan_id])
61+
end
62+
5963
def initialized
6064
@core_initialized
6165
end

lib/dynflow/world.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,10 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent
251251
publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout)
252252
end
253253

254+
def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future)
255+
publish_request(Dispatcher::Halt[execution_plan_id], accepted, false)
256+
end
257+
254258
def publish_request(request, done, wait_for_accepted, timeout = nil)
255259
accepted = Concurrent::Promises.resolvable_future
256260
accepted.rescue do |reason|

0 commit comments

Comments
 (0)