Skip to content

Commit 4f15ce2

Browse files
author
jbowers
committed
IWF-274: fix prune split logic and tests to pass timer config through to interpreter
1 parent a6399df commit 4f15ce2

File tree

5 files changed

+106
-41
lines changed

5 files changed

+106
-41
lines changed

integ/timer_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,45 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) {
5757
}
5858

5959
// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true
60+
func TestGreedyTimerWorkflowTemporal(t *testing.T) {
61+
if !*temporalIntegTest {
62+
t.Skip()
63+
}
64+
for i := 0; i < *repeatIntegTest; i++ {
65+
doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig(true, false))
66+
smallWaitForFastTest()
67+
}
68+
}
69+
70+
func TestGreedyTimerWorkflowCadence(t *testing.T) {
71+
if !*cadenceIntegTest {
72+
t.Skip()
73+
}
74+
for i := 0; i < *repeatIntegTest; i++ {
75+
doTestTimerWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig(true, false))
76+
smallWaitForFastTest()
77+
}
78+
}
79+
80+
func TestGreedyTimerWorkflowTemporalContinueAsNew(t *testing.T) {
81+
if !*temporalIntegTest {
82+
t.Skip()
83+
}
84+
for i := 0; i < *repeatIntegTest; i++ {
85+
doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0())
86+
smallWaitForFastTest()
87+
}
88+
}
89+
90+
func TestGreedyTimerWorkflowCadenceContinueAsNew(t *testing.T) {
91+
if !*cadenceIntegTest {
92+
t.Skip()
93+
}
94+
for i := 0; i < *repeatIntegTest; i++ {
95+
doTestTimerWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0())
96+
smallWaitForFastTest()
97+
}
98+
}
6099

61100
func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
62101
// start test workflow server

integ/util.go

+13
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,19 @@ func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig {
220220
}
221221
}
222222

223+
func minimumGreedyTimerConfig(optimizeTimer bool, continueAsNew bool) *iwfidl.WorkflowConfig {
224+
if continueAsNew {
225+
return &iwfidl.WorkflowConfig{
226+
ContinueAsNewThreshold: iwfidl.PtrInt32(1),
227+
OptimizeTimer: iwfidl.PtrBool(optimizeTimer),
228+
}
229+
}
230+
231+
return &iwfidl.WorkflowConfig{
232+
OptimizeTimer: iwfidl.PtrBool(optimizeTimer),
233+
}
234+
}
235+
223236
func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig {
224237
return minimumContinueAsNewConfig(false)
225238
}

service/api/service.go

+3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ func overrideWorkflowConfig(configOverride iwfidl.WorkflowConfig, workflowConfig
196196
if configOverride.OptimizeActivity != nil {
197197
workflowConfig.OptimizeActivity = configOverride.OptimizeActivity
198198
}
199+
if configOverride.OptimizeTimer != nil {
200+
workflowConfig.OptimizeTimer = configOverride.OptimizeTimer
201+
}
199202
}
200203

201204
func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(

service/interpreter/timers/greedyTimerProcessor.go

+50-41
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ import (
99
"github.com/indeedeng/iwf/service"
1010
)
1111

12-
type sortedTimers struct {
13-
status service.InternalTimerStatus
14-
// Ordered slice of all timers being awaited on
15-
timers []*service.TimerInfo
12+
type TimerManager struct {
13+
// Does not map to the timers actually created by the workflow provider
14+
PendingScheduling []*service.TimerInfo
15+
// timers created through the workflow provider that are going to fire
16+
ScheduledTimerTimes []int64
1617
}
1718

1819
type GreedyTimerProcessor struct {
19-
pendingTimers sortedTimers
20+
timerManger TimerManager
2021
stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo
2122
staleSkipTimerSignals []service.StaleSkipTimerSignal
2223
provider interfaces.WorkflowProvider
@@ -32,13 +33,13 @@ func NewGreedyTimerProcessor(
3233

3334
tp := &GreedyTimerProcessor{
3435
provider: provider,
35-
pendingTimers: sortedTimers{status: service.TimerPending},
36+
timerManger: TimerManager{},
3637
stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{},
3738
logger: provider.GetLogger(ctx),
3839
staleSkipTimerSignals: staleSkipTimerSignals,
3940
}
4041

41-
// start some single thread that manages timers
42+
// start some single thread that manages PendingScheduling
4243
tp.createGreedyTimerScheduler(ctx, continueAsNewCounter)
4344

4445
err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
@@ -52,14 +53,14 @@ func NewGreedyTimerProcessor(
5253
return tp
5354
}
5455

55-
func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) {
56+
func (t *TimerManager) addTimer(toAdd *service.TimerInfo) {
5657

57-
if toAdd == nil || toAdd.Status != t.status {
58+
if toAdd == nil || toAdd.Status != service.TimerPending {
5859
panic("invalid timer added")
5960
}
6061

6162
insertIndex := 0
62-
for i, timer := range t.timers {
63+
for i, timer := range t.PendingScheduling {
6364
if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds {
6465
// don't want dupes. Makes remove simpler
6566
if toAdd == timer {
@@ -70,73 +71,79 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) {
7071
}
7172
insertIndex = i + 1
7273
}
73-
t.timers = append(
74-
t.timers[:insertIndex],
75-
append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...)
74+
t.PendingScheduling = append(
75+
t.PendingScheduling[:insertIndex],
76+
append([]*service.TimerInfo{toAdd}, t.PendingScheduling[insertIndex:]...)...)
7677
}
7778

78-
func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) {
79-
for i, timer := range t.timers {
79+
func (t *TimerManager) removeTimer(toRemove *service.TimerInfo) {
80+
for i, timer := range t.PendingScheduling {
8081
if toRemove == timer {
81-
t.timers = append(t.timers[:i], t.timers[i+1:]...)
82+
t.PendingScheduling = append(t.PendingScheduling[:i], t.PendingScheduling[i+1:]...)
8283
return
8384
}
8485
}
8586
}
8687

87-
func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo {
88+
func (t *TimerManager) pruneToNextTimer(pruneTo int64) *service.TimerInfo {
8889

89-
if len(t.timers) == 0 {
90+
if len(t.PendingScheduling) == 0 {
9091
return nil
9192
}
9293

93-
index := len(t.timers)
94+
index := len(t.PendingScheduling)
9495

95-
for i := len(t.timers) - 1; i >= 0; i-- {
96-
timer := t.timers[i]
97-
if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == t.status {
96+
for i := len(t.PendingScheduling) - 1; i >= 0; i-- {
97+
timer := t.PendingScheduling[i]
98+
if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == service.TimerPending {
9899
break
99100
}
100101
index = i
101102
}
102-
t.timers = t.timers[:index]
103-
return t.timers[index-1]
103+
104+
// If index is 0, it means all timers are pruned
105+
if index == 0 {
106+
t.PendingScheduling = nil
107+
return nil
108+
}
109+
110+
prunedTimer := t.PendingScheduling[index-1]
111+
t.PendingScheduling = t.PendingScheduling[:index]
112+
return prunedTimer
104113
}
105114

106115
func (t *GreedyTimerProcessor) createGreedyTimerScheduler(
107116
ctx interfaces.UnifiedContext,
108117
continueAsNewCounter *cont.ContinueAsNewCounter) {
109118

110119
t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) {
111-
// NOTE: next timer to fire is at the end of the slice
112-
var createdTimers []int64
113120
for {
114121
t.provider.Await(ctx, func() bool {
115-
// remove fired timers
122+
// remove fired PendingScheduling
116123
now := t.provider.Now(ctx).Unix()
117-
for i := len(createdTimers) - 1; i >= 0; i-- {
118-
if createdTimers[i] > now {
119-
createdTimers = createdTimers[:i+1]
124+
for i := len(t.timerManger.ScheduledTimerTimes) - 1; i >= 0; i-- {
125+
if t.timerManger.ScheduledTimerTimes[i] > now {
126+
t.timerManger.ScheduledTimerTimes = t.timerManger.ScheduledTimerTimes[:i+1]
120127
break
121128
}
122129
}
123-
next := t.pendingTimers.pruneToNextTimer(now)
124-
return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet()
130+
next := t.timerManger.pruneToNextTimer(now)
131+
return (next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1])) || continueAsNewCounter.IsThresholdMet()
125132
})
126133

127134
if continueAsNewCounter.IsThresholdMet() {
128135
break
129136
}
130137

131138
now := t.provider.Now(ctx).Unix()
132-
next := t.pendingTimers.pruneToNextTimer(now)
133-
//next := t.pendingTimers.getEarliestTimer()
139+
next := t.timerManger.pruneToNextTimer(now)
140+
//next := t.timerManger.getEarliestTimer()
134141
// only create a new timer when a pending timer exists before the next existing timer fires
135-
if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) {
142+
if next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1]) {
136143
fireAt := next.FiringUnixTimestampSeconds
137144
duration := time.Duration(fireAt-now) * time.Second
138145
t.provider.NewTimer(ctx, duration)
139-
createdTimers = append(createdTimers, fireAt)
146+
t.timerManger.ScheduledTimerTimes = append(t.timerManger.ScheduledTimerTimes, fireAt)
140147
}
141148
}
142149
})
@@ -216,23 +223,23 @@ func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped(
216223
return service.TimerSkipped
217224
}
218225

219-
if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() {
226+
if timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() {
220227
timer.Status = service.TimerFired
221228
return service.TimerFired
222229
}
223230

224231
// otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped)
225-
t.pendingTimers.removeTimer(timer)
232+
t.timerManger.removeTimer(timer)
226233
return service.TimerPending
227234
}
228235

229-
// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers
236+
// RemovePendingTimersOfState is for when a state is completed, remove all its pending PendingScheduling
230237
func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) {
231238

232239
timers := t.stateExecutionCurrentTimerInfos[stateExeId]
233240

234241
for _, timer := range timers {
235-
t.pendingTimers.removeTimer(timer)
242+
t.timerManger.removeTimer(timer)
236243
}
237244

238245
delete(t.stateExecutionCurrentTimerInfos, stateExeId)
@@ -256,7 +263,9 @@ func (t *GreedyTimerProcessor) AddTimers(
256263
FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(),
257264
Status: service.TimerPending,
258265
}
259-
t.pendingTimers.addTimer(&timer)
266+
}
267+
if timer.Status == service.TimerPending {
268+
t.timerManger.addTimer(&timer)
260269
}
261270
timers[idx] = &timer
262271
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package timers

0 commit comments

Comments
 (0)