Skip to content

Commit

Permalink
Add API for injecting 'repetitions' into saga executor (#88)
Browse files Browse the repository at this point in the history
Provides a simple API for instructing a node to "execute twice".

This provides a "bare-minimum" helper API for testing idempotency within a saga. When combined with #67 - which was used to test unwind safety - it should be possible to test that all actions / undo actions within a saga are idempotent, at least across being called twice.

Part of #31
  • Loading branch information
smklein authored Dec 28, 2022
1 parent f38cee9 commit 53129dd
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 46 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

https://github.com/oxidecomputer/steno/compare/v0.3.0\...HEAD[Full list of commits]

* https://github.com/oxidecomputer/steno/pull/88[#88] Add `SecClient::saga_inject_repeat` method to help with testing idempotency

== 0.3.0 (released 2022-11-02)

https://github.com/oxidecomputer/steno/compare/v0.2.0\...v0.3.0[Full list of commits]
Expand Down
80 changes: 74 additions & 6 deletions src/saga_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::saga_action_generic::ActionData;
use crate::saga_action_generic::ActionInjectError;
use crate::saga_log::SagaNodeEventType;
use crate::saga_log::SagaNodeLoadStatus;
use crate::sec::RepeatInjected;
use crate::sec::SecExecClient;
use crate::ActionRegistry;
use crate::SagaCachedState;
Expand Down Expand Up @@ -287,6 +288,10 @@ struct TaskParams<UserType: SagaType> {
saga_params: Arc<serde_json::Value>,
/// The action itself that we're executing.
action: Arc<dyn Action<UserType>>,
/// If true, indicates that the action should be executed multiple
/// times, and the latter result should be used. This is useful
/// when testing idempotency of a user-specified action.
injected_repeat: Option<RepeatInjected>,
}

/// Executes a saga
Expand Down Expand Up @@ -399,6 +404,7 @@ impl<UserType: SagaType> SagaExecutor<UserType> {
node_errors: BTreeMap::new(),
sglog,
injected_errors: BTreeSet::new(),
injected_repeats: BTreeMap::new(),
sec_hdl,
saga_id,
};
Expand Down Expand Up @@ -839,6 +845,21 @@ impl<UserType: SagaType> SagaExecutor<UserType> {
live_state.injected_errors.insert(node_id);
}

/// Forces a given node to be executed twice
///
/// When execution reaches this node, the action and undo actions
/// are invoked twice by the saga executor.
///
/// If this node produces output, only the second value is stored.
pub async fn inject_repeat(
&self,
node_id: NodeIndex,
repeat: RepeatInjected,
) {
let mut live_state = self.live_state.lock().await;
live_state.injected_repeats.insert(node_id, repeat);
}

/// Runs the saga
///
/// This might be running a saga that has never been started before or
Expand Down Expand Up @@ -964,6 +985,10 @@ impl<UserType: SagaType> SagaExecutor<UserType> {
saga_params,
action: sgaction,
user_context: Arc::clone(&self.user_context),
injected_repeat: live_state
.injected_repeats
.get(&node_id)
.map(|r| *r),
};

let task = tokio::spawn(SagaExecutor::exec_node(task_params));
Expand Down Expand Up @@ -1001,6 +1026,10 @@ impl<UserType: SagaType> SagaExecutor<UserType> {
saga_params,
action: sgaction,
user_context: Arc::clone(&self.user_context),
injected_repeat: live_state
.injected_repeats
.get(&node_id)
.map(|r| *r),
};

let task = tokio::spawn(SagaExecutor::undo_node(task_params));
Expand Down Expand Up @@ -1087,14 +1116,22 @@ impl<UserType: SagaType> SagaExecutor<UserType> {
}
}

let exec_future = task_params.action.do_it(ActionContext {
let make_action_context = || ActionContext {
ancestor_tree: Arc::clone(&task_params.ancestor_tree),
saga_params: Arc::clone(&task_params.saga_params),
node_id,
dag: Arc::clone(&task_params.dag),
user_context: Arc::clone(&task_params.user_context),
});
let result = exec_future.await;
};

let mut result = task_params.action.do_it(make_action_context()).await;

if let Some(repeat) = task_params.injected_repeat {
for _ in 0..repeat.action.get() - 1 {
result = task_params.action.do_it(make_action_context()).await;
}
}

let node: Box<dyn SagaNodeRest<UserType>> = match result {
Ok(output) => {
Box::new(SagaNode { node_id, state: SgnsDone(output) })
Expand Down Expand Up @@ -1137,16 +1174,26 @@ impl<UserType: SagaType> SagaExecutor<UserType> {
}
}

let exec_future = task_params.action.undo_it(ActionContext {
let make_action_context = || ActionContext {
ancestor_tree: Arc::clone(&task_params.ancestor_tree),
saga_params: Arc::clone(&task_params.saga_params),
node_id,
dag: Arc::clone(&task_params.dag),
user_context: Arc::clone(&task_params.user_context),
});
};

// TODO-robustness We have to figure out what it means to fail here and
// what we want to do about it.
exec_future.await.unwrap();
task_params.action.undo_it(make_action_context()).await.unwrap();
if let Some(repeat) = task_params.injected_repeat {
for _ in 0..repeat.undo.get() - 1 {
task_params
.action
.undo_it(make_action_context())
.await
.unwrap();
}
}
let node = Box::new(SagaNode {
node_id,
state: SgnsUndone(UndoMode::ActionUndone),
Expand Down Expand Up @@ -1333,6 +1380,9 @@ struct SagaExecLiveState {

/// Injected errors
injected_errors: BTreeSet<NodeIndex>,

/// Injected actions which should be called repeatedly
injected_repeats: BTreeMap<NodeIndex, RepeatInjected>,
}

#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
Expand Down Expand Up @@ -1998,6 +2048,16 @@ pub trait SagaExecManager: fmt::Debug + Send + Sync {
///
/// See [`Dag::get_index()`] to get the node_id for a node.
fn inject_error(&self, node_id: NodeIndex) -> BoxFuture<'_, ()>;

/// Replaces the action at the specified node with one that calls both the
/// action (and undo action, if called) twice.
///
/// See [`Dag::get_index()`] to get the node_id for a node.
fn inject_repeat(
&self,
node_id: NodeIndex,
repeat: RepeatInjected,
) -> BoxFuture<'_, ()>;
}

impl<T> SagaExecManager for SagaExecutor<T>
Expand All @@ -2019,6 +2079,14 @@ where
fn inject_error(&self, node_id: NodeIndex) -> BoxFuture<'_, ()> {
self.inject_error(node_id).boxed()
}

fn inject_repeat(
&self,
node_id: NodeIndex,
repeat: RepeatInjected,
) -> BoxFuture<'_, ()> {
self.inject_repeat(node_id, repeat).boxed()
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 53129dd

Please sign in to comment.