Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

functional tests testing queries + nexus tasks with versioning-3 #7015

Merged
merged 19 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 182 additions & 17 deletions common/testing/taskpoller/taskpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"testing"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
nexuspb "go.temporal.io/api/nexus/v1"

enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/workflowservice/v1"
Expand All @@ -55,17 +58,29 @@ type (
*TaskPoller
pollActivityTaskRequest *workflowservice.PollActivityTaskQueueRequest
}
nexusTaskPoller struct {
*TaskPoller
pollNexusTaskRequest *workflowservice.PollNexusTaskQueueRequest
}
options struct {
tv *testvars.TestVars
timeout time.Duration
}
optionFunc func(*options)
optionFunc func(*options)
TaskCompletedRequest struct {
WorkflowTaskCompletedRequest *workflowservice.RespondWorkflowTaskCompletedRequest
QueryTaskCompletedRequest *workflowservice.RespondQueryTaskCompletedRequest
}
TaskCompletedResponse struct {
WorkflowTaskCompletedResponse *workflowservice.RespondWorkflowTaskCompletedResponse
QueryTaskCompletedResponse *workflowservice.RespondQueryTaskCompletedResponse
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I'm not sure this is the right direction. I understand that the current API doesn't work for these Queries, but 95% of the time, the task poller will probably only deal with WFTs. Right now, without thinking longer about it, I see three other directions:

  1. Create a dedicated, exported function just for queries.
  2. Change the return type to any and require type assertions.
  3. Hide the fact that there's a "legacy" query (AFAIU) and use the "new" query API; but inside the taskpoller do the right thing auto-magically

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) would be the easiest and clearest; let me know if that's a feasible option. I haven't worked with this Query API before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tagging you @ShahabT here as well since you had the exact concern - The main point seems to be to remove the need for TaskCompletedResponse/Request structures.

In my earlier stab at this, I did not have any of these structures and had made one exported function inside (*workflowTaskPoller).handleTask and had changed the return type to any (all the places that required it). Not, that in this attempt I also intended on using Versioning3Suite.pollWftAndHandle since it served as a nice helper function for all my needs. My train of thought was:

  • That "having type assertions and changing the return type to nil" or having these two structures were effectively conveying the same purpose: The response can be one of two types - Query or Workflow
  • Having understood this, if I were reading the codebase for the first time, I realized I would appreciate knowing that the handler deals with only Queries and Workflow related tasks (which is made clear by the structure in place since it has only those two fields vs any would make it a bit confusing)

Hence, I took the more tedious approach and went ahead with this option - I can certainly revert back if you both feel strongly of not keeping this the way it is but would be curious to hear your thoughts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your reasoning is totally reasonable; but I strongly believe it's not the right path because it's too verbose for the majority of use cases. That was already a concern before, and is exacerbated here.

Why not keep all these methods untouched and add a HandleQueryTask instead:

To pick up Shahab's suggestion; is this feasible in your opinion? If so, it would be my preferred approach.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be attempting to have type assertions in place along with a separate HandleQueryTask method ⏳

)

var (
// DrainWorkflowTask returns an empty RespondWorkflowTaskCompletedRequest
DrainWorkflowTask = func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
return &workflowservice.RespondWorkflowTaskCompletedRequest{}, nil
DrainWorkflowTask = func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error) {
return &TaskCompletedRequest{WorkflowTaskCompletedRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{}}, nil
}
// CompleteActivityTask returns a RespondActivityTaskCompletedRequest with an auto-generated `Result` from `tv.Any().Payloads()`.
CompleteActivityTask = func(tv *testvars.TestVars) func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) {
Expand Down Expand Up @@ -104,16 +119,22 @@ func (p *TaskPoller) PollWorkflowTask(
return &workflowTaskPoller{TaskPoller: p, pollWorkflowTaskRequest: req}
}

func (p *TaskPoller) PollNexusTask(
req *workflowservice.PollNexusTaskQueueRequest,
) *nexusTaskPoller {
return &nexusTaskPoller{TaskPoller: p, pollNexusTaskRequest: req}
}

// PollAndHandleWorkflowTask issues a PollWorkflowTaskQueueRequest to obtain a new workflow task,
// invokes the handler with the task, and completes/fails the task accordingly.
// Any unspecified but required request and response fields are automatically generated using `tv`.
// Returning an error from `handler` fails the task.
// If no task is available, it returns NoTaskAvailable.
func (p *TaskPoller) PollAndHandleWorkflowTask(
tv *testvars.TestVars,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error),
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error),
opts ...optionFunc,
) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) {
) (*TaskCompletedResponse, error) {
return p.
PollWorkflowTask(&workflowservice.PollWorkflowTaskQueueRequest{}).
HandleTask(tv, handler, opts...)
Expand All @@ -125,9 +146,123 @@ func (p *TaskPoller) PollAndHandleWorkflowTask(
// If no task is available, it returns NoTaskAvailable.
func (p *workflowTaskPoller) HandleTask(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the time we do not work with queries, it seems a little to much to impose the nested TaskCompletedResponse on all usages. Why not keep all these methods untouched and add a HandleQueryTask instead:

func (p *workflowTaskPoller) HandleQueryTask(tv *testvars.TestVars,
	handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedResponse, error)

tv *testvars.TestVars,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error),
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error),
opts ...optionFunc,
) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) {
) (*TaskCompletedResponse, error) {
p.t.Helper()
options := newOptions(tv, opts)
ctx, cancel := newContext(options)
defer cancel()
return p.pollAndHandleTask(ctx, options, handler)
}

func (p *nexusTaskPoller) pollTask(
ctx context.Context,
opts *options,
) (*workflowservice.PollNexusTaskQueueResponse, error) {
p.t.Helper()

req := common.CloneProto(p.pollNexusTaskRequest)
if req.Namespace == "" {
req.Namespace = p.namespace
}
if req.TaskQueue == nil {
req.TaskQueue = opts.tv.TaskQueue()
}
if req.Identity == "" {
req.Identity = opts.tv.WorkerIdentity()
}
resp, err := p.client.PollNexusTaskQueue(ctx, req)
if err != nil {
return nil, err
}
if resp == nil || resp.TaskToken == nil {
return nil, NoWorkflowTaskAvailable
}

return resp, err
}

func (p *nexusTaskPoller) pollAndHandleTask(
ctx context.Context,
opts *options,
handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error),
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
task, err := p.pollTask(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to poll nexus task: %w", err)
}
return p.handleTask(ctx, opts, task, handler)
}
func (p *nexusTaskPoller) handleTask(
ctx context.Context,
opts *options,
task *workflowservice.PollNexusTaskQueueResponse,
handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error),
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
reply, err := handler(task)
if err != nil {
return nil, p.respondNexusTaskFailed(ctx, opts, task.TaskToken)
}

resp, err := p.respondNexusTaskCompleted(ctx, opts, task, reply)
if err != nil {
return nil, err
}

return resp, nil
}

func (p *nexusTaskPoller) respondNexusTaskCompleted(
ctx context.Context,
opts *options,
task *workflowservice.PollNexusTaskQueueResponse,
reply *workflowservice.RespondNexusTaskCompletedRequest,
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
if reply == nil {
return nil, errors.New("missing RespondWorkflowTaskCompletedRequest return")
}
if reply.Namespace == "" {
reply.Namespace = p.namespace
}
if len(reply.TaskToken) == 0 {
reply.TaskToken = task.TaskToken
}
if reply.Identity == "" {
reply.Identity = opts.tv.WorkerIdentity()
}
reply.Response = &nexuspb.Response{}

return p.client.RespondNexusTaskCompleted(ctx, reply)
}

func (p *nexusTaskPoller) respondNexusTaskFailed(
ctx context.Context,
opts *options,
taskToken []byte,
) error {
p.t.Helper()
_, err := p.client.RespondNexusTaskFailed(
ctx,
&workflowservice.RespondNexusTaskFailedRequest{
Namespace: p.namespace,
TaskToken: taskToken,
Identity: opts.tv.WorkerIdentity(),
Error: &nexuspb.HandlerError{
ErrorType: string(nexus.HandlerErrorTypeInternal),
},
})
return err
}

func (p *nexusTaskPoller) HandleTask(
tv *testvars.TestVars,
handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error),
opts ...optionFunc,
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
Shivs11 marked this conversation as resolved.
Show resolved Hide resolved
p.t.Helper()
options := newOptions(tv, opts)
ctx, cancel := newContext(options)
Expand All @@ -141,9 +276,9 @@ func (p *workflowTaskPoller) HandleTask(
func (p *TaskPoller) HandleWorkflowTask(
tv *testvars.TestVars,
task *workflowservice.PollWorkflowTaskQueueResponse,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error),
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error),
opts ...optionFunc,
) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) {
) (*TaskCompletedResponse, error) {
p.t.Helper()
options := newOptions(tv, opts)
ctx, cancel := newContext(options)
Expand Down Expand Up @@ -239,9 +374,6 @@ func (p *workflowTaskPoller) pollTask(
}

events = history.Events
if len(events) == 0 {
Shivs11 marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("history events are empty")
}

nextPageToken := resp.NextPageToken
for nextPageToken != nil {
Expand All @@ -265,8 +397,8 @@ func (p *workflowTaskPoller) pollTask(
func (p *workflowTaskPoller) pollAndHandleTask(
ctx context.Context,
opts *options,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error),
) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) {
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error),
) (*TaskCompletedResponse, error) {
p.t.Helper()
task, err := p.pollTask(ctx, opts)
if err != nil {
Expand All @@ -279,19 +411,52 @@ func (p *workflowTaskPoller) handleTask(
ctx context.Context,
opts *options,
task *workflowservice.PollWorkflowTaskQueueResponse,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error),
) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) {
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*TaskCompletedRequest, error),
) (*TaskCompletedResponse, error) {
p.t.Helper()
reply, err := handler(task)
if err != nil {
return nil, p.respondTaskFailed(ctx, opts, task.TaskToken, err)
}

resp, err := p.respondTaskCompleted(ctx, opts, task, reply)
if reply.QueryTaskCompletedRequest != nil {
resp, err := p.respondQueryTaskCompleted(ctx, task, reply.QueryTaskCompletedRequest)
if err != nil {
return nil, err
}
return &TaskCompletedResponse{QueryTaskCompletedResponse: resp}, nil
}

resp, err := p.respondTaskCompleted(ctx, opts, task, reply.WorkflowTaskCompletedRequest)
if err != nil {
return nil, err
}

return &TaskCompletedResponse{WorkflowTaskCompletedResponse: resp}, nil
}

func (p *workflowTaskPoller) respondQueryTaskCompleted(
ctx context.Context,
task *workflowservice.PollWorkflowTaskQueueResponse,
reply *workflowservice.RespondQueryTaskCompletedRequest,
) (*workflowservice.RespondQueryTaskCompletedResponse, error) {
p.t.Helper()
if task == nil {
return nil, errors.New("missing PollWorkflowTaskQueue")
Shivs11 marked this conversation as resolved.
Show resolved Hide resolved
}
if reply == nil {
stephanos marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("missing RespondQueryTaskCompleted return")
Shivs11 marked this conversation as resolved.
Show resolved Hide resolved
}
if reply.Namespace == "" {
reply.Namespace = p.namespace
}
if reply.TaskToken == nil {
reply.TaskToken = task.TaskToken
}
Shivs11 marked this conversation as resolved.
Show resolved Hide resolved
resp, err := p.client.RespondQueryTaskCompleted(ctx, reply)
if err != nil {
return nil, fmt.Errorf("failed to respond with respondQueryTaskCompleted: %w", err)
}
return resp, nil
}

Expand Down
1 change: 1 addition & 0 deletions service/history/api/get_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,5 +361,6 @@ func MutableStateToGetResponse(
InheritedBuildId: mutableState.GetInheritedBuildId(),
MostRecentWorkerVersionStamp: mostRecentWorkerVersionStamp,
TransitionHistory: mutableState.GetExecutionInfo().TransitionHistory,
VersioningInfo: mutableState.GetExecutionInfo().VersioningInfo,
Shivs11 marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2563,6 +2563,7 @@ func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo(
workflowTask *WorkflowTaskInfo,
identity string,
) *historyspb.TransientWorkflowTaskInfo {
// (todo-shivam): can we remove this check ms.IsTransientWorkflowTask()?
if !ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
return nil
}
Expand Down
Loading
Loading