Skip to content

Commit 7c6a326

Browse files
committed
Minor updates
1 parent 93f0bce commit 7c6a326

File tree

2 files changed

+35
-13
lines changed

2 files changed

+35
-13
lines changed

backend/runtimestate.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ var (
1818
)
1919

2020
type OrchestrationRuntimeState struct {
21-
instanceID api.InstanceID
22-
committedEvents []*protos.HistoryEvent
23-
pendingEvents []*protos.HistoryEvent
24-
newEvents []*protos.HistoryEvent
25-
newActions []*protos.OrchestratorAction
21+
instanceID api.InstanceID
22+
oldEvents []*protos.HistoryEvent
23+
pendingEvents []*protos.HistoryEvent
24+
newEvents []*protos.HistoryEvent
25+
newActions []*protos.OrchestratorAction
2626

2727
startEvent *protos.ExecutionStartedEvent
2828
completedEvent *protos.ExecutionCompletedEvent
@@ -46,18 +46,21 @@ type OrchestratorMessage struct {
4646
// See https://github.com/microsoft/durabletask-go/issues/44 for more details.
4747
type ChunkingConfiguration struct {
4848
// MaxHistoryEventCount is the maximum number of history events that can be stored in a single chunk.
49+
// If the number of history events exceeds this value, the history events will be saved in multiple chunks that are each less than or equal to this value.
4950
// A value of 0 or less means that there is no limit.
5051
MaxHistoryEventCount int
5152
// MaxHistoryEventSizeInKB is the maximum size of a single chunk in kilobytes.
5253
// For example, a max size of 2MB would be specified as 2048.
54+
// If the aggregate size of a batch of history events exceeds this value, the history events will be saved in multiple chunks that are each less than or equal to this size.
55+
// If the size of a single history event exceeds this value, the orchestration will fail.
5356
// A value of 0 or less means that there is no limit.
5457
MaxHistoryEventSizeInKB int
5558
}
5659

5760
func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*HistoryEvent) *OrchestrationRuntimeState {
5861
s := &OrchestrationRuntimeState{
5962
instanceID: instanceID,
60-
committedEvents: make([]*HistoryEvent, 0, len(existingHistory)+10),
63+
oldEvents: make([]*HistoryEvent, 0, len(existingHistory)+10),
6164
startedTaskIDs: make(map[int32]bool),
6265
completedTaskIDs: make(map[int32]bool),
6366
}
@@ -116,15 +119,15 @@ func (s *OrchestrationRuntimeState) addEvent(e *HistoryEvent, isNew bool) error
116119
s.pendingEvents = append(s.pendingEvents, e)
117120
s.newEvents = append(s.newEvents, e)
118121
} else {
119-
s.committedEvents = append(s.committedEvents, e)
122+
s.oldEvents = append(s.oldEvents, e)
120123
}
121124

122125
s.lastUpdatedTime = e.Timestamp.AsTime()
123126
return nil
124127
}
125128

126129
func (s *OrchestrationRuntimeState) IsValid() bool {
127-
if len(s.committedEvents) == 0 && len(s.pendingEvents) == 0 {
130+
if len(s.oldEvents) == 0 && len(s.pendingEvents) == 0 && len(s.newEvents) == 0 {
128131
// empty orchestration state
129132
return true
130133
} else if s.startEvent != nil {
@@ -388,6 +391,13 @@ func (s *OrchestrationRuntimeState) SetFailed(err error) {
388391
// Clear the list of pending events since we don't care about these anymore.
389392
s.pendingEvents = nil
390393

394+
// Add a fake "execution started" event if one doesn't already exist.
395+
if s.startEvent == nil {
396+
s.newEvents = append(s.newEvents, helpers.NewExecutionStartedEvent(
397+
"(Unknown)",
398+
string(s.instanceID),
399+
))
400+
391401
// Apply an "orchestration failed" action to the current state.
392402
s.newActions = []*protos.OrchestratorAction{helpers.NewCompleteOrchestrationAction(
393403
-1,
@@ -469,7 +479,7 @@ func (s *OrchestrationRuntimeState) IsCompleted() bool {
469479
}
470480

471481
func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent {
472-
return s.committedEvents
482+
return s.oldEvents
473483
}
474484

475485
func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent {
@@ -496,10 +506,10 @@ func (s *OrchestrationRuntimeState) String() string {
496506

497507
func (s *OrchestrationRuntimeState) getStartedTime() time.Time {
498508
var startTime time.Time
499-
if len(s.committedEvents) > 0 {
500-
startTime = s.committedEvents[0].Timestamp.AsTime()
501-
} else if len(s.committedEvents) > 0 {
502-
startTime = s.committedEvents[0].Timestamp.AsTime()
509+
if len(s.oldEvents) > 0 {
510+
startTime = s.oldEvents[0].Timestamp.AsTime()
511+
} else if len(s.newEvents) > 0 {
512+
startTime = s.newEvents[0].Timestamp.AsTime()
503513
}
504514
return startTime
505515
}

tests/runtimestate_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,18 @@ func Test_DuplicateOutgoingEvents(t *testing.T) {
508508

509509
func Test_SetFailed(t *testing.T) {
510510
errFailure := errors.New("you got terminated")
511+
t.Run("Pending", func(t *testing.T) {
512+
s := backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{})
513+
assert.NotEqual(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, s.RuntimeStatus())
514+
s.SetFailed(errFailure)
515+
s.ProcessChanges(defaultChunkingConfig, nil, logger)
516+
require.True(t, s.IsValid())
517+
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, s.RuntimeStatus())
518+
failureDetails, err := s.FailureDetails()
519+
require.NoError(t, err)
520+
assert.Equal(t, errFailure.Error(), failureDetails.ErrorMessage)
521+
})
522+
511523
t.Run("Running", func(t *testing.T) {
512524
s := backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{
513525
helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil),

0 commit comments

Comments
 (0)