Skip to content

Commit a6399df

Browse files
author
jbowers
committed
IWF-274: continue as new should complete timer spawning goroutine
1 parent 3a6b2d5 commit a6399df

8 files changed

+67
-30
lines changed

service/interpreter/workflowConfiger.go service/interpreter/config/workflowConfiger.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package interpreter
1+
package config
22

33
import (
44
"github.com/indeedeng/iwf/gen/iwfidl"

service/interpreter/continueAsNewCounter.go service/interpreter/cont/continueAsNewCounter.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
1-
package interpreter
1+
package cont
22

3-
import "github.com/indeedeng/iwf/service/interpreter/interfaces"
3+
import (
4+
"github.com/indeedeng/iwf/service/interpreter/config"
5+
"github.com/indeedeng/iwf/service/interpreter/interfaces"
6+
)
47

58
type ContinueAsNewCounter struct {
69
executedStateApis int32
710
signalsReceived int32
811
syncUpdateReceived int32
912
triggeredByAPI bool
1013

11-
configer *WorkflowConfiger
14+
configer *config.WorkflowConfiger
1215
rootCtx interfaces.UnifiedContext
1316
provider interfaces.WorkflowProvider
1417
}
1518

1619
func NewContinueAsCounter(
17-
configer *WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider,
20+
configer *config.WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider,
1821
) *ContinueAsNewCounter {
1922
return &ContinueAsNewCounter{
2023
configer: configer,

service/interpreter/queryHandler.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package interpreter
33
import (
44
"github.com/indeedeng/iwf/gen/iwfidl"
55
"github.com/indeedeng/iwf/service"
6+
"github.com/indeedeng/iwf/service/interpreter/config"
67
"github.com/indeedeng/iwf/service/interpreter/interfaces"
78
)
89

910
func SetQueryHandlers(
1011
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager,
1112
internalChannel *InternalChannel, signalReceiver *SignalReceiver,
1213
continueAsNewer *ContinueAsNewer,
13-
workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo,
14+
workflowConfiger *config.WorkflowConfiger, basicInfo service.BasicInfo,
1415
) error {
1516
err := provider.SetQueryHandler(ctx, service.GetDataAttributesWorkflowQueryType, func(req service.GetDataAttributesQueryRequest) (service.GetDataAttributesQueryResponse, error) {
1617
dos := persistenceManager.GetDataObjectsByKey(req)

service/interpreter/signalReceiver.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package interpreter
22

33
import (
44
"github.com/indeedeng/iwf/service/common/ptr"
5+
"github.com/indeedeng/iwf/service/interpreter/config"
6+
"github.com/indeedeng/iwf/service/interpreter/cont"
57
"github.com/indeedeng/iwf/service/interpreter/interfaces"
68
"strings"
79

@@ -16,7 +18,7 @@ type SignalReceiver struct {
1618
reasonFailWorkflowByClient *string
1719
provider interfaces.WorkflowProvider
1820
timerProcessor interfaces.TimerProcessor
19-
workflowConfiger *WorkflowConfiger
21+
workflowConfiger *config.WorkflowConfiger
2022
interStateChannel *InternalChannel
2123
stateRequestQueue *StateRequestQueue
2224
persistenceManager *PersistenceManager
@@ -25,8 +27,8 @@ type SignalReceiver struct {
2527
func NewSignalReceiver(
2628
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, interStateChannel *InternalChannel,
2729
stateRequestQueue *StateRequestQueue,
28-
persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *ContinueAsNewCounter,
29-
workflowConfiger *WorkflowConfiger,
30+
persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *cont.ContinueAsNewCounter,
31+
workflowConfiger *config.WorkflowConfiger,
3032
initReceivedSignals map[string][]*iwfidl.EncodedObject,
3133
) *SignalReceiver {
3234
if initReceivedSignals == nil {

service/interpreter/stateExecutionCounter.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"github.com/indeedeng/iwf/service"
77
"github.com/indeedeng/iwf/service/common/compatibility"
88
"github.com/indeedeng/iwf/service/common/ptr"
9+
"github.com/indeedeng/iwf/service/interpreter/config"
10+
"github.com/indeedeng/iwf/service/interpreter/cont"
911
"github.com/indeedeng/iwf/service/interpreter/interfaces"
1012
"reflect"
1113
"slices"
@@ -14,9 +16,9 @@ import (
1416
type StateExecutionCounter struct {
1517
ctx interfaces.UnifiedContext
1618
provider interfaces.WorkflowProvider
17-
configer *WorkflowConfiger
19+
configer *config.WorkflowConfiger
1820
globalVersioner *GlobalVersioner
19-
continueAsNewCounter *ContinueAsNewCounter
21+
continueAsNewCounter *cont.ContinueAsNewCounter
2022

2123
stateIdCompletedCounts map[string]int
2224
stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed
@@ -26,7 +28,7 @@ type StateExecutionCounter struct {
2628

2729
func NewStateExecutionCounter(
2830
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner,
29-
configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
31+
configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter,
3032
) *StateExecutionCounter {
3133
return &StateExecutionCounter{
3234
ctx: ctx,
@@ -44,7 +46,7 @@ func RebuildStateExecutionCounter(
4446
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner,
4547
stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int,
4648
totalCurrentlyExecutingCount int,
47-
configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
49+
configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter,
4850
) *StateExecutionCounter {
4951
return &StateExecutionCounter{
5052
ctx: ctx,

service/interpreter/timers/greedyTimerProcessor.go

+30-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package timers
22

33
import (
4+
"github.com/indeedeng/iwf/service/interpreter/cont"
45
"github.com/indeedeng/iwf/service/interpreter/interfaces"
56
"time"
67

@@ -23,7 +24,10 @@ type GreedyTimerProcessor struct {
2324
}
2425

2526
func NewGreedyTimerProcessor(
26-
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal,
27+
ctx interfaces.UnifiedContext,
28+
provider interfaces.WorkflowProvider,
29+
continueAsNewCounter *cont.ContinueAsNewCounter,
30+
staleSkipTimerSignals []service.StaleSkipTimerSignal,
2731
) *GreedyTimerProcessor {
2832

2933
tp := &GreedyTimerProcessor{
@@ -35,7 +39,7 @@ func NewGreedyTimerProcessor(
3539
}
3640

3741
// start some single thread that manages timers
38-
tp.createGreedyTimerScheduler(ctx)
42+
tp.createGreedyTimerScheduler(ctx, continueAsNewCounter)
3943

4044
err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
4145
return service.GetCurrentTimerInfosQueryResponse{
@@ -57,6 +61,10 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) {
5761
insertIndex := 0
5862
for i, timer := range t.timers {
5963
if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds {
64+
// don't want dupes. Makes remove simpler
65+
if toAdd == timer {
66+
return
67+
}
6068
insertIndex = i
6169
break
6270
}
@@ -67,6 +75,15 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) {
6775
append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...)
6876
}
6977

78+
func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) {
79+
for i, timer := range t.timers {
80+
if toRemove == timer {
81+
t.timers = append(t.timers[:i], t.timers[i+1:]...)
82+
return
83+
}
84+
}
85+
}
86+
7087
func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo {
7188

7289
if len(t.timers) == 0 {
@@ -86,7 +103,9 @@ func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo {
86103
return t.timers[index-1]
87104
}
88105

89-
func (t *GreedyTimerProcessor) createGreedyTimerScheduler(ctx interfaces.UnifiedContext) {
106+
func (t *GreedyTimerProcessor) createGreedyTimerScheduler(
107+
ctx interfaces.UnifiedContext,
108+
continueAsNewCounter *cont.ContinueAsNewCounter) {
90109

91110
t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) {
92111
// NOTE: next timer to fire is at the end of the slice
@@ -102,9 +121,13 @@ func (t *GreedyTimerProcessor) createGreedyTimerScheduler(ctx interfaces.Unified
102121
}
103122
}
104123
next := t.pendingTimers.pruneToNextTimer(now)
105-
return next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])
124+
return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet()
106125
})
107126

127+
if continueAsNewCounter.IsThresholdMet() {
128+
break
129+
}
130+
108131
now := t.provider.Now(ctx).Unix()
109132
next := t.pendingTimers.pruneToNextTimer(now)
110133
//next := t.pendingTimers.getEarliestTimer()
@@ -194,10 +217,12 @@ func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped(
194217
}
195218

196219
if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() {
220+
timer.Status = service.TimerFired
197221
return service.TimerFired
198222
}
199223

200224
// otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped)
225+
t.pendingTimers.removeTimer(timer)
201226
return service.TimerPending
202227
}
203228

@@ -207,7 +232,7 @@ func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) {
207232
timers := t.stateExecutionCurrentTimerInfos[stateExeId]
208233

209234
for _, timer := range timers {
210-
timer.Status = service.TimerSkipped
235+
t.pendingTimers.removeTimer(timer)
211236
}
212237

213238
delete(t.stateExecutionCurrentTimerInfos, stateExeId)

service/interpreter/workflowImpl.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"github.com/indeedeng/iwf/service/common/event"
88
"github.com/indeedeng/iwf/service/common/ptr"
99
"github.com/indeedeng/iwf/service/common/utils"
10+
"github.com/indeedeng/iwf/service/interpreter/config"
11+
"github.com/indeedeng/iwf/service/interpreter/cont"
1012
"github.com/indeedeng/iwf/service/interpreter/env"
1113
"github.com/indeedeng/iwf/service/interpreter/interfaces"
1214
"github.com/indeedeng/iwf/service/interpreter/timers"
@@ -72,7 +74,7 @@ func InterpreterImpl(
7274
}
7375
}
7476

75-
workflowConfiger := NewWorkflowConfiger(input.Config)
77+
workflowConfiger := config.NewWorkflowConfiger(input.Config)
7678
basicInfo := service.BasicInfo{
7779
IwfWorkflowType: input.IwfWorkflowType,
7880
IwfWorkerUrl: input.IwfWorkerUrl,
@@ -82,7 +84,7 @@ func InterpreterImpl(
8284
var stateRequestQueue *StateRequestQueue
8385
var persistenceManager *PersistenceManager
8486
var timerProcessor interfaces.TimerProcessor
85-
var continueAsNewCounter *ContinueAsNewCounter
87+
var continueAsNewCounter *cont.ContinueAsNewCounter
8688
var signalReceiver *SignalReceiver
8789
var stateExecutionCounter *StateExecutionCounter
8890
var outputCollector *OutputCollector
@@ -101,12 +103,12 @@ func InterpreterImpl(
101103
internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived)
102104
stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume)
103105
persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes)
106+
continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider)
104107
if input.Config.GetOptimizeTimer() {
105-
timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals)
108+
timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, previous.StaleSkipTimerSignals)
106109
} else {
107110
timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals)
108111
}
109-
continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider)
110112
signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived)
111113
counterInfo := previous.StateExecutionCounterInfo
112114
stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner,
@@ -118,12 +120,12 @@ func InterpreterImpl(
118120
internalChannel = NewInternalChannel()
119121
stateRequestQueue = NewStateRequestQueue()
120122
persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes)
123+
continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider)
121124
if input.Config.GetOptimizeTimer() {
122-
timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, nil)
125+
timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, nil)
123126
} else {
124127
timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil)
125128
}
126-
continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider)
127129
signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil)
128130
stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter)
129131
outputCollector = NewOutputCollector(nil)
@@ -525,8 +527,8 @@ func processStateExecution(
525527
signalReceiver *SignalReceiver,
526528
timerProcessor interfaces.TimerProcessor,
527529
continueAsNewer *ContinueAsNewer,
528-
continueAsNewCounter *ContinueAsNewCounter,
529-
configer *WorkflowConfiger,
530+
continueAsNewCounter *cont.ContinueAsNewCounter,
531+
configer *config.WorkflowConfiger,
530532
shouldSendSignalOnCompletion bool,
531533
) (*iwfidl.StateDecision, service.StateExecutionStatus, error) {
532534
waitUntilApi := StateStart
@@ -832,7 +834,7 @@ func invokeStateExecute(
832834
executionContext iwfidl.Context,
833835
commandRes *iwfidl.CommandResults,
834836
continueAsNewer *ContinueAsNewer,
835-
configer *WorkflowConfiger,
837+
configer *config.WorkflowConfiger,
836838
executeApi interface{},
837839
stateExecutionLocal []iwfidl.KeyValue,
838840
shouldSendSignalOnCompletion bool,

service/interpreter/workflowUpdater.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"github.com/indeedeng/iwf/gen/iwfidl"
55
"github.com/indeedeng/iwf/service"
66
"github.com/indeedeng/iwf/service/common/event"
7+
"github.com/indeedeng/iwf/service/interpreter/config"
8+
"github.com/indeedeng/iwf/service/interpreter/cont"
79
"github.com/indeedeng/iwf/service/interpreter/interfaces"
810
"time"
911
)
@@ -12,11 +14,11 @@ type WorkflowUpdater struct {
1214
persistenceManager *PersistenceManager
1315
provider interfaces.WorkflowProvider
1416
continueAsNewer *ContinueAsNewer
15-
continueAsNewCounter *ContinueAsNewCounter
17+
continueAsNewCounter *cont.ContinueAsNewCounter
1618
internalChannel *InternalChannel
1719
signalReceiver *SignalReceiver
1820
stateRequestQueue *StateRequestQueue
19-
configer *WorkflowConfiger
21+
configer *config.WorkflowConfiger
2022
logger interfaces.UnifiedLogger
2123
basicInfo service.BasicInfo
2224
globalVersioner *GlobalVersioner
@@ -25,7 +27,7 @@ type WorkflowUpdater struct {
2527
func NewWorkflowUpdater(
2628
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager,
2729
stateRequestQueue *StateRequestQueue,
28-
continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger,
30+
continueAsNewer *ContinueAsNewer, continueAsNewCounter *cont.ContinueAsNewCounter, configer *config.WorkflowConfiger,
2931
internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo,
3032
globalVersioner *GlobalVersioner,
3133
) (*WorkflowUpdater, error) {

0 commit comments

Comments
 (0)