Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reset] Add AllowChildReconnect flag in task generator #7157

Merged
merged 12 commits into from
Jan 25, 2025
4 changes: 3 additions & 1 deletion service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,9 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// Note: childStarted flag above is computed from the parent's history. When this is TRUE it's guaranteed that the child was succesfully started.
// But if it's FALSE then the child *may or maynot* be started (ex: we failed to record ChildExecutionStarted event previously.)
// Hence we need to check the child workflow ID and attempt to reconnect before proceeding to start a new instance of the child.
if mutableState.IsResetRun() {
// This path is usually taken when the parent is being reset and the reset point (i.e baseWorkflowInfo.LowestCommonAncestorEventId) is after the child was initiated.
baseWorkflowInfo := mutableState.GetBaseWorkflowInfo()
if mutableState.IsResetRun() && baseWorkflowInfo != nil && baseWorkflowInfo.LowestCommonAncestorEventId >= childInfo.InitiatedEventId {
childRunID, err := t.verifyChildWorkflow(ctx, mutableState, targetNamespaceEntry, attributes.WorkflowId)
if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions service/history/transfer_queue_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2066,7 +2066,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
s.Nil(err)
mutableState.GetExecutionInfo().OriginalExecutionRunId = originalExecutionRunID

event, _ := addStartChildWorkflowExecutionInitiatedEvent(
childInitEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(
mutableState,
1111,
uuid.New(),
Expand All @@ -2081,6 +2081,8 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)
// Set the base workflow for the reset run and simulate a reset point that is after the childInitEvent.EventId
mutableState.SetBaseWorkflow("baseRunID", childInitEvent.EventId+1, 123)

taskID := s.mustGenerateTaskID()
transferTask := &tasks.StartChildExecutionTask{
Expand All @@ -2093,11 +2095,11 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
TargetNamespaceID: tests.ChildNamespaceID.String(),
TargetWorkflowID: childWorkflowID,
TaskID: taskID,
InitiatedEventID: event.GetEventId(),
InitiatedEventID: childInitEvent.GetEventId(),
VisibilityTimestamp: time.Now().UTC(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
persistenceMutableState := s.createPersistenceMutableState(mutableState, childInitEvent.GetEventId(), childInitEvent.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
// Assert that child workflow describe is called.
// The child describe returns a mock parent whose originalExecutionRunID points to the same as the current reset run's originalExecutionRunID
Expand Down
Loading