From 093ab5503123b057fedf8da8448546c0b67e06d6 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Thu, 19 Dec 2024 14:06:10 -0500 Subject: [PATCH 01/18] Added functional tests testing queries for versioning-3 --- common/testing/taskpoller/taskpoller.go | 72 +++++-- service/history/api/get_workflow_util.go | 1 + tests/update_workflow_test.go | 116 ++++++------ tests/versioning_3_test.go | 228 +++++++++++++++++++---- 4 files changed, 309 insertions(+), 108 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index f9c9941646e..bdbd5a8c311 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -59,13 +59,21 @@ type ( tv *testvars.TestVars timeout time.Duration } - optionFunc func(*options) + optionFunc func(*options) + TaskCompletedRequest struct { + WorkflowTaskCompletedRequest *workflowservice.RespondWorkflowTaskCompletedRequest + QueryTaskCompletedRequest *workflowservice.RespondQueryTaskCompletedRequest + } + TaskCompletedResponse struct { + WorkflowTaskCompletedResponse *workflowservice.RespondWorkflowTaskCompletedResponse + QueryTaskCompletedResponse *workflowservice.RespondQueryTaskCompletedResponse + } ) var ( // DrainWorkflowTask returns an empty RespondWorkflowTaskCompletedRequest - DrainWorkflowTask = func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{}, nil + DrainWorkflowTask = func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error) { + return &TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{}}, nil } // CompleteActivityTask returns a RespondActivityTaskCompletedRequest with an auto-generated `Result` from `tv.Any().Payloads()`. CompleteActivityTask = func(tv *testvars.TestVars) func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { @@ -111,9 +119,9 @@ func (p *TaskPoller) PollWorkflowTask( // If no task is available, it returns NoTaskAvailable. func (p *TaskPoller) PollAndHandleWorkflowTask( tv *testvars.TestVars, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), opts ...optionFunc, -) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { +) (*TaskCompletedResponse, error) { return p. PollWorkflowTask(&workflowservice.PollWorkflowTaskQueueRequest{}). HandleTask(tv, handler, opts...) @@ -125,9 +133,9 @@ func (p *TaskPoller) PollAndHandleWorkflowTask( // If no task is available, it returns NoTaskAvailable. func (p *workflowTaskPoller) HandleTask( tv *testvars.TestVars, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), opts ...optionFunc, -) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { +) (*TaskCompletedResponse, error) { p.t.Helper() options := newOptions(tv, opts) ctx, cancel := newContext(options) @@ -141,9 +149,9 @@ func (p *workflowTaskPoller) HandleTask( func (p *TaskPoller) HandleWorkflowTask( tv *testvars.TestVars, task *workflowservice.PollWorkflowTaskQueueResponse, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), opts ...optionFunc, -) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { +) (*TaskCompletedResponse, error) { p.t.Helper() options := newOptions(tv, opts) ctx, cancel := newContext(options) @@ -239,9 +247,6 @@ func (p *workflowTaskPoller) pollTask( } events = history.Events - if len(events) == 0 { - return nil, errors.New("history events are empty") - } nextPageToken := resp.NextPageToken for nextPageToken != nil { @@ -265,8 +270,8 @@ func (p *workflowTaskPoller) pollTask( func (p *workflowTaskPoller) pollAndHandleTask( ctx context.Context, opts *options, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), -) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), +) (*TaskCompletedResponse, error) { p.t.Helper() task, err := p.pollTask(ctx, opts) if err != nil { @@ -279,19 +284,52 @@ func (p *workflowTaskPoller) handleTask( ctx context.Context, opts *options, task *workflowservice.PollWorkflowTaskQueueResponse, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), -) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), +) (*TaskCompletedResponse, error) { p.t.Helper() reply, err := handler(task) if err != nil { return nil, p.respondTaskFailed(ctx, opts, task.TaskToken, err) } - resp, err := p.respondTaskCompleted(ctx, opts, task, reply) + if reply.QueryTaskCompletedRequest != nil { + resp, err := p.respondQueryTaskCompleted(ctx, task, reply.QueryTaskCompletedRequest) + if err != nil { + return nil, err + } + return &TaskCompletedResponse{QueryTaskCompletedResponse: resp}, nil + } + + resp, err := p.respondTaskCompleted(ctx, opts, task, reply.WorkflowTaskCompletedRequest) if err != nil { return nil, err } + return &TaskCompletedResponse{WorkflowTaskCompletedResponse: resp}, nil +} + +func (p *workflowTaskPoller) respondQueryTaskCompleted( + ctx context.Context, + task *workflowservice.PollWorkflowTaskQueueResponse, + reply *workflowservice.RespondQueryTaskCompletedRequest, +) (*workflowservice.RespondQueryTaskCompletedResponse, error) { + p.t.Helper() + if task == nil { + return nil, errors.New("missing PollWorkflowTaskQueue") + } + if reply == nil { + return nil, errors.New("missing RespondQueryTaskCompleted return") + } + if reply.Namespace == "" { + reply.Namespace = p.namespace + } + if reply.TaskToken == nil { + reply.TaskToken = task.TaskToken + } + resp, err := p.client.RespondQueryTaskCompleted(ctx, reply) + if err != nil { + return nil, fmt.Errorf("failed to respond with respondQueryTaskCompleted: %w", err) + } return resp, nil } diff --git a/service/history/api/get_workflow_util.go b/service/history/api/get_workflow_util.go index df38fb5d3a4..066ccc606c2 100644 --- a/service/history/api/get_workflow_util.go +++ b/service/history/api/get_workflow_util.go @@ -361,5 +361,6 @@ func MutableStateToGetResponse( InheritedBuildId: mutableState.GetInheritedBuildId(), MostRecentWorkerVersionStamp: mostRecentWorkerVersionStamp, TransitionHistory: mutableState.GetExecutionInfo().TransitionHistory, + VersioningInfo: mutableState.GetExecutionInfo().VersioningInfo, }, nil } diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index 49888978d77..a60edf5752d 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -1381,10 +1381,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A // Drain existing first WT from regular task queue, but respond with sticky queue enabled response, next WT will go to sticky queue. _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ StickyAttributes: tv.StickyExecutionAttributes(3 * time.Second), - }, nil + }}, nil }) s.NoError(err) @@ -1393,7 +1393,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A res, err := s.TaskPoller. PollWorkflowTask(&workflowservice.PollWorkflowTaskQueueRequest{TaskQueue: tv.StickyTaskQueue()}). HandleTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { // This WT contains partial history because sticky was enabled. s.EqualHistory(` 4 WorkflowTaskCompleted @@ -1406,14 +1406,15 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A s.Equal(tv.HandlerName(), updRequest.GetInput().GetName()) s.EqualValues(5, updRequestMsg.GetEventId()) - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: s.UpdateAcceptCompleteCommands(tv, "1"), Messages: s.UpdateAcceptCompleteMessages(tv, updRequestMsg, "1"), - }, nil + }}, nil }) require.NoError(s.T(), err) require.NotNil(s.T(), res) - require.EqualValues(s.T(), 0, res.ResetHistoryEventId) + require.NotNil(s.T(), res.WorkflowTaskCompletedResponse) + require.EqualValues(s.T(), 0, res.WorkflowTaskCompletedResponse.ResetHistoryEventId) }() // This is to make sure that sticky poller above reached server first. @@ -1626,7 +1627,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re // Process update in workflow. res, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -1643,14 +1644,14 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re s.Equal(tv.HandlerName(), updRequest.GetInput().GetName()) s.EqualValues(5, updRequestMsg.GetEventId()) - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, updRequestMsg, "1"), - }, nil + }}, nil }) s.NoError(err) updateResult := <-updateResultCh s.Equal("rejection-of-"+tv.UpdateID("1"), updateResult.GetOutcome().GetFailure().GetMessage()) - s.EqualValues(3, res.ResetHistoryEventId) + s.EqualValues(3, res.WorkflowTaskCompletedResponse.ResetHistoryEventId) // Send signal to create WT. err = s.SendSignal(s.Namespace(), tv.WorkflowExecution(), tv.Any().String(), tv.Any().Payloads(), tv.Any().String()) @@ -1658,7 +1659,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re // Process signal and complete workflow. res, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -1668,7 +1669,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re 6 WorkflowTaskScheduled 7 WorkflowTaskStarted`, task.History) - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, @@ -1677,7 +1678,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re }, }, }, - }, nil + }}, nil }) s.NoError(err) s.NotNil(res) @@ -2209,7 +2210,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Try to accept update in workflow: get malformed response. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -2220,7 +2221,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() `, task.History) updRequestMsg := task.Messages[0] - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: s.UpdateAcceptCommands(tv, "1"), // Emulate bug in worker/SDK update handler code. Return malformed acceptance response. Messages: []*protocolpb.Message{ @@ -2235,7 +2236,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() }), }, }, - }, nil + }}, nil }) s.Error(err) s.Contains(err.Error(), "wasn't found") @@ -2245,7 +2246,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Try to accept update in workflow 2nd time: get error. Poller will fail WT. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { // 2nd attempt has same updates attached to it. updRequestMsg := task.Messages[0] s.EqualValues(8, updRequestMsg.GetEventId()) @@ -2265,7 +2266,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Try to accept update in workflow 3rd time: get error. Poller will fail WT. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { // 3rd attempt UpdateWorkflowExecution call has timed out but the // update is still running updRequestMsg := task.Messages[0] @@ -2279,7 +2280,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Complete workflow. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -2291,14 +2292,14 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() 8 WorkflowTaskScheduled 9 WorkflowTaskStarted`, task.History) - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}}, }, }, - }, nil + }}, nil }) s.NoError(err) @@ -2693,10 +2694,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul // Drain first WT and respond with sticky enabled response to enable sticky task queue. stickyScheduleToStartTimeout := 1 * time.Second _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ StickyAttributes: tv.StickyExecutionAttributes(stickyScheduleToStartTimeout), - }, nil + }}, nil }) s.NoError(err) @@ -2708,7 +2709,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul // Try to process update in workflow, poll from normal task queue. res, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { // Speculative WFT timed out on sticky task queue. Server sent full history with sticky timeout event. s.EqualHistory(` 1 WorkflowExecutionStarted @@ -2721,9 +2722,9 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul 8 WorkflowTaskStarted`, task.History) // Reject update, but WFT will still be in the history due to timeout on sticky queue. - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) s.NotNil(res) @@ -3237,7 +3238,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe // Heartbeat from speculative WT (no messages, no commands). var updRequestMsg *protocolpb.Message res, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -3251,17 +3252,17 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe updRequestMsg = task.Messages[0] s.EqualValues(5, updRequestMsg.GetEventId()) - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ ReturnNewWorkflowTask: true, ForceCreateNewWorkflowTask: true, - }, nil + }}, nil }) s.NoError(err) // Reject update from workflow. updateResp, err := s.TaskPoller.HandleWorkflowTask(tv, - res.GetWorkflowTask(), - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + res.WorkflowTaskCompletedResponse.GetWorkflowTask(), + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.EqualHistory(` 7 WorkflowTaskCompleted 8 WorkflowTaskScheduled // New WT (after heartbeat) is normal and won't disappear from the history after reject. @@ -3270,16 +3271,17 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe s.Empty(task.Messages) - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, updRequestMsg, "1"), - }, nil + }}, nil }) s.NoError(err) s.NotNil(updateResp) + s.NotNil(updateResp.WorkflowTaskCompletedResponse) updateResult := <-updateResultCh s.Equal("rejection-of-"+tv.UpdateID("1"), updateResult.GetOutcome().GetFailure().GetMessage()) - s.EqualValues(0, updateResp.ResetHistoryEventId, "no reset of event ID should happened after update rejection because of heartbeat") + s.EqualValues(0, updateResp.WorkflowTaskCompletedResponse.ResetHistoryEventId, "no reset of event ID should happened after update rejection because of heartbeat") events := s.GetHistory(s.Namespace(), tv.WorkflowExecution()) @@ -4944,10 +4946,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) @@ -4982,10 +4984,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) @@ -5034,10 +5036,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) @@ -5084,10 +5086,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) @@ -5132,10 +5134,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { s.NoError(err) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) @@ -5208,10 +5210,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { // 1st uwsCh1 := sendUpdateWithStart(testcore.NewContext(), startReq, updReq) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) uwsRes1 := <-uwsCh1 @@ -5262,10 +5264,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }, nil + }}, nil }) s.NoError(err) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 7d8dc2a9829..c0ae2ddf7b4 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -39,6 +39,7 @@ import ( commonpb "go.temporal.io/api/common/v1" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" + querypb "go.temporal.io/api/query/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" @@ -180,9 +181,10 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { wftCompleted := make(chan interface{}) s.pollWftAndHandle(tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) - return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tv, tv, + sticky, vbUnpinned, "5")}, nil }) actCompleted := make(chan interface{}) @@ -205,13 +207,130 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) s.pollWftAndHandle(tv, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) - return respondCompleteWorkflow(tv, vbUnpinned), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tv, vbUnpinned)}, nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) } +func (s *Versioning3Suite) TestQueryWithPinnedOverride_NoSticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testQueryWithPinnedOverride(false) + }, + ) +} + +func (s *Versioning3Suite) TestQueryWithPinnedOverride_Sticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testQueryWithPinnedOverride(true) + }, + ) +} + +func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { + tv := testvars.New(s) + + if sticky { + s.warmUpSticky(tv) + } + + wftCompleted := make(chan interface{}) + s.pollWftAndHandle(tv, false, wftCompleted, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + s.NotNil(task) + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondEmptyWft(tv, sticky, vbUnpinned)}, nil + }) + + override := makePinnedOverride(tv.Deployment()) + we := s.startWorkflow(tv, override) + + <-wftCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } + + s.pollAndQueryWorkflow(tv, sticky) + + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } +} + +func (s *Versioning3Suite) TestUnpinnedQuery_NoSticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testUnpinnedQuery(false) + }, + ) +} + +func (s *Versioning3Suite) TestUnpinnedQuery_Sticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testUnpinnedQuery(true) + }, + ) +} + +func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { + tv := testvars.New(s) + d := tv.Deployment() + + if sticky { + s.warmUpSticky(tv) + } + + wftCompleted := make(chan interface{}) + s.pollWftAndHandle(tv, false, wftCompleted, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondEmptyWft(tv, sticky, vbUnpinned)}, nil + }) + + s.setCurrentDeployment(d) + s.waitForDeploymentDataPropagation(tv, tqTypeWf) + + we := s.startWorkflow(tv, nil) + + <-wftCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } + + s.pollAndQueryWorkflow(tv, sticky) + + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } +} + +func (s *Versioning3Suite) pollAndQueryWorkflow( + tv *testvars.TestVars, + sticky bool, +) { + queryResultCh := make(chan interface{}) + s.pollWftAndHandle(tv, sticky, queryResultCh, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + s.NotNil(task) + s.NotNil(task.Query) + return &taskpoller.TaskCompletedRequest{QueryTaskCompletedRequest: respondQueryTaskCompleted(task, s.Namespace())}, nil + }) + + response, err := s.queryWorkflow(tv) + s.Error(err) + s.Nil(response) + + <-queryResultCh +} + func (s *Versioning3Suite) TestUnpinnedWorkflow_Sticky() { s.RunTestWithMatchingBehavior( func() { @@ -238,10 +357,11 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { wftCompleted := make(chan interface{}) s.pollWftAndHandle(tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) - return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tv, tv, sticky, + vbUnpinned, "5")}, nil }) actCompleted := make(chan interface{}) @@ -266,9 +386,10 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) s.pollWftAndHandle(tv, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) - return respondCompleteWorkflow(tv, vbUnpinned), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tv, + vbUnpinned)}, nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) } @@ -282,7 +403,7 @@ func (s *Versioning3Suite) TestTransitionFromWft_NoSticky() { } func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { - // Wf runs one TWF and one AT on dA, then the second WFT is redirected to dB and + // Wf runs one WFT and one AT on dA, then the second WFT is redirected to dB and // transitions the wf with it. tvA := testvars.New(s).WithBuildId("A") @@ -298,10 +419,11 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { we := s.startWorkflow(tvA, nil) s.pollWftAndHandle(tvA, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) - return respondWftWithActivities(tvA, tvA, sticky, vbUnpinned, "5"), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tvA, tvA, + sticky, vbUnpinned, "5")}, nil }) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) if sticky { @@ -319,10 +441,11 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) s.pollWftAndHandle(tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) - return respondCompleteWorkflow(tvB, vbUnpinned), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tvB, + vbUnpinned)}, nil }) s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) } @@ -339,18 +462,20 @@ func (s *Versioning3Suite) TestEagerActivity() { s.startWorkflow(tv, nil) poller, resp := s.pollWftAndHandle(tv, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) resp := respondWftWithActivities(tv, tv, true, vbUnpinned, "5") resp.Commands[0].GetScheduleActivityTaskCommandAttributes().RequestEagerExecution = true - return resp, nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: resp}, nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) - s.NotEmpty(resp.GetActivityTasks()) + s.NotEmpty(resp) + s.NotEmpty(resp.WorkflowTaskCompletedResponse) + s.NotEmpty(resp.WorkflowTaskCompletedResponse.GetActivityTasks()) - _, err := poller.HandleActivityTask(tv, resp.GetActivityTasks()[0], + _, err := poller.HandleActivityTask(tv, resp.WorkflowTaskCompletedResponse.GetActivityTasks()[0], func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { s.NotNil(task) return respondActivity(), nil @@ -359,9 +484,9 @@ func (s *Versioning3Suite) TestEagerActivity() { s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) s.pollWftAndHandle(tv, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) - return respondCompleteWorkflow(tv, vbUnpinned), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tv, vbUnpinned)}, nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) } @@ -401,10 +526,11 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { we := s.startWorkflow(tvA, nil) s.pollWftAndHandle(tvA, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) - return respondWftWithActivities(tvA, tvA, sticky, vbUnpinned, "5", "6", "7", "8"), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tvA, tvA, + sticky, vbUnpinned, "5", "6", "7", "8")}, nil }) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) if sticky { @@ -478,7 +604,7 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { // 5. The transition should create a new WFT to be sent to dB. s.pollWftAndHandle(tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) close(transitionStarted) @@ -488,7 +614,8 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { <-act2Failed transitionCompleted.Store(true) s.Logger.Info("Transition wft completed") - return respondEmptyWft(tvB, sticky, vbUnpinned), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondEmptyWft(tvB, + sticky, vbUnpinned)}, nil }) s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) if sticky { @@ -498,10 +625,11 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { // 9. Now all activities should complete. <-act2To4Completed s.pollWftAndHandle(tvB, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.Logger.Info("Final wft completed") - return respondCompleteWorkflow(tvB, vbUnpinned), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tvB, + vbUnpinned)}, nil }) s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) } @@ -534,11 +662,12 @@ func (s *Versioning3Suite) testIndependentActivity(behavior enumspb.VersioningBe s.startWorkflow(tvWf, nil) s.pollWftAndHandle(tvWf, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvWf, vbUnspecified, nil, nil, transitionTo(dWf)) s.Logger.Info("First wf task completed") - return respondWftWithActivities(tvWf, tvAct, false, behavior, "5"), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tvWf, tvAct, + false, behavior, "5")}, nil }) s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) @@ -551,9 +680,10 @@ func (s *Versioning3Suite) testIndependentActivity(behavior enumspb.VersioningBe s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) s.pollWftAndHandle(tvWf, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.NotNil(task) - return respondCompleteWorkflow(tvWf, behavior), nil + return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tvWf, + behavior)}, nil }) s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) } @@ -722,6 +852,17 @@ func respondCompleteWorkflow( } } +func respondQueryTaskCompleted( + task *workflowservice.PollWorkflowTaskQueueResponse, + namespace string, +) *workflowservice.RespondQueryTaskCompletedRequest { + request := &workflowservice.RespondQueryTaskCompletedRequest{ + Namespace: namespace, + TaskToken: task.TaskToken, + } + return request +} + func (s *Versioning3Suite) startWorkflow( tv *testvars.TestVars, override *workflowpb.VersioningOverride, @@ -751,6 +892,25 @@ func (s *Versioning3Suite) startWorkflow( } } +func (s *Versioning3Suite) queryWorkflow( + tv *testvars.TestVars, +) (*workflowservice.QueryWorkflowResponse, error) { + request := &workflowservice.QueryWorkflowRequest{ + Namespace: s.Namespace(), + Execution: tv.WorkflowExecution(), + Query: &querypb.WorkflowQuery{ + QueryType: tv.Any().String(), + }, + } + + // using a short context since we don't have query handlers defined and + // are okay with timeouts + shortCtx, cancel := context.WithTimeout(testcore.NewContext(), 100*time.Millisecond) + defer cancel() + response, err := s.FrontendClient().QueryWorkflow(shortCtx, request) + return response, err +} + // Name is used by testvars. We use a shorten test name in variables so that physical task queue IDs // do not grow larger that DB column limit (currently as low as 272 chars). func (s *Versioning3Suite) Name() string { @@ -771,15 +931,15 @@ func (s *Versioning3Suite) pollWftAndHandle( tv *testvars.TestVars, sticky bool, async chan<- interface{}, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), -) (*taskpoller.TaskPoller, *workflowservice.RespondWorkflowTaskCompletedResponse) { + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error), +) (*taskpoller.TaskPoller, *taskpoller.TaskCompletedResponse) { poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) d := tv.Deployment() tq := tv.TaskQueue() if sticky { tq = tv.StickyTaskQueue() } - f := func() *workflowservice.RespondWorkflowTaskCompletedResponse { + f := func() *taskpoller.TaskCompletedResponse { resp, err := poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ @@ -851,7 +1011,7 @@ func (s *Versioning3Suite) idlePollWorkflow( }, ).HandleTask( tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.Fail(unexpectedTaskMessage) return nil, nil }, @@ -914,7 +1074,7 @@ func (s *Versioning3Suite) warmUpSticky( }, ).HandleTask( tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { s.Fail("sticky task is not expected") return nil, nil }, From 876b7f643a2d3fa54d54bf7b3fa1955939d0001a Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Fri, 20 Dec 2024 10:01:42 -0500 Subject: [PATCH 02/18] tests to verify routing of nexus tasks using versioning-3; --- common/testing/taskpoller/taskpoller.go | 126 +++++++++++++++++++ tests/versioning_3_test.go | 160 ++++++++++++++++++++++++ 2 files changed, 286 insertions(+) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index bdbd5a8c311..2bcaefb81a7 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -28,6 +28,8 @@ import ( "context" "errors" "fmt" + "github.com/nexus-rpc/sdk-go/nexus" + nexuspb "go.temporal.io/api/nexus/v1" "testing" "time" @@ -55,6 +57,10 @@ type ( *TaskPoller pollActivityTaskRequest *workflowservice.PollActivityTaskQueueRequest } + nexusTaskPoller struct { + *TaskPoller + pollNexusTaskRequest *workflowservice.PollNexusTaskQueueRequest + } options struct { tv *testvars.TestVars timeout time.Duration @@ -112,6 +118,12 @@ func (p *TaskPoller) PollWorkflowTask( return &workflowTaskPoller{TaskPoller: p, pollWorkflowTaskRequest: req} } +func (p *TaskPoller) PollNexusTask( + req *workflowservice.PollNexusTaskQueueRequest, +) *nexusTaskPoller { + return &nexusTaskPoller{TaskPoller: p, pollNexusTaskRequest: req} +} + // PollAndHandleWorkflowTask issues a PollWorkflowTaskQueueRequest to obtain a new workflow task, // invokes the handler with the task, and completes/fails the task accordingly. // Any unspecified but required request and response fields are automatically generated using `tv`. @@ -143,6 +155,120 @@ func (p *workflowTaskPoller) HandleTask( return p.pollAndHandleTask(ctx, options, handler) } +func (p *nexusTaskPoller) pollTask( + ctx context.Context, + opts *options, +) (*workflowservice.PollNexusTaskQueueResponse, error) { + p.t.Helper() + + req := common.CloneProto(p.pollNexusTaskRequest) + if req.Namespace == "" { + req.Namespace = p.namespace + } + if req.TaskQueue == nil { + req.TaskQueue = opts.tv.TaskQueue() + } + if req.Identity == "" { + req.Identity = opts.tv.WorkerIdentity() + } + resp, err := p.client.PollNexusTaskQueue(ctx, req) + if err != nil { + return nil, err + } + if resp == nil || resp.TaskToken == nil { + return nil, NoWorkflowTaskAvailable + } + + return resp, err +} + +func (p *nexusTaskPoller) pollAndHandleTask( + ctx context.Context, + opts *options, + handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error), +) (*workflowservice.RespondNexusTaskCompletedResponse, error) { + p.t.Helper() + task, err := p.pollTask(ctx, opts) + if err != nil { + return nil, fmt.Errorf("failed to poll nexus task: %w", err) + } + return p.handleTask(ctx, opts, task, handler) +} +func (p *nexusTaskPoller) handleTask( + ctx context.Context, + opts *options, + task *workflowservice.PollNexusTaskQueueResponse, + handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error), +) (*workflowservice.RespondNexusTaskCompletedResponse, error) { + p.t.Helper() + reply, err := handler(task) + if err != nil { + return nil, p.respondNexusTaskFailed(ctx, opts, task.TaskToken) + } + + resp, err := p.respondNexusTaskCompleted(ctx, opts, task, reply) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (p *nexusTaskPoller) respondNexusTaskCompleted( + ctx context.Context, + opts *options, + task *workflowservice.PollNexusTaskQueueResponse, + reply *workflowservice.RespondNexusTaskCompletedRequest, +) (*workflowservice.RespondNexusTaskCompletedResponse, error) { + p.t.Helper() + if reply == nil { + return nil, errors.New("missing RespondWorkflowTaskCompletedRequest return") + } + if reply.Namespace == "" { + reply.Namespace = p.namespace + } + if len(reply.TaskToken) == 0 { + reply.TaskToken = task.TaskToken + } + if reply.Identity == "" { + reply.Identity = opts.tv.WorkerIdentity() + } + reply.Response = &nexuspb.Response{} + + return p.client.RespondNexusTaskCompleted(ctx, reply) +} + +func (p *nexusTaskPoller) respondNexusTaskFailed( + ctx context.Context, + opts *options, + taskToken []byte, +) error { + p.t.Helper() + _, err := p.client.RespondNexusTaskFailed( + ctx, + &workflowservice.RespondNexusTaskFailedRequest{ + Namespace: p.namespace, + TaskToken: taskToken, + Identity: opts.tv.WorkerIdentity(), + Error: &nexuspb.HandlerError{ + ErrorType: string(nexus.HandlerErrorTypeInternal), + }, + }) + return err +} + +func (p *nexusTaskPoller) HandleTask( + tv *testvars.TestVars, + handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error), + opts ...optionFunc, +) (*workflowservice.RespondNexusTaskCompletedResponse, error) { + p.t.Helper() + options := newOptions(tv, opts) + ctx, cancel := newContext(options) + defer cancel() + return p.pollAndHandleTask(ctx, options, handler) +} + // HandleWorkflowTask invokes the provided handler with the provided task, and completes/fails the task accordingly. // Any unspecified but required request and response fields are automatically generated using `tv`. // Returning an error from `handler` fails the task. diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index c0ae2ddf7b4..7bf8909b87a 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -28,6 +28,8 @@ import ( "context" "errors" "fmt" + nexuspb "go.temporal.io/api/nexus/v1" + "go.temporal.io/sdk/converter" "sync/atomic" "testing" "time" @@ -60,6 +62,7 @@ import ( const ( tqTypeWf = enumspb.TASK_QUEUE_TYPE_WORKFLOW tqTypeAct = enumspb.TASK_QUEUE_TYPE_ACTIVITY + tqTypeNexus = enumspb.TASK_QUEUE_TYPE_NEXUS vbUnspecified = enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED vbPinned = enumspb.VERSIONING_BEHAVIOR_PINNED vbUnpinned = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE @@ -450,6 +453,64 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) } +func (s *Versioning3Suite) TestNexusTask_StaysOnCurrentDeployment() { + s.RunTestWithMatchingBehavior( + func() { + s.nexusTaskStaysOnCurrentDeployment() + }, + ) +} + +func (s *Versioning3Suite) nexusTaskStaysOnCurrentDeployment() { + tvA := testvars.New(s).WithBuildId("A") + tvB := tvA.WithBuildId("B") + + nexusRequest := &matchingservice.DispatchNexusTaskRequest{ + NamespaceId: s.GetNamespaceID(s.Namespace()), + TaskQueue: tvA.TaskQueue(), + Request: &nexuspb.Request{ + Header: map[string]string{ + // placeholder value as passing in an empty map would result in protoc deserializing + // it as nil, which breaks existing logic inside of matching + "request-timeout": "1", + }, + }, + } + + // current deployment is -> A + s.updateTaskQueueDeploymentData(tvA, 0, tqTypeNexus) + s.waitForDeploymentDataPropagation(tvA, tqTypeNexus) + + // local poller with deployment A receives task + s.pollAndDispatchNexusTask(tvA, nexusRequest) + + // current deployment is now -> B + s.updateTaskQueueDeploymentData(tvB, 0, tqTypeNexus) + s.waitForDeploymentDataPropagation(tvB, tqTypeNexus) + + // Pollers of A are there but should not get any task + go s.idlePollNexus(tvA, true, ver3MinPollTime, "nexus task should not go to the old deployment") + + s.pollAndDispatchNexusTask(tvB, nexusRequest) +} + +func (s *Versioning3Suite) pollAndDispatchNexusTask( + tv *testvars.TestVars, + nexusRequest *matchingservice.DispatchNexusTaskRequest, +) { + matchingClient := s.GetTestCluster().MatchingClient() + + nexusCompleted := make(chan interface{}) + s.pollNexusTaskAndHandle(tv, false, nexusCompleted, + func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error) { + s.NotNil(task) + return &workflowservice.RespondNexusTaskCompletedRequest{}, nil // response object gets filled during processing + }) + + _, err := matchingClient.DispatchNexusTask(context.Background(), nexusRequest) + s.NoError(err) +} + func (s *Versioning3Suite) TestEagerActivity() { // The first WFT asks for an activity to starts and get it eagerly in the WFT completion // response. The activity is processed without issues and wf completes. @@ -852,6 +913,41 @@ func respondCompleteWorkflow( } } +func respondScheduleNexusOperation( + tv *testvars.TestVars, + behavior enumspb.VersioningBehavior, + endpointName string, +) *workflowservice.RespondWorkflowTaskCompletedRequest { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: endpointName, + Service: "service", + Operation: "operation", + Input: mustToPayload("input"), + }, + }, + }, + }, + ForceCreateNewWorkflowTask: false, + Deployment: tv.Deployment(), + VersioningBehavior: behavior, + } +} + +func mustToPayload(v any) *commonpb.Payload { + conv := converter.GetDefaultDataConverter() + payload, err := conv.ToPayload(v) + if err != nil { + + return &commonpb.Payload{} + } + return payload +} + func respondQueryTaskCompleted( task *workflowservice.PollWorkflowTaskQueueResponse, namespace string, @@ -964,6 +1060,43 @@ func (s *Versioning3Suite) pollWftAndHandle( return nil, nil } +func (s *Versioning3Suite) pollNexusTaskAndHandle( + tv *testvars.TestVars, + sticky bool, + async chan<- interface{}, + handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error), +) (*taskpoller.TaskPoller, *workflowservice.RespondNexusTaskCompletedResponse) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + tq := tv.TaskQueue() + if sticky { + tq = tv.StickyTaskQueue() + } + f := func() *workflowservice.RespondNexusTaskCompletedResponse { + resp, err := poller.PollNexusTask( + &workflowservice.PollNexusTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: true, + }, + TaskQueue: tq, + }, + ).HandleTask(tv, handler) + s.NoError(err) + return resp + } + if async == nil { + return poller, f() + } else { + go func() { + f() + close(async) + }() + } + return nil, nil +} + func (s *Versioning3Suite) pollActivityAndHandle( tv *testvars.TestVars, async chan<- interface{}, @@ -1048,6 +1181,33 @@ func (s *Versioning3Suite) idlePollActivity( ) } +func (s *Versioning3Suite) idlePollNexus( + tv *testvars.TestVars, + versioned bool, + timeout time.Duration, + unexpectedTaskMessage string, +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + _, _ = poller.PollNexusTask( + &workflowservice.PollNexusTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + UseVersioning: versioned, + DeploymentSeriesName: d.SeriesName, + }, + }).HandleTask( + tv, + func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error) { + if task != nil { + s.Fail(unexpectedTaskMessage) + } + return nil, nil + }, + taskpoller.WithTimeout(timeout), + ) +} + func (s *Versioning3Suite) verifyWorkflowStickyQueue( we *commonpb.WorkflowExecution, stickyQ *taskqueuepb.TaskQueue, From deec7143bb53e76b9e32d84112932f32a207debb Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Fri, 20 Dec 2024 10:03:21 -0500 Subject: [PATCH 03/18] goimports --- common/testing/taskpoller/taskpoller.go | 5 +++-- tests/versioning_3_test.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 2bcaefb81a7..7525aa5a340 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -28,11 +28,12 @@ import ( "context" "errors" "fmt" - "github.com/nexus-rpc/sdk-go/nexus" - nexuspb "go.temporal.io/api/nexus/v1" "testing" "time" + "github.com/nexus-rpc/sdk-go/nexus" + nexuspb "go.temporal.io/api/nexus/v1" + enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/workflowservice/v1" diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 7bf8909b87a..f4c7df7f249 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -28,8 +28,6 @@ import ( "context" "errors" "fmt" - nexuspb "go.temporal.io/api/nexus/v1" - "go.temporal.io/sdk/converter" "sync/atomic" "testing" "time" @@ -41,10 +39,12 @@ import ( commonpb "go.temporal.io/api/common/v1" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" + nexuspb "go.temporal.io/api/nexus/v1" querypb "go.temporal.io/api/query/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" deploymentspb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" From 1024efa287140211b90d75f2d17ac9b967cb8d69 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Fri, 20 Dec 2024 10:35:33 -0500 Subject: [PATCH 04/18] added waiting until poller completion --- tests/versioning_3_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index f4c7df7f249..18997e0335b 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -509,6 +509,7 @@ func (s *Versioning3Suite) pollAndDispatchNexusTask( _, err := matchingClient.DispatchNexusTask(context.Background(), nexusRequest) s.NoError(err) + <-nexusCompleted } func (s *Versioning3Suite) TestEagerActivity() { From 02c9f46374bc925c38e78014bf3444c5919b4368 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Fri, 20 Dec 2024 10:39:51 -0500 Subject: [PATCH 05/18] lint fixes --- tests/versioning_3_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 18997e0335b..eeb12f2fd1f 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -1089,12 +1089,11 @@ func (s *Versioning3Suite) pollNexusTaskAndHandle( } if async == nil { return poller, f() - } else { - go func() { - f() - close(async) - }() } + go func() { + f() + close(async) + }() return nil, nil } From ff38738c8e46e983f42e811e6373e1040c4bf84f Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Mon, 23 Dec 2024 15:51:50 -0500 Subject: [PATCH 06/18] query redirects to latest deployment --- .../history/workflow/mutable_state_impl.go | 1 + tests/versioning_3_test.go | 21 ++++++++----------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 4516f96670c..64582915ead 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -2563,6 +2563,7 @@ func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo( workflowTask *WorkflowTaskInfo, identity string, ) *historyspb.TransientWorkflowTaskInfo { + // (todo-shivam): can we remove this check ms.IsTransientWorkflowTask()? if !ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { return nil } diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index eeb12f2fd1f..0191d49fa40 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -257,11 +257,6 @@ func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { } s.pollAndQueryWorkflow(tv, sticky) - - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) - } } func (s *Versioning3Suite) TestUnpinnedQuery_NoSticky() { @@ -282,6 +277,7 @@ func (s *Versioning3Suite) TestUnpinnedQuery_Sticky() { func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { tv := testvars.New(s) + tvB := tv.WithBuildId("B") d := tv.Deployment() if sticky { @@ -307,12 +303,15 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } + go s.idlePollWorkflow(tvB, sticky, ver3MinPollTime, "new deployment should not receive query") s.pollAndQueryWorkflow(tv, sticky) - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) - } + // redirect query to new deployment + s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) + + go s.idlePollWorkflow(tv, sticky, ver3MinPollTime, "old deployment should not receive query") + s.pollAndQueryWorkflow(tvB, sticky) + } func (s *Versioning3Suite) pollAndQueryWorkflow( @@ -1000,9 +999,7 @@ func (s *Versioning3Suite) queryWorkflow( }, } - // using a short context since we don't have query handlers defined and - // are okay with timeouts - shortCtx, cancel := context.WithTimeout(testcore.NewContext(), 100*time.Millisecond) + shortCtx, cancel := context.WithTimeout(testcore.NewContext(), common.MinLongPollTimeout) defer cancel() response, err := s.FrontendClient().QueryWorkflow(shortCtx, request) return response, err From 37b1672cac915044c9b032cb3597bf63f3e098a4 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Mon, 6 Jan 2025 16:10:39 -0500 Subject: [PATCH 07/18] Addressed comments by making separate helpers --- common/testing/taskpoller/taskpoller.go | 92 +++++++---- tests/update_workflow_test.go | 127 +++++++-------- tests/versioning_3_test.go | 197 +++++++++++++++--------- 3 files changed, 253 insertions(+), 163 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 7525aa5a340..0740b7d36b2 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -66,21 +66,13 @@ type ( tv *testvars.TestVars timeout time.Duration } - optionFunc func(*options) - TaskCompletedRequest struct { - WorkflowTaskCompletedRequest *workflowservice.RespondWorkflowTaskCompletedRequest - QueryTaskCompletedRequest *workflowservice.RespondQueryTaskCompletedRequest - } - TaskCompletedResponse struct { - WorkflowTaskCompletedResponse *workflowservice.RespondWorkflowTaskCompletedResponse - QueryTaskCompletedResponse *workflowservice.RespondQueryTaskCompletedResponse - } + optionFunc func(*options) ) var ( // DrainWorkflowTask returns an empty RespondWorkflowTaskCompletedRequest - DrainWorkflowTask = func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error) { - return &TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{}}, nil + DrainWorkflowTask = func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{}, nil } // CompleteActivityTask returns a RespondActivityTaskCompletedRequest with an auto-generated `Result` from `tv.Any().Payloads()`. CompleteActivityTask = func(tv *testvars.TestVars) func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { @@ -132,9 +124,9 @@ func (p *TaskPoller) PollNexusTask( // If no task is available, it returns NoTaskAvailable. func (p *TaskPoller) PollAndHandleWorkflowTask( tv *testvars.TestVars, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), opts ...optionFunc, -) (*TaskCompletedResponse, error) { +) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { return p. PollWorkflowTask(&workflowservice.PollWorkflowTaskQueueRequest{}). HandleTask(tv, handler, opts...) @@ -146,9 +138,9 @@ func (p *TaskPoller) PollAndHandleWorkflowTask( // If no task is available, it returns NoTaskAvailable. func (p *workflowTaskPoller) HandleTask( tv *testvars.TestVars, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), opts ...optionFunc, -) (*TaskCompletedResponse, error) { +) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { p.t.Helper() options := newOptions(tv, opts) ctx, cancel := newContext(options) @@ -276,9 +268,9 @@ func (p *nexusTaskPoller) HandleTask( func (p *TaskPoller) HandleWorkflowTask( tv *testvars.TestVars, task *workflowservice.PollWorkflowTaskQueueResponse, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), opts ...optionFunc, -) (*TaskCompletedResponse, error) { +) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { p.t.Helper() options := newOptions(tv, opts) ctx, cancel := newContext(options) @@ -394,16 +386,62 @@ func (p *workflowTaskPoller) pollTask( return resp, err } +func (p *workflowTaskPoller) HandleQueries( + tv *testvars.TestVars, + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), + opts ...optionFunc, +) (*workflowservice.RespondQueryTaskCompletedResponse, error) { + p.t.Helper() + options := newOptions(tv, opts) + ctx, cancel := newContext(options) + defer cancel() + return p.pollAndHandleQueries(ctx, options, handler) +} + +func (p *workflowTaskPoller) pollAndHandleQueries( + ctx context.Context, + opts *options, + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), +) (*workflowservice.RespondQueryTaskCompletedResponse, error) { + p.t.Helper() + task, err := p.pollTask(ctx, opts) + if err != nil { + return nil, fmt.Errorf("failed to poll workflow task: %w", err) + } + return p.handleQueries(ctx, opts, task, handler) +} + +func (p *workflowTaskPoller) handleQueries( + ctx context.Context, + opts *options, + task *workflowservice.PollWorkflowTaskQueueResponse, + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), +) (*workflowservice.RespondQueryTaskCompletedResponse, error) { + p.t.Helper() + reply, err := handler(task) + if err != nil { + return nil, p.respondTaskFailed(ctx, opts, task.TaskToken, err) + } + + resp, err := p.respondQueryTaskCompleted(ctx, task, reply) + if err != nil { + return nil, err + } + + return resp, nil +} + func (p *workflowTaskPoller) pollAndHandleTask( ctx context.Context, opts *options, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), -) (*TaskCompletedResponse, error) { + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), +) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { p.t.Helper() task, err := p.pollTask(ctx, opts) if err != nil { return nil, fmt.Errorf("failed to poll workflow task: %w", err) } + // could maybe have type assertions? return p.handleTask(ctx, opts, task, handler) } @@ -411,28 +449,20 @@ func (p *workflowTaskPoller) handleTask( ctx context.Context, opts *options, task *workflowservice.PollWorkflowTaskQueueResponse, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error), -) (*TaskCompletedResponse, error) { + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), +) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { p.t.Helper() reply, err := handler(task) if err != nil { return nil, p.respondTaskFailed(ctx, opts, task.TaskToken, err) } - if reply.QueryTaskCompletedRequest != nil { - resp, err := p.respondQueryTaskCompleted(ctx, task, reply.QueryTaskCompletedRequest) - if err != nil { - return nil, err - } - return &TaskCompletedResponse{QueryTaskCompletedResponse: resp}, nil - } - - resp, err := p.respondTaskCompleted(ctx, opts, task, reply.WorkflowTaskCompletedRequest) + resp, err := p.respondTaskCompleted(ctx, opts, task, reply) if err != nil { return nil, err } - return &TaskCompletedResponse{WorkflowTaskCompletedResponse: resp}, nil + return resp, nil } func (p *workflowTaskPoller) respondQueryTaskCompleted( diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index a60edf5752d..53dfbd63dee 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -1381,10 +1381,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A // Drain existing first WT from regular task queue, but respond with sticky queue enabled response, next WT will go to sticky queue. _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ StickyAttributes: tv.StickyExecutionAttributes(3 * time.Second), - }}, nil + }, nil }) s.NoError(err) @@ -1393,7 +1393,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A res, err := s.TaskPoller. PollWorkflowTask(&workflowservice.PollWorkflowTaskQueueRequest{TaskQueue: tv.StickyTaskQueue()}). HandleTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { // This WT contains partial history because sticky was enabled. s.EqualHistory(` 4 WorkflowTaskCompleted @@ -1406,15 +1406,14 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A s.Equal(tv.HandlerName(), updRequest.GetInput().GetName()) s.EqualValues(5, updRequestMsg.GetEventId()) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: s.UpdateAcceptCompleteCommands(tv, "1"), Messages: s.UpdateAcceptCompleteMessages(tv, updRequestMsg, "1"), - }}, nil + }, nil }) require.NoError(s.T(), err) require.NotNil(s.T(), res) - require.NotNil(s.T(), res.WorkflowTaskCompletedResponse) - require.EqualValues(s.T(), 0, res.WorkflowTaskCompletedResponse.ResetHistoryEventId) + require.EqualValues(s.T(), 0, res.ResetHistoryEventId) }() // This is to make sure that sticky poller above reached server first. @@ -1627,7 +1626,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re // Process update in workflow. res, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -1644,14 +1643,14 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re s.Equal(tv.HandlerName(), updRequest.GetInput().GetName()) s.EqualValues(5, updRequestMsg.GetEventId()) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, updRequestMsg, "1"), - }}, nil + }, nil }) s.NoError(err) updateResult := <-updateResultCh s.Equal("rejection-of-"+tv.UpdateID("1"), updateResult.GetOutcome().GetFailure().GetMessage()) - s.EqualValues(3, res.WorkflowTaskCompletedResponse.ResetHistoryEventId) + s.EqualValues(3, res.ResetHistoryEventId) // Send signal to create WT. err = s.SendSignal(s.Namespace(), tv.WorkflowExecution(), tv.Any().String(), tv.Any().Payloads(), tv.Any().String()) @@ -1659,7 +1658,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re // Process signal and complete workflow. res, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -1669,7 +1668,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re 6 WorkflowTaskScheduled 7 WorkflowTaskStarted`, task.History) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, @@ -1678,7 +1677,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re }, }, }, - }}, nil + }, nil }) s.NoError(err) s.NotNil(res) @@ -2210,7 +2209,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Try to accept update in workflow: get malformed response. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -2221,7 +2220,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() `, task.History) updRequestMsg := task.Messages[0] - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: s.UpdateAcceptCommands(tv, "1"), // Emulate bug in worker/SDK update handler code. Return malformed acceptance response. Messages: []*protocolpb.Message{ @@ -2236,7 +2235,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() }), }, }, - }}, nil + }, nil }) s.Error(err) s.Contains(err.Error(), "wasn't found") @@ -2246,7 +2245,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Try to accept update in workflow 2nd time: get error. Poller will fail WT. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { // 2nd attempt has same updates attached to it. updRequestMsg := task.Messages[0] s.EqualValues(8, updRequestMsg.GetEventId()) @@ -2266,7 +2265,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Try to accept update in workflow 3rd time: get error. Poller will fail WT. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { // 3rd attempt UpdateWorkflowExecution call has timed out but the // update is still running updRequestMsg := task.Messages[0] @@ -2280,7 +2279,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() // Complete workflow. _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -2292,14 +2291,14 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() 8 WorkflowTaskScheduled 9 WorkflowTaskStarted`, task.History) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}}, }, }, - }}, nil + }, nil }) s.NoError(err) @@ -2694,10 +2693,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul // Drain first WT and respond with sticky enabled response to enable sticky task queue. stickyScheduleToStartTimeout := 1 * time.Second _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ StickyAttributes: tv.StickyExecutionAttributes(stickyScheduleToStartTimeout), - }}, nil + }, nil }) s.NoError(err) @@ -2709,7 +2708,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul // Try to process update in workflow, poll from normal task queue. res, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { // Speculative WFT timed out on sticky task queue. Server sent full history with sticky timeout event. s.EqualHistory(` 1 WorkflowExecutionStarted @@ -2722,9 +2721,9 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul 8 WorkflowTaskStarted`, task.History) // Reject update, but WFT will still be in the history due to timeout on sticky queue. - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) s.NotNil(res) @@ -3238,7 +3237,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe // Heartbeat from speculative WT (no messages, no commands). var updRequestMsg *protocolpb.Message res, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -3252,17 +3251,17 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe updRequestMsg = task.Messages[0] s.EqualValues(5, updRequestMsg.GetEventId()) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ ReturnNewWorkflowTask: true, ForceCreateNewWorkflowTask: true, - }}, nil + }, nil }) s.NoError(err) // Reject update from workflow. updateResp, err := s.TaskPoller.HandleWorkflowTask(tv, - res.WorkflowTaskCompletedResponse.GetWorkflowTask(), - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + res.GetWorkflowTask(), + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.EqualHistory(` 7 WorkflowTaskCompleted 8 WorkflowTaskScheduled // New WT (after heartbeat) is normal and won't disappear from the history after reject. @@ -3271,17 +3270,16 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe s.Empty(task.Messages) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, updRequestMsg, "1"), - }}, nil + }, nil }) s.NoError(err) s.NotNil(updateResp) - s.NotNil(updateResp.WorkflowTaskCompletedResponse) updateResult := <-updateResultCh s.Equal("rejection-of-"+tv.UpdateID("1"), updateResult.GetOutcome().GetFailure().GetMessage()) - s.EqualValues(0, updateResp.WorkflowTaskCompletedResponse.ResetHistoryEventId, "no reset of event ID should happened after update rejection because of heartbeat") + s.EqualValues(0, updateResp.ResetHistoryEventId, "no reset of event ID should happened after update rejection because of heartbeat") events := s.GetHistory(s.Namespace(), tv.WorkflowExecution()) @@ -4946,10 +4944,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) @@ -4984,10 +4982,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) @@ -5036,10 +5034,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) @@ -5086,10 +5084,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateRejectMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) @@ -5134,10 +5132,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { s.NoError(err) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) @@ -5210,10 +5208,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { // 1st uwsCh1 := sendUpdateWithStart(testcore.NewContext(), startReq, updReq) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) uwsRes1 := <-uwsCh1 @@ -5242,7 +5240,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { tv := testvars.New(s.T()) // start and terminate workflow - _, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv)) + initialWorkflow, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv)) s.NoError(err) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, taskpoller.DrainWorkflowTask) @@ -5264,10 +5262,10 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { uwsCh := sendUpdateWithStart(testcore.NewContext(), startReq, updateReq) _, err = s.TaskPoller.PollAndHandleWorkflowTask(tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0], "1"), - }}, nil + }, nil }) s.NoError(err) @@ -5278,6 +5276,13 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { s.True(startResp.Started) s.EqualValues("success-result-of-"+tv.UpdateID("1"), testcore.DecodeString(s.T(), updateRep.GetOutcome().GetSuccess())) + // ensure terminated workflow is not locked by update-with-start + err = s.SendSignal(s.Namespace(), &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + RunId: initialWorkflow.RunId, + }, tv.Any().String(), tv.Any().Payloads(), tv.Any().String()) + s.ErrorContains(err, "workflow execution already completed") + // poll update to ensure same outcome is returned pollRes, err := s.pollUpdate(tv, "1", &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED}) @@ -5338,7 +5343,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() { errs := err.(*serviceerror.MultiOperationExecution).OperationErrors() s.Len(errs, 2) s.Equal("Operation was aborted.", errs[0].Error()) - s.Contains(errs[1].Error(), "limit on number of total updates has been reached") + s.Contains(errs[1].Error(), "limit on the total number of distinct updates in this workflow has been reached") }) } diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 0191d49fa40..baefb32d38c 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -179,15 +179,14 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { tv := testvars.New(s) if sticky { - s.warmUpSticky(tv) + s.warmUpSticky(tv, sticky) } wftCompleted := make(chan interface{}) s.pollWftAndHandle(tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tv, tv, - sticky, vbUnpinned, "5")}, nil + return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil }) actCompleted := make(chan interface{}) @@ -210,9 +209,9 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) s.pollWftAndHandle(tv, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tv, vbUnpinned)}, nil + return respondCompleteWorkflow(tv, vbUnpinned), nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) } @@ -237,14 +236,14 @@ func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { tv := testvars.New(s) if sticky { - s.warmUpSticky(tv) + s.warmUpSticky(tv, sticky) } wftCompleted := make(chan interface{}) s.pollWftAndHandle(tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondEmptyWft(tv, sticky, vbUnpinned)}, nil + return respondEmptyWft(tv, sticky, vbUnpinned), nil }) override := makePinnedOverride(tv.Deployment()) @@ -260,11 +259,7 @@ func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { } func (s *Versioning3Suite) TestUnpinnedQuery_NoSticky() { - s.RunTestWithMatchingBehavior( - func() { - s.testUnpinnedQuery(false) - }, - ) + s.testUnpinnedQuery(false) } func (s *Versioning3Suite) TestUnpinnedQuery_Sticky() { @@ -281,15 +276,15 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { d := tv.Deployment() if sticky { - s.warmUpSticky(tv) + s.warmUpSticky(tv, sticky) } wftCompleted := make(chan interface{}) s.pollWftAndHandle(tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondEmptyWft(tv, sticky, vbUnpinned)}, nil + return respondEmptyWft(tv, sticky, vbUnpinned), nil }) s.setCurrentDeployment(d) @@ -303,13 +298,13 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } - go s.idlePollWorkflow(tvB, sticky, ver3MinPollTime, "new deployment should not receive query") + go s.idlePollWorkflowWhileHandlingQueries(tvB, sticky, ver3MinPollTime, "new deployment should not receive query") s.pollAndQueryWorkflow(tv, sticky) // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) - go s.idlePollWorkflow(tv, sticky, ver3MinPollTime, "old deployment should not receive query") + go s.idlePollWorkflowWhileHandlingQueries(tv, sticky, ver3MinPollTime, "old deployment should not receive query") s.pollAndQueryWorkflow(tvB, sticky) } @@ -319,11 +314,11 @@ func (s *Versioning3Suite) pollAndQueryWorkflow( sticky bool, ) { queryResultCh := make(chan interface{}) - s.pollWftAndHandle(tv, sticky, queryResultCh, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + s.pollWftAndHandleQueries(tv, sticky, queryResultCh, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { s.NotNil(task) s.NotNil(task.Query) - return &taskpoller.TaskCompletedRequest{QueryTaskCompletedRequest: respondQueryTaskCompleted(task, s.Namespace())}, nil + return respondQueryTaskCompleted(task, s.Namespace()), nil }) response, err := s.queryWorkflow(tv) @@ -354,16 +349,15 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { d := tv.Deployment() if sticky { - s.warmUpSticky(tv) + s.warmUpSticky(tv, sticky) } wftCompleted := make(chan interface{}) s.pollWftAndHandle(tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tv, tv, sticky, - vbUnpinned, "5")}, nil + return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil }) actCompleted := make(chan interface{}) @@ -388,10 +382,9 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) s.pollWftAndHandle(tv, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tv, - vbUnpinned)}, nil + return respondCompleteWorkflow(tv, vbUnpinned), nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) } @@ -414,18 +407,17 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { dB := tvB.Deployment() if sticky { - s.warmUpSticky(tvA) + s.warmUpSticky(tvA, sticky) } s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) we := s.startWorkflow(tvA, nil) s.pollWftAndHandle(tvA, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tvA, tvA, - sticky, vbUnpinned, "5")}, nil + return respondWftWithActivities(tvA, tvA, sticky, vbUnpinned, "5"), nil }) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) if sticky { @@ -443,11 +435,10 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) s.pollWftAndHandle(tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tvB, - vbUnpinned)}, nil + return respondCompleteWorkflow(tvB, vbUnpinned), nil }) s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) } @@ -523,20 +514,18 @@ func (s *Versioning3Suite) TestEagerActivity() { s.startWorkflow(tv, nil) poller, resp := s.pollWftAndHandle(tv, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) resp := respondWftWithActivities(tv, tv, true, vbUnpinned, "5") resp.Commands[0].GetScheduleActivityTaskCommandAttributes().RequestEagerExecution = true - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: resp}, nil + return resp, nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) - s.NotEmpty(resp) - s.NotEmpty(resp.WorkflowTaskCompletedResponse) - s.NotEmpty(resp.WorkflowTaskCompletedResponse.GetActivityTasks()) + s.NotEmpty(resp.GetActivityTasks()) - _, err := poller.HandleActivityTask(tv, resp.WorkflowTaskCompletedResponse.GetActivityTasks()[0], + _, err := poller.HandleActivityTask(tv, resp.GetActivityTasks()[0], func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { s.NotNil(task) return respondActivity(), nil @@ -545,9 +534,9 @@ func (s *Versioning3Suite) TestEagerActivity() { s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) s.pollWftAndHandle(tv, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tv, vbUnpinned)}, nil + return respondCompleteWorkflow(tv, vbUnpinned), nil }) s.verifyWorkflowVersioning(tv, vbUnpinned, d, nil, nil) } @@ -580,18 +569,17 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { dB := tvB.Deployment() if sticky { - s.warmUpSticky(tvA) + s.warmUpSticky(tvA, sticky) } s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) we := s.startWorkflow(tvA, nil) s.pollWftAndHandle(tvA, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tvA, tvA, - sticky, vbUnpinned, "5", "6", "7", "8")}, nil + return respondWftWithActivities(tvA, tvA, sticky, vbUnpinned, "5", "6", "7", "8"), nil }) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) if sticky { @@ -665,7 +653,7 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { // 5. The transition should create a new WFT to be sent to dB. s.pollWftAndHandle(tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) close(transitionStarted) @@ -675,8 +663,7 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { <-act2Failed transitionCompleted.Store(true) s.Logger.Info("Transition wft completed") - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondEmptyWft(tvB, - sticky, vbUnpinned)}, nil + return respondEmptyWft(tvB, sticky, vbUnpinned), nil }) s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) if sticky { @@ -686,11 +673,10 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { // 9. Now all activities should complete. <-act2To4Completed s.pollWftAndHandle(tvB, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.Logger.Info("Final wft completed") - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tvB, - vbUnpinned)}, nil + return respondCompleteWorkflow(tvB, vbUnpinned), nil }) s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) } @@ -723,12 +709,11 @@ func (s *Versioning3Suite) testIndependentActivity(behavior enumspb.VersioningBe s.startWorkflow(tvWf, nil) s.pollWftAndHandle(tvWf, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) s.verifyWorkflowVersioning(tvWf, vbUnspecified, nil, nil, transitionTo(dWf)) s.Logger.Info("First wf task completed") - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondWftWithActivities(tvWf, tvAct, - false, behavior, "5")}, nil + return respondWftWithActivities(tvWf, tvAct, false, behavior, "5"), nil }) s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) @@ -741,10 +726,9 @@ func (s *Versioning3Suite) testIndependentActivity(behavior enumspb.VersioningBe s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) s.pollWftAndHandle(tvWf, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) - return &taskpoller.TaskCompletedRequest{WorkflowTaskCompletedRequest: respondCompleteWorkflow(tvWf, - behavior)}, nil + return respondCompleteWorkflow(tvWf, behavior), nil }) s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) } @@ -1025,15 +1009,15 @@ func (s *Versioning3Suite) pollWftAndHandle( tv *testvars.TestVars, sticky bool, async chan<- interface{}, - handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error), -) (*taskpoller.TaskPoller, *taskpoller.TaskCompletedResponse) { + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), +) (*taskpoller.TaskPoller, *workflowservice.RespondWorkflowTaskCompletedResponse) { poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) d := tv.Deployment() tq := tv.TaskQueue() if sticky { tq = tv.StickyTaskQueue() } - f := func() *taskpoller.TaskCompletedResponse { + f := func() *workflowservice.RespondWorkflowTaskCompletedResponse { resp, err := poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ @@ -1058,6 +1042,43 @@ func (s *Versioning3Suite) pollWftAndHandle( return nil, nil } +func (s *Versioning3Suite) pollWftAndHandleQueries( + tv *testvars.TestVars, + sticky bool, + async chan<- interface{}, + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), +) (*taskpoller.TaskPoller, *workflowservice.RespondQueryTaskCompletedResponse) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + tq := tv.TaskQueue() + if sticky { + tq = tv.StickyTaskQueue() + } + f := func() *workflowservice.RespondQueryTaskCompletedResponse { + resp, err := poller.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: true, + }, + TaskQueue: tq, + }, + ).HandleQueries(tv, handler) + s.NoError(err) + return resp + } + if async == nil { + return poller, f() + } else { + go func() { + f() + close(async) + }() + } + return nil, nil +} + func (s *Versioning3Suite) pollNexusTaskAndHandle( tv *testvars.TestVars, sticky bool, @@ -1141,7 +1162,33 @@ func (s *Versioning3Suite) idlePollWorkflow( }, ).HandleTask( tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail(unexpectedTaskMessage) + return nil, nil + }, + taskpoller.WithTimeout(timeout), + ) +} + +func (s *Versioning3Suite) idlePollWorkflowWhileHandlingQueries( + tv *testvars.TestVars, + versioned bool, + timeout time.Duration, + unexpectedTaskMessage string, +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + _, _ = poller.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: versioned, + }, + }, + ).HandleQueries( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { s.Fail(unexpectedTaskMessage) return nil, nil }, @@ -1223,20 +1270,28 @@ func (s *Versioning3Suite) verifyWorkflowStickyQueue( // create the sticky queue by polling it. func (s *Versioning3Suite) warmUpSticky( tv *testvars.TestVars, + handlerQueries bool, ) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) - _, _ = poller.PollWorkflowTask( + wfTaskPoller := poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ TaskQueue: tv.StickyTaskQueue(), }, - ).HandleTask( - tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*taskpoller.TaskCompletedRequest, error) { + ) + + if handlerQueries { + wfTaskPoller.HandleQueries(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { s.Fail("sticky task is not expected") return nil, nil - }, - taskpoller.WithTimeout(ver3MinPollTime), - ) + }) + } else { + wfTaskPoller.HandleTask(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail("sticky task is not expected") + return nil, nil + }) + } + } func (s *Versioning3Suite) waitForDeploymentDataPropagation( From 61742a4cc47c0afd1dcab90ceba8bfbf30dd838d Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Mon, 6 Jan 2025 16:15:23 -0500 Subject: [PATCH 08/18] removing non-required comments --- common/testing/taskpoller/taskpoller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 0740b7d36b2..f8b2bd87462 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -441,7 +441,6 @@ func (p *workflowTaskPoller) pollAndHandleTask( if err != nil { return nil, fmt.Errorf("failed to poll workflow task: %w", err) } - // could maybe have type assertions? return p.handleTask(ctx, opts, task, handler) } From 0750ef071a0bf78ecd83388fe5d2efa06341f76f Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Mon, 6 Jan 2025 16:23:54 -0500 Subject: [PATCH 09/18] lint errors --- tests/versioning_3_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index baefb32d38c..33d2f784213 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -1070,12 +1070,11 @@ func (s *Versioning3Suite) pollWftAndHandleQueries( } if async == nil { return poller, f() - } else { - go func() { - f() - close(async) - }() } + go func() { + f() + close(async) + }() return nil, nil } @@ -1281,12 +1280,12 @@ func (s *Versioning3Suite) warmUpSticky( ) if handlerQueries { - wfTaskPoller.HandleQueries(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { + _, _ = wfTaskPoller.HandleQueries(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { s.Fail("sticky task is not expected") return nil, nil }) } else { - wfTaskPoller.HandleTask(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + _, _ = wfTaskPoller.HandleTask(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.Fail("sticky task is not expected") return nil, nil }) From 9e372617de1ccb85d1009b5643c2b00df77d10c7 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Mon, 6 Jan 2025 16:32:59 -0500 Subject: [PATCH 10/18] better code --- tests/versioning_3_test.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 33d2f784213..c4bcd58e4cf 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -177,9 +177,10 @@ func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_NoSticky() { func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { tv := testvars.New(s) + handlingQueries := false if sticky { - s.warmUpSticky(tv, sticky) + s.warmUpSticky(tv, handlingQueries) } wftCompleted := make(chan interface{}) @@ -234,9 +235,10 @@ func (s *Versioning3Suite) TestQueryWithPinnedOverride_Sticky() { func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { tv := testvars.New(s) + handlingQueries := true if sticky { - s.warmUpSticky(tv, sticky) + s.warmUpSticky(tv, handlingQueries) } wftCompleted := make(chan interface{}) @@ -274,9 +276,9 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { tv := testvars.New(s) tvB := tv.WithBuildId("B") d := tv.Deployment() - + handlingQueries := true if sticky { - s.warmUpSticky(tv, sticky) + s.warmUpSticky(tv, handlingQueries) } wftCompleted := make(chan interface{}) @@ -347,9 +349,10 @@ func (s *Versioning3Suite) TestUnpinnedWorkflow_NoSticky() { func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { tv := testvars.New(s) d := tv.Deployment() + handlingQueries := false if sticky { - s.warmUpSticky(tv, sticky) + s.warmUpSticky(tv, handlingQueries) } wftCompleted := make(chan interface{}) @@ -405,9 +408,10 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { tvB := tvA.WithBuildId("B") dA := tvA.Deployment() dB := tvB.Deployment() + handlingQueries := false if sticky { - s.warmUpSticky(tvA, sticky) + s.warmUpSticky(tvA, handlingQueries) } s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) @@ -567,9 +571,9 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { tvB := tvA.WithBuildId("B") dA := tvA.Deployment() dB := tvB.Deployment() - + handlingQueries := false if sticky { - s.warmUpSticky(tvA, sticky) + s.warmUpSticky(tvA, handlingQueries) } s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) @@ -1269,7 +1273,7 @@ func (s *Versioning3Suite) verifyWorkflowStickyQueue( // create the sticky queue by polling it. func (s *Versioning3Suite) warmUpSticky( tv *testvars.TestVars, - handlerQueries bool, + handlingQueries bool, ) { poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) @@ -1279,7 +1283,7 @@ func (s *Versioning3Suite) warmUpSticky( }, ) - if handlerQueries { + if handlingQueries { _, _ = wfTaskPoller.HandleQueries(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { s.Fail("sticky task is not expected") return nil, nil From b8540817ea732eb74963920de32fa738fd503abb Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Mon, 6 Jan 2025 16:56:14 -0500 Subject: [PATCH 11/18] removed stale comments --- common/testing/taskpoller/taskpoller.go | 3 + .../history/workflow/mutable_state_impl.go | 1 - tests/versioning_3_test.go | 69 +++++++++---------- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index f8b2bd87462..4281538c7b8 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -366,6 +366,9 @@ func (p *workflowTaskPoller) pollTask( } events = history.Events + if len(events) == 0 && req.TaskQueue.GetKind() != enumspb.TASK_QUEUE_KIND_STICKY { + return nil, errors.New("history events are empty") + } nextPageToken := resp.NextPageToken for nextPageToken != nil { diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index b62891cd60b..28ecd1e6140 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -2569,7 +2569,6 @@ func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo( workflowTask *WorkflowTaskInfo, identity string, ) *historyspb.TransientWorkflowTaskInfo { - // (todo-shivam): can we remove this check ms.IsTransientWorkflowTask()? if !ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { return nil } diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index c4bcd58e4cf..94b11dec084 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -113,10 +113,10 @@ func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { tv := testvars.New(s) other := tv.WithBuildId("other") - go s.idlePollWorkflow(other, true, ver3MinPollTime, "other deployment should not receive pinned task") + go s.idlePollWorkflow(other, true, ver3MinPollTime, "other deployment should not receive pinned task", false) s.startWorkflow(tv, makePinnedOverride(tv.Deployment())) - s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") + s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task", false) // Sleeping to let the pollers arrive to server before ending the test. time.Sleep(200 * time.Millisecond) //nolint:forbidigo @@ -127,7 +127,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() { s.RunTestWithMatchingBehavior( func() { tv := testvars.New(s) - go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task") + go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task", false) s.startWorkflow(tv, nil) @@ -152,6 +152,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { true, ver3MinPollTime, "old deployment should not receive unpinned task", + false, ) // Sleeping to let the pollers arrive to server before ending the test. time.Sleep(200 * time.Millisecond) //nolint:forbidigo @@ -261,7 +262,11 @@ func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { } func (s *Versioning3Suite) TestUnpinnedQuery_NoSticky() { - s.testUnpinnedQuery(false) + s.RunTestWithMatchingBehavior( + func() { + s.testUnpinnedQuery(false) + }, + ) } func (s *Versioning3Suite) TestUnpinnedQuery_Sticky() { @@ -300,13 +305,13 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } - go s.idlePollWorkflowWhileHandlingQueries(tvB, sticky, ver3MinPollTime, "new deployment should not receive query") + go s.idlePollWorkflow(tvB, sticky, ver3MinPollTime, "new deployment should not receive query", handlingQueries) s.pollAndQueryWorkflow(tv, sticky) // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) - go s.idlePollWorkflowWhileHandlingQueries(tv, sticky, ver3MinPollTime, "old deployment should not receive query") + go s.idlePollWorkflow(tv, sticky, ver3MinPollTime, "old deployment should not receive query", handlingQueries) s.pollAndQueryWorkflow(tvB, sticky) } @@ -1152,10 +1157,11 @@ func (s *Versioning3Suite) idlePollWorkflow( versioned bool, timeout time.Duration, unexpectedTaskMessage string, + handlingQueries bool, ) { poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) d := tv.Deployment() - _, _ = poller.PollWorkflowTask( + taskPoller := poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ BuildId: d.BuildId, @@ -1163,40 +1169,27 @@ func (s *Versioning3Suite) idlePollWorkflow( UseVersioning: versioned, }, }, - ).HandleTask( - tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.Fail(unexpectedTaskMessage) - return nil, nil - }, - taskpoller.WithTimeout(timeout), ) -} -func (s *Versioning3Suite) idlePollWorkflowWhileHandlingQueries( - tv *testvars.TestVars, - versioned bool, - timeout time.Duration, - unexpectedTaskMessage string, -) { - poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) - d := tv.Deployment() - _, _ = poller.PollWorkflowTask( - &workflowservice.PollWorkflowTaskQueueRequest{ - WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ - BuildId: d.BuildId, - DeploymentSeriesName: d.SeriesName, - UseVersioning: versioned, + if handlingQueries { + _, _ = taskPoller.HandleQueries( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { + s.Fail(unexpectedTaskMessage) + return nil, nil }, - }, - ).HandleQueries( - tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { - s.Fail(unexpectedTaskMessage) - return nil, nil - }, - taskpoller.WithTimeout(timeout), - ) + taskpoller.WithTimeout(timeout), + ) + } else { + _, _ = taskPoller.HandleTask( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail(unexpectedTaskMessage) + return nil, nil + }, + taskpoller.WithTimeout(timeout), + ) + } } func (s *Versioning3Suite) idlePollActivity( From 1725f90b5e2c2da32f5e55c9958f52cee0683c8a Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Tue, 7 Jan 2025 12:51:13 -0500 Subject: [PATCH 12/18] addressed all comments --- common/testing/taskpoller/taskpoller.go | 40 +++++++---- tests/versioning_3_test.go | 91 ++++++++----------------- 2 files changed, 56 insertions(+), 75 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 4281538c7b8..5ab62b3cb59 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -35,11 +35,13 @@ import ( nexuspb "go.temporal.io/api/nexus/v1" enumspb "go.temporal.io/api/enums/v1" + failurepb "go.temporal.io/api/failure/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/temporal" "go.temporal.io/server/common" "go.temporal.io/server/common/debug" + "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/testing/testvars" ) @@ -389,7 +391,7 @@ func (p *workflowTaskPoller) pollTask( return resp, err } -func (p *workflowTaskPoller) HandleQueries( +func (p *workflowTaskPoller) HandleLegacyQuery( tv *testvars.TestVars, handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), opts ...optionFunc, @@ -398,10 +400,10 @@ func (p *workflowTaskPoller) HandleQueries( options := newOptions(tv, opts) ctx, cancel := newContext(options) defer cancel() - return p.pollAndHandleQueries(ctx, options, handler) + return p.pollAndHandleLegacyQuery(ctx, options, handler) } -func (p *workflowTaskPoller) pollAndHandleQueries( +func (p *workflowTaskPoller) pollAndHandleLegacyQuery( ctx context.Context, opts *options, handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), @@ -411,22 +413,18 @@ func (p *workflowTaskPoller) pollAndHandleQueries( if err != nil { return nil, fmt.Errorf("failed to poll workflow task: %w", err) } - return p.handleQueries(ctx, opts, task, handler) + return p.handleQuery(ctx, task, handler) } -func (p *workflowTaskPoller) handleQueries( +func (p *workflowTaskPoller) handleQuery( ctx context.Context, - opts *options, task *workflowservice.PollWorkflowTaskQueueResponse, handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error), ) (*workflowservice.RespondQueryTaskCompletedResponse, error) { p.t.Helper() + // if an error is received here it shall be present in RespondQueryTaskCompletedResponse.ErrorMessage reply, err := handler(task) - if err != nil { - return nil, p.respondTaskFailed(ctx, opts, task.TaskToken, err) - } - - resp, err := p.respondQueryTaskCompleted(ctx, task, reply) + resp, err := p.respondQueryTaskCompleted(ctx, task, reply, err) if err != nil { return nil, err } @@ -471,20 +469,36 @@ func (p *workflowTaskPoller) respondQueryTaskCompleted( ctx context.Context, task *workflowservice.PollWorkflowTaskQueueResponse, reply *workflowservice.RespondQueryTaskCompletedRequest, + err error, ) (*workflowservice.RespondQueryTaskCompletedResponse, error) { p.t.Helper() if task == nil { - return nil, errors.New("missing PollWorkflowTaskQueue") + return nil, errors.New("missing PollWorkflowTaskQueueResponse") + } + if task.Query == nil { + return nil, errors.New("missing Legacy Query in PollWorkflowTaskQueueResponse") } if reply == nil { - return nil, errors.New("missing RespondQueryTaskCompleted return") + return nil, errors.New("missing RespondQueryTaskCompletedRequest") } + + // setting the fields for RespondQueryTaskCompletedResponse if reply.Namespace == "" { reply.Namespace = p.namespace } if reply.TaskToken == nil { reply.TaskToken = task.TaskToken } + if err != nil { + reply.ErrorMessage = err.Error() + reply.Failure = &failurepb.Failure{ + Message: err.Error(), + } + reply.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED + } + reply.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED + reply.QueryResult = payloads.EncodeString("query-result") + resp, err := p.client.RespondQueryTaskCompleted(ctx, reply) if err != nil { return nil, fmt.Errorf("failed to respond with respondQueryTaskCompleted: %w", err) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 94b11dec084..5393057c8da 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -113,10 +113,10 @@ func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { tv := testvars.New(s) other := tv.WithBuildId("other") - go s.idlePollWorkflow(other, true, ver3MinPollTime, "other deployment should not receive pinned task", false) + go s.idlePollWorkflow(other, true, ver3MinPollTime, "other deployment should not receive pinned task") s.startWorkflow(tv, makePinnedOverride(tv.Deployment())) - s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task", false) + s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") // Sleeping to let the pollers arrive to server before ending the test. time.Sleep(200 * time.Millisecond) //nolint:forbidigo @@ -127,7 +127,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() { s.RunTestWithMatchingBehavior( func() { tv := testvars.New(s) - go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task", false) + go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task") s.startWorkflow(tv, nil) @@ -152,7 +152,6 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { true, ver3MinPollTime, "old deployment should not receive unpinned task", - false, ) // Sleeping to let the pollers arrive to server before ending the test. time.Sleep(200 * time.Millisecond) //nolint:forbidigo @@ -178,10 +177,9 @@ func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_NoSticky() { func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { tv := testvars.New(s) - handlingQueries := false if sticky { - s.warmUpSticky(tv, handlingQueries) + s.warmUpSticky(tv) } wftCompleted := make(chan interface{}) @@ -236,10 +234,9 @@ func (s *Versioning3Suite) TestQueryWithPinnedOverride_Sticky() { func (s *Versioning3Suite) testQueryWithPinnedOverride(sticky bool) { tv := testvars.New(s) - handlingQueries := true if sticky { - s.warmUpSticky(tv, handlingQueries) + s.warmUpSticky(tv) } wftCompleted := make(chan interface{}) @@ -281,9 +278,8 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { tv := testvars.New(s) tvB := tv.WithBuildId("B") d := tv.Deployment() - handlingQueries := true if sticky { - s.warmUpSticky(tv, handlingQueries) + s.warmUpSticky(tv) } wftCompleted := make(chan interface{}) @@ -305,13 +301,13 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } - go s.idlePollWorkflow(tvB, sticky, ver3MinPollTime, "new deployment should not receive query", handlingQueries) + go s.idlePollWorkflow(tvB, sticky, ver3MinPollTime, "new deployment should not receive query") s.pollAndQueryWorkflow(tv, sticky) // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) - go s.idlePollWorkflow(tv, sticky, ver3MinPollTime, "old deployment should not receive query", handlingQueries) + go s.idlePollWorkflow(tv, sticky, ver3MinPollTime, "old deployment should not receive query") s.pollAndQueryWorkflow(tvB, sticky) } @@ -323,14 +319,11 @@ func (s *Versioning3Suite) pollAndQueryWorkflow( queryResultCh := make(chan interface{}) s.pollWftAndHandleQueries(tv, sticky, queryResultCh, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { - s.NotNil(task) - s.NotNil(task.Query) - return respondQueryTaskCompleted(task, s.Namespace()), nil + return &workflowservice.RespondQueryTaskCompletedRequest{}, nil }) - response, err := s.queryWorkflow(tv) - s.Error(err) - s.Nil(response) + _, err := s.queryWorkflow(tv) + s.NoError(err) <-queryResultCh } @@ -354,10 +347,9 @@ func (s *Versioning3Suite) TestUnpinnedWorkflow_NoSticky() { func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { tv := testvars.New(s) d := tv.Deployment() - handlingQueries := false if sticky { - s.warmUpSticky(tv, handlingQueries) + s.warmUpSticky(tv) } wftCompleted := make(chan interface{}) @@ -413,10 +405,9 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { tvB := tvA.WithBuildId("B") dA := tvA.Deployment() dB := tvB.Deployment() - handlingQueries := false if sticky { - s.warmUpSticky(tvA, handlingQueries) + s.warmUpSticky(tvA) } s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) @@ -576,9 +567,8 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { tvB := tvA.WithBuildId("B") dA := tvA.Deployment() dB := tvB.Deployment() - handlingQueries := false if sticky { - s.warmUpSticky(tvA, handlingQueries) + s.warmUpSticky(tvA) } s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) @@ -1073,7 +1063,7 @@ func (s *Versioning3Suite) pollWftAndHandleQueries( }, TaskQueue: tq, }, - ).HandleQueries(tv, handler) + ).HandleLegacyQuery(tv, handler) s.NoError(err) return resp } @@ -1157,11 +1147,10 @@ func (s *Versioning3Suite) idlePollWorkflow( versioned bool, timeout time.Duration, unexpectedTaskMessage string, - handlingQueries bool, ) { poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) d := tv.Deployment() - taskPoller := poller.PollWorkflowTask( + _, _ = poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ BuildId: d.BuildId, @@ -1169,27 +1158,14 @@ func (s *Versioning3Suite) idlePollWorkflow( UseVersioning: versioned, }, }, + ).HandleTask( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail(unexpectedTaskMessage) + return nil, nil + }, + taskpoller.WithTimeout(timeout), ) - - if handlingQueries { - _, _ = taskPoller.HandleQueries( - tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { - s.Fail(unexpectedTaskMessage) - return nil, nil - }, - taskpoller.WithTimeout(timeout), - ) - } else { - _, _ = taskPoller.HandleTask( - tv, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.Fail(unexpectedTaskMessage) - return nil, nil - }, - taskpoller.WithTimeout(timeout), - ) - } } func (s *Versioning3Suite) idlePollActivity( @@ -1266,28 +1242,19 @@ func (s *Versioning3Suite) verifyWorkflowStickyQueue( // create the sticky queue by polling it. func (s *Versioning3Suite) warmUpSticky( tv *testvars.TestVars, - handlingQueries bool, ) { poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) - wfTaskPoller := poller.PollWorkflowTask( + _, _ = poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ TaskQueue: tv.StickyTaskQueue(), }, + ).HandleTask(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail("sticky task is not expected") + return nil, nil + }, + taskpoller.WithTimeout(ver3MinPollTime), ) - - if handlingQueries { - _, _ = wfTaskPoller.HandleQueries(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error) { - s.Fail("sticky task is not expected") - return nil, nil - }) - } else { - _, _ = wfTaskPoller.HandleTask(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.Fail("sticky task is not expected") - return nil, nil - }) - } - } func (s *Versioning3Suite) waitForDeploymentDataPropagation( From 6b96b98a58f98d2ab4411fc7a5cb91bf1a93daf8 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Tue, 7 Jan 2025 12:54:44 -0500 Subject: [PATCH 13/18] restoring old code --- tests/versioning_3_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 5393057c8da..1981726a0e2 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -405,7 +405,6 @@ func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { tvB := tvA.WithBuildId("B") dA := tvA.Deployment() dB := tvB.Deployment() - if sticky { s.warmUpSticky(tvA) } @@ -1243,16 +1242,17 @@ func (s *Versioning3Suite) verifyWorkflowStickyQueue( func (s *Versioning3Suite) warmUpSticky( tv *testvars.TestVars, ) { - poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) _, _ = poller.PollWorkflowTask( &workflowservice.PollWorkflowTaskQueueRequest{ TaskQueue: tv.StickyTaskQueue(), }, - ).HandleTask(tv, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.Fail("sticky task is not expected") - return nil, nil - }, + ).HandleTask( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail("sticky task is not expected") + return nil, nil + }, taskpoller.WithTimeout(ver3MinPollTime), ) } From 6bd2fdc28c1cc21f85197ff414145b4d021b2e62 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Tue, 7 Jan 2025 14:08:27 -0500 Subject: [PATCH 14/18] Fixed errors --- common/testing/taskpoller/taskpoller.go | 10 ++++++++-- tests/versioning_3_test.go | 15 ++------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 5ab62b3cb59..238becfc0ce 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -489,15 +489,21 @@ func (p *workflowTaskPoller) respondQueryTaskCompleted( if reply.TaskToken == nil { reply.TaskToken = task.TaskToken } + + // setting CompletedType based on error if err != nil { reply.ErrorMessage = err.Error() reply.Failure = &failurepb.Failure{ Message: err.Error(), } reply.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED + } else { + reply.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED + } + + if reply.QueryResult == nil { + reply.QueryResult = payloads.EncodeString("query-result") } - reply.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED - reply.QueryResult = payloads.EncodeString("query-result") resp, err := p.client.RespondQueryTaskCompleted(ctx, reply) if err != nil { diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 1981726a0e2..c9dc6e71cc6 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -301,13 +301,13 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } - go s.idlePollWorkflow(tvB, sticky, ver3MinPollTime, "new deployment should not receive query") + go s.idlePollWorkflow(tvB, true, ver3MinPollTime, "new deployment should not receive query") s.pollAndQueryWorkflow(tv, sticky) // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) - go s.idlePollWorkflow(tv, sticky, ver3MinPollTime, "old deployment should not receive query") + go s.idlePollWorkflow(tv, true, ver3MinPollTime, "old deployment should not receive query") s.pollAndQueryWorkflow(tvB, sticky) } @@ -930,17 +930,6 @@ func mustToPayload(v any) *commonpb.Payload { return payload } -func respondQueryTaskCompleted( - task *workflowservice.PollWorkflowTaskQueueResponse, - namespace string, -) *workflowservice.RespondQueryTaskCompletedRequest { - request := &workflowservice.RespondQueryTaskCompletedRequest{ - Namespace: namespace, - TaskToken: task.TaskToken, - } - return request -} - func (s *Versioning3Suite) startWorkflow( tv *testvars.TestVars, override *workflowpb.VersioningOverride, From 966a0750a91ff7a246e8f1c4cc236f0449e1e299 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Tue, 7 Jan 2025 14:11:27 -0500 Subject: [PATCH 15/18] Setting query result on a nil error --- common/testing/taskpoller/taskpoller.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 238becfc0ce..23c44d7b365 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -490,7 +490,6 @@ func (p *workflowTaskPoller) respondQueryTaskCompleted( reply.TaskToken = task.TaskToken } - // setting CompletedType based on error if err != nil { reply.ErrorMessage = err.Error() reply.Failure = &failurepb.Failure{ @@ -499,10 +498,9 @@ func (p *workflowTaskPoller) respondQueryTaskCompleted( reply.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED } else { reply.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED - } - - if reply.QueryResult == nil { - reply.QueryResult = payloads.EncodeString("query-result") + if reply.QueryResult == nil { + reply.QueryResult = payloads.EncodeString("query-result") + } } resp, err := p.client.RespondQueryTaskCompleted(ctx, reply) From dba609de8ad503a5f24fc7066774b7e18e4abd86 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Tue, 7 Jan 2025 22:32:38 -0500 Subject: [PATCH 16/18] updating tests --- tests/versioning_3_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index c9dc6e71cc6..f6e02c4d6f7 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -303,12 +303,14 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { go s.idlePollWorkflow(tvB, true, ver3MinPollTime, "new deployment should not receive query") s.pollAndQueryWorkflow(tv, sticky) + time.Sleep(ver3MinPollTime) // so that the old idle poller does not interfere with the next poller // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) go s.idlePollWorkflow(tv, true, ver3MinPollTime, "old deployment should not receive query") - s.pollAndQueryWorkflow(tvB, sticky) + // Since the current deployment has changed, task moves to the normal queue + s.pollAndQueryWorkflow(tvB, false) } From 9bf96e342ab8ea750bcd4368901ebced2dcad503 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Tue, 7 Jan 2025 22:34:01 -0500 Subject: [PATCH 17/18] quieting the lint --- tests/versioning_3_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index f6e02c4d6f7..a7d172721cf 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -303,7 +303,7 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { go s.idlePollWorkflow(tvB, true, ver3MinPollTime, "new deployment should not receive query") s.pollAndQueryWorkflow(tv, sticky) - time.Sleep(ver3MinPollTime) // so that the old idle poller does not interfere with the next poller + time.Sleep(ver3MinPollTime) // //nolint:forbidigo // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) From 114a87d55bc65c354ee5aa005e1aede6e0b6b7e3 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Wed, 8 Jan 2025 10:08:50 -0500 Subject: [PATCH 18/18] removing time.Sleep from tests --- tests/versioning_3_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index a7d172721cf..8b3a237942c 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -301,15 +301,19 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } - go s.idlePollWorkflow(tvB, true, ver3MinPollTime, "new deployment should not receive query") + pollerDone := make(chan interface{}) + go func() { + s.idlePollWorkflow(tvB, true, ver3MinPollTime, "new deployment should not receive query") + close(pollerDone) + }() s.pollAndQueryWorkflow(tv, sticky) - time.Sleep(ver3MinPollTime) // //nolint:forbidigo + <-pollerDone // wait for the idle poller to complete to not interfere with the next poller // redirect query to new deployment s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) go s.idlePollWorkflow(tv, true, ver3MinPollTime, "old deployment should not receive query") - // Since the current deployment has changed, task moves to the normal queue + // Since the current deployment has changed, task will move to the normal queue (thus, sticky=false) s.pollAndQueryWorkflow(tvB, false) }