Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry mechanic to worker-specific-task-queue sample #383

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions worker-specific-task-queues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ Start the Workflow Execution:
```bash
go run worker-specific-task-queues/starter/main.go
```

### Things to try
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we can add a unit test to show the retry working? Not blocking but would be nice to show in a unit test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, added!

You can try to intentionally crash Workers while they are doing work to see what happens when work gets "stuck" in a unique queue: currently the Workflow will `scheduleToCloseTimeout` without a Worker, and retry when a Worker comes back online.

After the 5th attempt, it logs `Workflow failed after multiple retries.` and exits. But you may wish to implement compensatory logic, including notifying you.
18 changes: 18 additions & 0 deletions worker-specific-task-queues/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker_specific_task_queues

import (
"go.temporal.io/sdk/temporal"
"path/filepath"
"time"

Expand All @@ -11,8 +12,25 @@ import (
// FileProcessingWorkflow is a workflow that uses Worker-specific Task Queues to run multiple Activities on a consistent
// host.
func FileProcessingWorkflow(ctx workflow.Context) (err error) {
// When using a worker-specific task queue, if a failure occurs, we want to retry all of the worker-specific
// logic, so wrap all the logic here in a loop.
for attempt := range 5 {
if err = processFile(ctx); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably one might only retry on schedule to close timeout, but this is probably fine too.

workflow.GetLogger(ctx).Info("Workflow completed.")
return
}
workflow.GetLogger(ctx).Error("Attempt failed, trying on new worker", attempt+1)
}
workflow.GetLogger(ctx).Error("Workflow failed after multiple retries.", "Error", err.Error())
return
}

func processFile(ctx workflow.Context) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var WorkerSpecificTaskQueue string
Expand Down
53 changes: 53 additions & 0 deletions worker-specific-task-queues/workflow_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package worker_specific_task_queues

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -29,3 +31,54 @@ func Test_Workflow(t *testing.T) {
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}

func Test_RetrySuccess(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

// Mock activity implementation
var a WorkerSpecificTaskQueue
env.RegisterActivityWithOptions(a.GetWorkerSpecificTaskQueue, activity.RegisterOptions{
Name: "GetWorkerSpecificTaskQueue",
})

counter := 0
env.OnActivity("GetWorkerSpecificTaskQueue", mock.Anything).Return(func(ctx context.Context) (string, error) {
counter++
// Workflow retries up to 5 times
if counter < 3 {
return "", errors.New("temporary error")
}
return "unique-task-queue", nil
})
env.OnActivity(DownloadFile, mock.Anything, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(ProcessFile, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(DeleteFile, mock.Anything, mock.Anything).Return(nil)

env.ExecuteWorkflow(FileProcessingWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}

func Test_RetryFail(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

// Mock activity implementation
var a WorkerSpecificTaskQueue
env.RegisterActivityWithOptions(a.GetWorkerSpecificTaskQueue, activity.RegisterOptions{
Name: "GetWorkerSpecificTaskQueue",
})
env.OnActivity("GetWorkerSpecificTaskQueue", mock.Anything).Return(func(ctx context.Context) (string, error) {
return "", errors.New("error to show a retry mechanic failure")
})
env.OnActivity(DownloadFile, mock.Anything, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(ProcessFile, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(DeleteFile, mock.Anything, mock.Anything).Return(nil)

env.ExecuteWorkflow(FileProcessingWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
}