Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-439: Add comments for integration test workflows #534

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion integ/workflow/any_command_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ import (
"net/http"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil wait until a signal is received
* - Execute method will fire the signal and move the State2
* State2:
* - Waits on nothing. Will execute momentarily
* - Execute method will gracefully complete workflow
*/
const (
WorkflowType = "any_command_close"
State1 = "S1"
Expand Down Expand Up @@ -41,7 +51,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++

if req.GetWorkflowStateId() == State1 {
// Proceed after either signal is received
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
Expand All @@ -60,6 +72,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
return
}
if req.GetWorkflowStateId() == State2 {
// Go straight to the decide methods without any commands
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(),
Expand All @@ -82,10 +95,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

if req.GetWorkflowStateId() == State1 {
signalResults := req.GetCommandResults()
h.invokeData["signalCommandResultsLength"] = len(signalResults.SignalResults)

// Trigger signals
h.invokeData["signalChannelName0"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId0"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus0"] = signalResults.SignalResults[0].GetSignalRequestStatus()
Expand All @@ -95,6 +110,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
h.invokeData["signalStatus1"] = signalResults.SignalResults[1].GetSignalRequestStatus()
h.invokeData["signalValue1"] = signalResults.SignalResults[1].GetSignalValue()

// Move to State 2
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand All @@ -106,7 +122,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
// Move to completion
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
23 changes: 21 additions & 2 deletions integ/workflow/any_command_combination/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
"time"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil will fail its first attempt and then retry which will proceed when a combination is completed
* - Execute method will invoke the combination and move the State2
* State2:
* - WaitUntil will fail its first attempt and then retry which will proceed when a combination is completed
* - Execute method will invoke the combination and gracefully complete workflow
*/
const (
WorkflowType = "any_command_combination"
State1 = "S1"
Expand Down Expand Up @@ -88,7 +98,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++

if req.GetWorkflowStateId() == State1 {
// If the state has already retried an invalid command, proceed on combination completed
if h.hasS1RetriedForInvalidCommandId {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
Expand All @@ -112,6 +124,8 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

c.JSON(http.StatusOK, startResp)
} else {
// If the state has not already retried an invalid command, return invalid trigger signals, which will fail
// and cause a retry
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: validSignalCommands,
Expand All @@ -124,7 +138,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
}
return
}

if req.GetWorkflowStateId() == State2 {
// If the state has already retried an invalid command, return signals and completion metrics
if h.hasS2RetriedForInvalidCommandId {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
Expand All @@ -148,6 +164,8 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

c.JSON(http.StatusOK, startResp)
} else {
// If the state has not already retried an invalid command, return invalid trigger signals, which will fail
// and cause a retry
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: invalidSignalCommands,
Expand All @@ -174,6 +192,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

// Trigger signals and move to State 2
if req.GetWorkflowStateId() == State1 {
h.invokeData["s1_commandResults"] = req.GetCommandResults()

Expand All @@ -188,9 +208,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// Trigger data and move to completion
h.invokeData["s2_commandResults"] = req.GetCommandResults()

// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
19 changes: 18 additions & 1 deletion integ/workflow/any_timer_signal/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
"time"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil will wait for the signals to trigger.
* - Execute method will trigger signals and retry State1 once, then trigger signals and move the State2
* State2:
* - Waits on nothing. Will execute momentarily
* - Execute method will gracefully complete workflow
*/
const (
WorkflowType = "any_timer_signal"
State1 = "S1"
Expand Down Expand Up @@ -41,9 +51,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++

if req.GetWorkflowStateId() == State1 {
var timerCommands []iwfidl.TimerCommand
context := req.GetContext()

// Fire timer after 1s on first start attempt
if context.GetStateExecutionId() == State1+"-"+"1" {
now := time.Now().Unix()
timerCommands = []iwfidl.TimerCommand{
Expand All @@ -67,7 +80,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
})
return
}

if req.GetWorkflowStateId() == State2 {
// Go straight to the decide methods without any commands
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(),
Expand Down Expand Up @@ -95,12 +110,14 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var movements []iwfidl.StateMovement

context := req.GetContext()
// On first State 1 attempt, trigger signals and stay on the first state
if context.GetStateExecutionId() == State1+"-"+"1" {
h.invokeData["signalChannelName1"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId1"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus1"] = signalResults.SignalResults[0].GetSignalRequestStatus()
movements = []iwfidl.StateMovement{{StateId: State1}}
} else {
// After the first State 1 attempt, trigger signals and move to next state
h.invokeData["signalChannelName2"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId2"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus2"] = signalResults.SignalResults[0].GetSignalRequestStatus()
Expand All @@ -115,7 +132,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
// Move to completion
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
17 changes: 14 additions & 3 deletions integ/workflow/basic/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ import (
"net/http"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - Waits on nothing. Will execute momentarily
* - Execute method will move to State2
* State2:
* - Waits on nothing. Will execute momentarily
* - Execute method will gracefully complete workflow
*/
const (
WorkflowType = "basic"
State1 = "S1"
Expand Down Expand Up @@ -39,7 +49,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
}

if req.GetWorkflowType() == WorkflowType {
// basic workflow go straight to decide methods without any commands
// Basic workflow go straight to decide methods without any commands
if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
Expand Down Expand Up @@ -68,8 +78,9 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

if req.GetWorkflowStateId() == State1 {
// go to S2
// Move to next state
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down Expand Up @@ -98,7 +109,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
// Move to completion
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
23 changes: 18 additions & 5 deletions integ/workflow/conditional_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"time"
)

/**
* This test workflow has 1 state, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil will proceed when the channel or signal is published to
* - Execute method will continuously retry State1 until the 3rd attempt which will send a message to the channel or
* signal, making the state empty and force-complete.
*/
const (
WorkflowType = "conditional_close"
RpcPublishInternalChannel = "publish_internal_channel"
Expand Down Expand Up @@ -45,6 +53,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
log.Println("received workflow worker rpc request, ", req)
h.invokeHistory[req.RpcName]++

// Return channel name
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
PublishToInterStateChannel: []iwfidl.InterStateChannelPublishing{
{
Expand All @@ -65,8 +74,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {

if req.GetWorkflowStateId() == State1 {
// Proceed when channel is published to
cmdReq := &iwfidl.CommandRequest{
InterStateChannelCommands: []iwfidl.InterStateChannelCommand{
{
Expand All @@ -76,8 +86,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
}
input := req.GetStateInput()

if input.GetData() == "use-signal-channel" {
// use signal
// Proceed when signal is published to
cmdReq = &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
{
Expand All @@ -87,6 +98,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
}
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: cmdReq,
})
Expand All @@ -112,10 +124,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var internalChanPub []iwfidl.InterStateChannelPublishing
context := req.GetContext()
if context.GetStateExecutionId() == "S1-1" {
// wait for 3 seconds so that the channel can have a new message
// Wait for 3 seconds so that the channel can have a new message
time.Sleep(time.Second * 3)
} else if context.GetStateExecutionId() == "S1-3" {
// send internal channel message within the state execution
// Send internal channel message within the state execution
// and expecting the messages are processed by the conditional check
internalChanPub = []iwfidl.InterStateChannelPublishing{
{
Expand All @@ -130,8 +142,9 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
CloseInput: &TestInput,
}
input := req.GetStateInput()

// Use signal instead
if input.GetData() == "use-signal-channel" {
// use signal
conditionalClose = &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(TestChannelName),
Expand Down
15 changes: 15 additions & 0 deletions integ/workflow/deadend/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"net/http"
)

/**
* This test workflow has 3 states, using REST controller to implement the workflow directly.
*
* RPCWriteData:
* - WaitUntil will upsert data attributes
* RPCTriggerState:
* - WaitUntil will move to State1
* State1:
* - WaitUntil is skipped
* - Execute method will put the state into a dead-end.
*/
const (
WorkflowType = "deadend"
RPCTriggerState = "test-RPCTriggerState"
Expand Down Expand Up @@ -46,6 +57,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
}

if req.RpcName == RPCTriggerState {
// Move to State 1
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
StateDecision: &iwfidl.StateDecision{NextStates: []iwfidl.StateMovement{
{
Expand All @@ -57,6 +69,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
}},
})
} else if req.RpcName == RPCWriteData {
// Upsert data attributes
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
UpsertDataAttributes: []iwfidl.KeyValue{
{
Expand Down Expand Up @@ -88,6 +101,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

// Move to the dead-end state
if req.GetWorkflowStateId() == State1 {

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
Expand Down
Loading
Loading