Orquesta Implementation Patterns
This page contains a few common implementation patterns for Orquesta workflows.
Occasionally we would like to iterate over objects in an array and perform different actions on each object depending on an object's attribute.
We have a list of dictionaries that each represent a single host. Each host dictionary has name and os (known operating system) keys.
hosts JSON representation:
[{
  "name": "serverA",
  "os": "windows"
}, {
  "name": "serverB",
  "os": "linux"
}]
We would like to process each item in the hosts list using different functions, depending on the operating system installed on each host. In Python pseudocode, that would look like this:
for host in hosts:
    if host["os"] == "windows":
        run_cmd_on_windows(host["name"])
    elif host["os"] == "linux":
        run_cmd_on_linux(host["name"])
The version of Orquesta included in StackStorm v3.2 and later fixes a few bugs regarding executing tasks in parallel and join tasks. That means we can execute the with.items tasks in parallel:
input:
  - hosts
tasks:
  # This task isn't technically needed, but makes the parallel
  # execution of the run_cmd tasks more apparent
  init:
    action: core.noop
    next:
      - do:
          # Run both actions in parallel
          - run_cmd_on_windows_hosts
          - run_cmd_on_linux_hosts
  run_cmd_on_windows_hosts:
    with:
      items: '<% ctx(hosts).filter($.os = "windows") %>'
    action: ...
    input:
      host: <% item().name %>
    next:
      - do: wait_for_all_cmds
  run_cmd_on_linux_hosts:
    with:
      items: '<% ctx(hosts).filter($.os = "linux") %>'
    action: ...
    input:
      host: <% item().name %>
    next:
      - do: wait_for_all_cmds
  wait_for_all_cmds:
    join: all  # wait for both inbound parallel tasks to finish
    ...
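For readers less familiar with YAQL, the fan-out and join above can be sketched in plain Python. This only illustrates the control flow and is not how Orquesta is implemented. run_cmd_on_windows and run_cmd_on_linux are hypothetical stand-ins for the elided actions, and the items within each branch run one at a time here for simplicity.

```python
from concurrent.futures import ThreadPoolExecutor

hosts = [
    {"name": "serverA", "os": "windows"},
    {"name": "serverB", "os": "linux"},
]


def run_cmd_on_windows(name):
    # Hypothetical stand-in for the elided Windows action
    return f"windows:{name}"


def run_cmd_on_linux(name):
    # Hypothetical stand-in for the elided Linux action
    return f"linux:{name}"


def run_for_os(os_name, run_cmd):
    # Rough equivalent of ctx(hosts).filter($.os = "<os_name>"),
    # followed by one action call per matching item
    return [run_cmd(host["name"]) for host in hosts if host["os"] == os_name]


with ThreadPoolExecutor(max_workers=2) as pool:
    # Start both branches at once, like the two tasks listed under init's "do"
    windows_branch = pool.submit(run_for_os, "windows", run_cmd_on_windows)
    linux_branch = pool.submit(run_for_os, "linux", run_cmd_on_linux)
    # Rough equivalent of "join: all": block until both branches have finished
    results = windows_branch.result() + linux_branch.result()

print(results)
```

Each branch filters the full list independently, so a host whose os is neither "windows" nor "linux" is silently skipped by both branches, just as in the workflow.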
The version of Orquesta released with StackStorm v3.1.0 and earlier still contains the parallel execution and join bugs mentioned above. Executing the with.items tasks in series works around this issue:
input:
  - hosts
tasks:
  # This task isn't technically needed, but makes the serial
  # execution of the run_cmd tasks more apparent to the next
  # person who has to modify this workflow
  init:
    action: core.noop
    next:
      # Run both actions in series - ST2 v3.1
      - do: run_cmd_on_windows_hosts
  run_cmd_on_windows_hosts:
    with:
      items: '<% ctx(hosts).filter($.os = "windows") %>'
    action: ...
    input:
      host: <% item().name %>
    next:
      - do: run_cmd_on_linux_hosts
  run_cmd_on_linux_hosts:
    with:
      items: '<% ctx(hosts).filter($.os = "linux") %>'
    action: ...
    input:
      host: <% item().name %>
    next:
      - do: ...
This Orquesta loop uses dedicated tasks as control flow to explicitly determine which tasks to execute.
Because each control flow task is scheduled and executed by Orquesta, the workflow overall finishes more slowly than the previous versions. This version also requires users to explicitly specify error handling tasks:
input:
  - hosts
vars:
  - loop_iteration_count: 0
tasks:
  # Necessary so Orquesta knows where to start
  init:
    action: core.noop
    next:
      - do: start_loop
  start_loop:
    action: core.noop
    next:
      - when: <% ctx().loop_iteration_count < ctx().hosts.len() %>
        do: switch_on_host_os
      - when: <% ctx().loop_iteration_count >= ctx().hosts.len() %>
        do: end_loop
  switch_on_host_os:
    action: core.noop
    next:
      - when: <% ctx().hosts[ctx().loop_iteration_count]?.os = "windows" %>
        do: run_cmd_on_windows_host
      - when: <% ctx().hosts[ctx().loop_iteration_count]?.os = "linux" %>
        do: run_cmd_on_linux_host
      - when: <% ctx().hosts[ctx().loop_iteration_count]?.os = null %>
        do: ...  # How to handle non-Windows and non-Linux hosts is application-specific
  run_cmd_on_windows_host:
    action: ...
    input:
      host: <% ctx().hosts[ctx().loop_iteration_count].name %>
    next:
      - when: <% succeeded() and ctx().loop_iteration_count < ctx().hosts.len() %>
        publish:
          - loop_iteration_count: <% ctx().loop_iteration_count + 1 %>
        do: start_loop
      - when: <% failed() %>
        do: ...  # How to handle per-item failures is application-specific
  run_cmd_on_linux_host:
    action: ...
    input:
      host: <% ctx().hosts[ctx().loop_iteration_count].name %>
    next:
      - when: <% succeeded() and ctx().loop_iteration_count < ctx().hosts.len() %>
        publish:
          - loop_iteration_count: <% ctx().loop_iteration_count + 1 %>
        do: start_loop
      - when: <% failed() %>
        do: ...  # How to handle per-item failures is application-specific
  end_loop:
    action: core.noop
    next:
      - do: ...
Keep in mind that the explicit loop version is a lot more code to write, troubleshoot, and maintain. However, it is easier to extend and modify if the control flow logic gets any more complex.
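As a rough mental model, the explicit loop corresponds to the following Python sketch. It only mirrors the task names in comments and is not how Orquesta executes the workflow. The two run_cmd callables are hypothetical stand-ins for the elided actions, and raising an error for an unrecognized os is an assumption, since the workflow leaves that branch application-specific.

```python
def run_explicit_loop(hosts, run_cmd_on_windows, run_cmd_on_linux):
    """Mirror the start_loop / switch_on_host_os / end_loop tasks."""
    loop_iteration_count = 0  # same role as the workflow variable
    handled = []

    # start_loop: keep going while the counter is inside the list
    while loop_iteration_count < len(hosts):
        host = hosts[loop_iteration_count]

        # switch_on_host_os: pick a branch from the host's "os" key
        os_name = host.get("os")
        if os_name == "windows":
            run_cmd_on_windows(host["name"])
        elif os_name == "linux":
            run_cmd_on_linux(host["name"])
        else:
            # Assumption: this sketch stops with an error; the workflow
            # leaves this branch application-specific
            raise ValueError(f"unsupported os for host {host.get('name')!r}")

        handled.append(host["name"])
        # publish loop_iteration_count + 1, then go back to start_loop
        loop_iteration_count += 1

    # end_loop
    return handled


calls = []
handled = run_explicit_loop(
    [{"name": "serverA", "os": "windows"}, {"name": "serverB", "os": "linux"}],
    run_cmd_on_windows=lambda name: calls.append(("windows", name)),
    run_cmd_on_linux=lambda name: calls.append(("linux", name)),
)
print(handled)
```

An exception raised by either callable ends the loop immediately, which plays the role of the failed() branches that the workflow leaves for you to fill in.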
Sometimes we would like to update a single key in a dictionary and publish it to the workflow context. Since we can't update a single key when publishing variables in Orquesta, we use YAQL's dict.set() function, which returns a new dictionary with the key set to the new value, and re-publish that new dictionary to the workflow context under the same variable name.
- my_dictionary - a dictionary in the workflow context containing multiple keys, including the changeme key
tasks:
  ...
  task_name:
    action: ...
    next:
      - when: ...
        publish:
          - my_dictionary: <% ctx().my_dictionary.set("changeme", "New value") %>
        do: ...
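The copy-then-republish idea behind this pattern can be sketched in Python. The dict_set helper and the keepme key are hypothetical and exist only for illustration. The helper builds a new dictionary instead of mutating its argument, and rebinding the name afterwards plays the role of publishing under the same variable name.

```python
def dict_set(dictionary, key, value):
    # Build a new dictionary; the one passed in is left untouched
    return {**dictionary, key: value}


original = {"changeme": "Old value", "keepme": 1}

# "Publish" the updated copy under the name the rest of the code reads
my_dictionary = dict_set(original, "changeme", "New value")

print(my_dictionary)
print(original)
```

All other keys are carried over unchanged, so tasks that read my_dictionary later see the new value for changeme alongside the existing keys.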