Skip to content

Commit

Permalink
Handle system time move backward case in timer task processing
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Dec 21, 2024
1 parent 8baf2e4 commit c3f87c1
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 146 deletions.
7 changes: 6 additions & 1 deletion service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/timer"
"go.temporal.io/server/common/util"
hshard "go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)
Expand Down Expand Up @@ -303,10 +304,14 @@ func (p *scheduledQueue) lookAheadTask() {
// IsTimeExpired reports whether testingTime is equal to or before the
// effective reference time. The precision of the comparison is millisecond.
//
// The effective reference time is the later of referenceTime and the fire
// time recorded in the task's key, so a task is never evaluated against a
// reference time earlier than its own fire time (for example, when the
// system clock has moved backward since the task was scheduled).
//
// Both sides are truncated to persistence.ScheduledTaskMinPrecision before
// being compared.
func IsTimeExpired(
	task tasks.Task,
	referenceTime time.Time,
	testingTime time.Time,
) bool {
	// NOTE: The persistence layer may lose precision when persisting the task, which
	// essentially moves the task fire time backward. Truncation is already performed
	// here, so that loss does not need to be accounted for separately.
	//
	// A single truncation after taking the maximum is sufficient: Truncate is
	// monotonic, so truncating referenceTime beforehand cannot change the result.
	referenceTime = util.MaxTime(referenceTime, task.GetKey().FireTime).Truncate(persistence.ScheduledTaskMinPrecision)
	testingTime = testingTime.Truncate(persistence.ScheduledTaskMinPrecision)
	return !testingTime.After(referenceTime)
}
18 changes: 9 additions & 9 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
}

timerSequence := t.getTimerSequence(mutableState)
referenceTime := t.shardContext.GetTimeSource().Now()
referenceTime := t.Now()
timerFired := false

Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {
timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
Expand All @@ -183,7 +182,7 @@ Loop:
return serviceerror.NewInternal(errString)
}

if !queues.IsTimeExpired(referenceTime, timerSequenceID.Timestamp) {
if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) {
// Timer sequence IDs are sorted; once we encounter a timer whose
// sequence ID has not expired, all subsequent timers will not have
// expired.
Expand Down Expand Up @@ -231,7 +230,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
}

timerSequence := t.getTimerSequence(mutableState)
referenceTime := t.shardContext.GetTimeSource().Now()
referenceTime := t.Now()
updateMutableState := false
scheduleWorkflowTask := false

Expand All @@ -242,7 +241,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
// created.
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) {
if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), heartbeatTimeoutVis) {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(
ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return err
Expand All @@ -252,7 +251,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(

Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
if !queues.IsTimeExpired(referenceTime, timerSequenceID.Timestamp) {
if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) {
// Timer sequence IDs are sorted; once one timer sequence ID has not
// expired, none of the subsequent ones will have expired either.
break Loop
Expand Down Expand Up @@ -618,7 +617,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask(
return err
}

if !t.isValidWorkflowRunTimeoutTask(mutableState) {
if !t.isValidWorkflowRunTimeoutTask(mutableState, task) {
return nil
}

Expand All @@ -628,7 +627,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask(
initiator := enumspb.CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED

wfExpTime := mutableState.GetExecutionInfo().WorkflowExecutionExpirationTime
if wfExpTime == nil || wfExpTime.AsTime().IsZero() || wfExpTime.AsTime().After(t.shardContext.GetTimeSource().Now()) {
if wfExpTime == nil || wfExpTime.AsTime().IsZero() || wfExpTime.AsTime().After(t.Now()) {
backoffInterval, retryState = mutableState.GetRetryBackoffDuration(timeoutFailure)
if backoffInterval != backoff.NoBackoff {
// We have a retry policy and we should retry.
Expand Down Expand Up @@ -678,7 +677,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask(
mutableState.GetNamespaceEntry(),
mutableState.GetWorkflowKey().WorkflowID,
newRunID,
t.shardContext.GetTimeSource().Now(),
t.Now(),
mutableState,
)
if err != nil {
Expand Down Expand Up @@ -802,6 +801,7 @@ func (t *timerQueueActiveTaskExecutor) executeStateMachineTimerTask(
ctx,
wfCtx,
ms,
task,
func(node *hsm.Node, task hsm.Task) error {
return t.shardContext.StateMachineRegistry().ExecuteTimerTask(t, node, task)
},
Expand Down
Loading

0 comments on commit c3f87c1

Please sign in to comment.