From 81f2c8d2e295c306e48b0d7099495c03f294ddbf Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Thu, 30 May 2024 17:37:09 -0700 Subject: [PATCH] Add local activity test and truncate unnecessary error response (#395) --- .../v4-local-activity-optimization.json | 502 ++++++++++++++++++ integ/replay_test.go | 1 + integ/wf_state_api_fail_test.go | 9 +- service/interpreter/activityImpl.go | 32 +- .../interpreter/cadence/activityProvider.go | 5 +- service/interpreter/interfaces.go | 5 +- .../interpreter/temporal/activityProvider.go | 5 +- 7 files changed, 546 insertions(+), 13 deletions(-) create mode 100644 integ/history/v4-local-activity-optimization.json diff --git a/integ/history/v4-local-activity-optimization.json b/integ/history/v4-local-activity-optimization.json new file mode 100644 index 00000000..9c245ba6 --- /dev/null +++ b/integ/history/v4-local-activity-optimization.json @@ -0,0 +1,502 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-05-31T00:07:56.544182982Z", + "eventType": "WorkflowExecutionStarted", + "version": "0", + "taskId": "1056141", + "workerMayIgnore": false, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "Interpreter" + }, + "parentWorkflowNamespace": "", + "parentWorkflowNamespaceId": "", + "parentWorkflowExecution": null, + "parentInitiatedEventId": "0", + "taskQueue": { + "name": "Interpreter_DEFAULT", + "kind": "Normal", + "normalName": "" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJpd2ZXb3JrZmxvd1R5cGUiOiJ3Zl9zdGF0ZV9hcGlfZmFpbCIsIml3ZldvcmtlclVybCI6Imh0dHA6Ly9sb2NhbGhvc3Q6OTcxNCIsInN0YXJ0U3RhdGVJZCI6IlMxIiwic3RhdGVPcHRpb25zIjp7InN0YXJ0QXBpUmV0cnlQb2xpY3kiOnsibWF4aW11bUF0dGVtcHRzRHVyYXRpb25TZWNvbmRzIjoxfX0sImNvbmZpZyI6eyJjb250aW51ZUFzTmV3VGhyZXNob2xkIjoxLCJvcHRpbWl6ZUFjdGl2aXR5Ijp0cnVlfX0=" + } + ] + }, + "workflowExecutionTimeout": "10s", + "workflowRunTimeout": "10s", + "workflowTaskTimeout": "10s", + "continuedExecutionRunId": "", + "initiator": "Unspecified", + "continuedFailure": null, + "lastCompletionResult": null, + "originalExecutionRunId": "37484f7c-7c7d-485e-b19f-24bf327f1ba7", + "identity": "26382@IT-USA-VX8131@", + "firstExecutionRunId": "37484f7c-7c7d-485e-b19f-24bf327f1ba7", + "retryPolicy": null, + "attempt": 1, + "workflowExecutionExpirationTime": "2024-05-31T00:08:06.544Z", + "cronSchedule": "", + "firstWorkflowTaskBackoff": "0s", + "memo": { + "fields": { + "__IwfSystem_WorkerUrl": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJkYXRhIjoiaHR0cDovL2xvY2FsaG9zdDo5NzE0In0=" + } + } + }, + "searchAttributes": { + "indexedFields": { + "IwfWorkflowType": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==", + "type": "S2V5d29yZA==" + }, + "data": "IndmX3N0YXRlX2FwaV9mYWlsIg==" + } + } + }, + "prevAutoResetPoints": null, + "header": { + "fields": {} + }, + "parentInitiatedEventVersion": "0", + "workflowId": "wf_state_api_fail1717114076541268000", + "sourceVersionStamp": null + } + }, + { + "eventId": "2", + "eventTime": "2024-05-31T00:07:56.544210940Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "1056142", + "workerMayIgnore": false, + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Interpreter_DEFAULT", + "kind": "Normal", + "normalName": "" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-05-31T00:07:56.550851722Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "1056148", + "workerMayIgnore": false, + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "26382@IT-USA-VX8131@", + "requestId": "63fcc036-0954-497c-a030-0ae00b57fd49", + "suggestContinueAsNew": false, + "historySizeBytes": "725" + } + }, + { + "eventId": "4", + "eventTime": "2024-05-31T00:07:59.568419122Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "1056152", + "workerMayIgnore": false, + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "26382@IT-USA-VX8131@", + "binaryChecksum": "", + "workerVersion": { + "buildId": "77476a2c1231cfb3284aa6034797268e", + "bundleId": "", + "useVersioning": false + }, + "sdkMetadata": { + "coreUsedFlags": [], + "langUsedFlags": [ + 3, + 1 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.25.1" + }, + "meteringMetadata": { + "nonfirstLocalActivityExecutionAttempts": 0 + } + } + }, + { + "eventId": "5", + "eventTime": "2024-05-31T00:07:59.568466247Z", + "eventType": "MarkerRecorded", + "version": "0", + "taskId": "1056153", + "workerMayIgnore": false, + "markerRecordedEventAttributes": { + "markerName": "Version", + "details": { + "change-id": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Imdsb2JhbCI=" + } + ] + }, + "version": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "NA==" + } + ] + } + }, + "workflowTaskCompletedEventId": "4", + "header": null, + "failure": null + } + }, + { + "eventId": "6", + "eventTime": "2024-05-31T00:07:59.569526252Z", + "eventType": "UpsertWorkflowSearchAttributes", + "version": "0", + "taskId": "1056154", + "workerMayIgnore": false, + "upsertWorkflowSearchAttributesEventAttributes": { + "workflowTaskCompletedEventId": "4", + "searchAttributes": { + "indexedFields": { + "TemporalChangeVersion": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==", + "type": "S2V5d29yZExpc3Q=" + }, + "data": "WyJnbG9iYWwtNCJd" + } + } + } + } + }, + { + "eventId": "7", + "eventTime": "2024-05-31T00:07:59.570156964Z", + "eventType": "UpsertWorkflowSearchAttributes", + "version": "0", + "taskId": "1056155", + "workerMayIgnore": false, + "upsertWorkflowSearchAttributesEventAttributes": { + "workflowTaskCompletedEventId": "4", + "searchAttributes": { + "indexedFields": { + "IwfGlobalWorkflowVersion": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==", + "type": "SW50" + }, + "data": "NA==" + } + } + } + } + }, + { + "eventId": "8", + "eventTime": "2024-05-31T00:07:59.570514215Z", + "eventType": "UpsertWorkflowSearchAttributes", + "version": "0", + "taskId": "1056156", + "workerMayIgnore": false, + "upsertWorkflowSearchAttributesEventAttributes": { + "workflowTaskCompletedEventId": "4", + "searchAttributes": { + "indexedFields": { + "IwfExecutingStateIds": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==", + "type": "S2V5d29yZExpc3Q=" + }, + "data": "WyJTMSJd" + } + } + } + } + }, + { + "eventId": "9", + "eventTime": "2024-05-31T00:07:59.570529340Z", + "eventType": "MarkerRecorded", + "version": "0", + "taskId": "1056157", + "workerMayIgnore": false, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJBY3Rpdml0eUlEIjoiMSIsIkFjdGl2aXR5VHlwZSI6IlN0YXRlQXBpV2FpdFVudGlsIiwiUmVwbGF5VGltZSI6IjIwMjQtMDUtMzFUMDA6MDc6NTkuNTYwMDI2NDcyWiIsIkF0dGVtcHQiOjMsIkJhY2tvZmYiOi0xfQ==" + } + ] + } + }, + "workflowTaskCompletedEventId": "4", + "header": null, + "failure": { + "message": "", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "applicationFailureInfo": { + "type": "1st-attempt-failure", + "nonRetryable": false, + "details": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InN0YXR1c0NvZGU6IDQwMCwgcmVzcG9uc2VCb2R5OiB7fS4uLiwgZXJyTXNnOiA0MDAgQi4uLiI=" + } + ] + } + } + } + } + }, + { + "eventId": "10", + "eventTime": "2024-05-31T00:07:59.570625174Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1056158", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "10", + "activityType": { + "name": "StateApiWaitUntil" + }, + "taskQueue": { + "name": "Interpreter_DEFAULT", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InRlbXBvcmFsIg==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJJd2ZXb3JrZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojk3MTQiLCJSZXF1ZXN0Ijp7ImNvbnRleHQiOnsic3RhdGVFeGVjdXRpb25JZCI6IlMxLTEiLCJ3b3JrZmxvd0lkIjoid2Zfc3RhdGVfYXBpX2ZhaWwxNzE3MTE0MDc2NTQxMjY4MDAwIiwid29ya2Zsb3dSdW5JZCI6IjM3NDg0ZjdjLTdjN2QtNDg1ZS1iMTlmLTI0YmYzMjdmMWJhNyIsIndvcmtmbG93U3RhcnRlZFRpbWVzdGFtcCI6MTcxNzExNDA3Nn0sIndvcmtmbG93U3RhdGVJZCI6IlMxIiwid29ya2Zsb3dUeXBlIjoid2Zfc3RhdGVfYXBpX2ZhaWwifX0=" + } + ] + }, + "scheduleToCloseTimeout": "1s", + "scheduleToStartTimeout": "1s", + "startToCloseTimeout": "1s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "11", + "eventTime": "2024-05-31T00:07:59.582831317Z", + "eventType": "ActivityTaskStarted", + "version": "0", + "taskId": "1056165", + "workerMayIgnore": false, + "activityTaskStartedEventAttributes": { + "scheduledEventId": "10", + "identity": "26382@IT-USA-VX8131@", + "requestId": "b554f605-758d-49d5-8d32-ec88afe021c1", + "attempt": 1, + "lastFailure": null + } + }, + { + "eventId": "12", + "eventTime": "2024-05-31T00:07:59.591765110Z", + "eventType": "ActivityTaskFailed", + "version": "0", + "taskId": "1056166", + "workerMayIgnore": false, + "activityTaskFailedEventAttributes": { + "failure": { + "message": "", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "applicationFailureInfo": { + "type": "STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE", + "nonRetryable": false, + "details": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InN0YXR1c0NvZGU6IDQwMCwgcmVzcG9uc2VCb2R5OiB7fSwgZXJyTXNnOiA0MDAgQmFkIFJlcXVlc3QgICglIXMoKnN0cmluZz1cdTAwM2NuaWxcdTAwM2UpKSI=" + } + ] + } + } + }, + "scheduledEventId": "10", + "startedEventId": "11", + "identity": "26382@IT-USA-VX8131@", + "retryState": "Timeout", + "workerVersion": null + } + }, + { + "eventId": "13", + "eventTime": "2024-05-31T00:07:59.591771527Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "1056167", + "workerMayIgnore": false, + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "IT-USA-VX8131:886822b2-4d80-4038-9f61-b7f867961e99", + "kind": "Sticky", + "normalName": "Interpreter_DEFAULT" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "14", + "eventTime": "2024-05-31T00:07:59.596797635Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "1056171", + "workerMayIgnore": false, + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "13", + "identity": "26382@IT-USA-VX8131@", + "requestId": "ede1f551-cf3f-4c75-8790-37641e546918", + "suggestContinueAsNew": false, + "historySizeBytes": "2581" + } + }, + { + "eventId": "15", + "eventTime": "2024-05-31T00:07:59.601441074Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "1056175", + "workerMayIgnore": false, + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "13", + "startedEventId": "14", + "identity": "26382@IT-USA-VX8131@", + "binaryChecksum": "", + "workerVersion": { + "buildId": "77476a2c1231cfb3284aa6034797268e", + "bundleId": "", + "useVersioning": false + }, + "sdkMetadata": { + "coreUsedFlags": [], + "langUsedFlags": [], + "sdkName": "", + "sdkVersion": "" + }, + "meteringMetadata": { + "nonfirstLocalActivityExecutionAttempts": 0 + } + } + }, + { + "eventId": "16", + "eventTime": "2024-05-31T00:07:59.601461991Z", + "eventType": "WorkflowExecutionFailed", + "version": "0", + "taskId": "1056176", + "workerMayIgnore": false, + "workflowExecutionFailedEventAttributes": { + "failure": { + "message": "activity error", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": { + "message": "", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "applicationFailureInfo": { + "type": "STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE", + "nonRetryable": false, + "details": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InN0YXR1c0NvZGU6IDQwMCwgcmVzcG9uc2VCb2R5OiB7fSwgZXJyTXNnOiA0MDAgQmFkIFJlcXVlc3QgICglIXMoKnN0cmluZz1cdTAwM2NuaWxcdTAwM2UpKSI=" + } + ] + } + } + }, + "activityFailureInfo": { + "scheduledEventId": "10", + "startedEventId": "11", + "identity": "26382@IT-USA-VX8131@", + "activityType": { + "name": "StateApiWaitUntil" + }, + "activityId": "10", + "retryState": "Timeout" + } + }, + "retryState": "RetryPolicyNotSet", + "workflowTaskCompletedEventId": "15", + "newExecutionRunId": "" + } + } + ] +} \ No newline at end of file diff --git a/integ/replay_test.go b/integ/replay_test.go index 13d8b8b9..f8d12a63 100644 --- a/integ/replay_test.go +++ b/integ/replay_test.go @@ -20,6 +20,7 @@ var jsonHistoryFiles = []string{ "v3-bug-no-state-stuck.json", "v4-continue-as-new-on-no-state.json", "v4-continued-as-new-before-versioning-optimization.json", + "v4-local-activity-optimization.json", } func TestTemporalReplay(t *testing.T) { diff --git a/integ/wf_state_api_fail_test.go b/integ/wf_state_api_fail_test.go index 84e04071..26e3cd14 100644 --- a/integ/wf_state_api_fail_test.go +++ b/integ/wf_state_api_fail_test.go @@ -20,7 +20,7 @@ func TestStateApiFailTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiFail(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestStateApiFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) + doTestStateApiFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -82,8 +82,13 @@ func doTestStateApiFail(t *testing.T, backendType service.BackendType, config *i history, _ := wfHandler.GetTestResult() assertions := assert.New(t) + expectedStart := int64(1) + if config != nil && *config.OptimizeActivity { + // NOTE:3 attempts are from LocalActivity, 1 attempt is from State API Options + expectedStart = 4 + } assertions.Equalf(map[string]int64{ - "S1_start": 1, + "S1_start": expectedStart, }, history, "wf state api fail test fail, %v", history) // TODO: fix (%!s(*string=)) in the error message diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index 43abf5cb..3a5209ba 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -47,7 +47,9 @@ func StateApiWaitUntil( resp, httpResp, err := req.WorkflowStateStartRequest(input.Request).Execute() printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { - return nil, composeHttpError(provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) + return nil, composeHttpError( + provider.GetActivityInfo(ctx).IsLocalActivity, + provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) } if err := checkCommandRequestFromWaitUntilResponse(resp); err != nil { @@ -93,7 +95,9 @@ func StateApiExecute( resp, httpResp, err := req.WorkflowStateDecideRequest(input.Request).Execute() printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { - return nil, composeHttpError(provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) + return nil, composeHttpError( + provider.GetActivityInfo(ctx).IsLocalActivity, + provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) } if err = checkStateDecisionFromResponse(resp); err != nil { @@ -136,7 +140,9 @@ func checkHttpError(err error, httpResp *http.Response) bool { return false } -func composeHttpError(provider ActivityProvider, err error, httpResp *http.Response, errType string) error { +func composeHttpError( + isLocalActivity bool, provider ActivityProvider, err error, httpResp *http.Response, errType string, +) error { responseBody := "None" var statusCode int if httpResp != nil { @@ -148,8 +154,23 @@ func composeHttpError(provider ActivityProvider, err error, httpResp *http.Respo } statusCode = httpResp.StatusCode } + errMsg := err.Error() + if isLocalActivity { + maxL := len(errMsg) + if maxL > 5 { + maxL = 5 + } + errMsg = errMsg[:maxL] + "..." + + maxL = len(responseBody) + if maxL > 50 { + maxL = 50 + } + responseBody = responseBody[:maxL] + "..." + errType = "1st-attempt-failure" + } return provider.NewApplicationError(errType, - fmt.Sprintf("statusCode: %v, responseBody: %v, errMsg: %v", statusCode, responseBody, err)) + fmt.Sprintf("statusCode: %v, responseBody: %v, errMsg: %v", statusCode, responseBody, errMsg)) } func checkCommandRequestFromWaitUntilResponse(resp *iwfidl.WorkflowStateStartResponse) error { @@ -209,7 +230,8 @@ func DumpWorkflowInternal( request := apiClient.DefaultApi.ApiV1WorkflowInternalDumpPost(ctx) resp, httpResp, err := request.WorkflowDumpRequest(req).Execute() if checkHttpError(err, httpResp) { - return nil, composeHttpError(provider, err, httpResp, string(iwfidl.SERVER_INTERNAL_ERROR_TYPE)) + return nil, composeHttpError(provider.GetActivityInfo(ctx).IsLocalActivity, + provider, err, httpResp, string(iwfidl.SERVER_INTERNAL_ERROR_TYPE)) } return resp, nil } diff --git a/service/interpreter/cadence/activityProvider.go b/service/interpreter/cadence/activityProvider.go index 3a32b820..0e16ee85 100644 --- a/service/interpreter/cadence/activityProvider.go +++ b/service/interpreter/cadence/activityProvider.go @@ -28,7 +28,8 @@ func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLog func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { info := activity.GetInfo(ctx) return interpreter.ActivityInfo{ - ScheduledTime: info.ScheduledTimestamp, - Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal + ScheduledTime: info.ScheduledTimestamp, + Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal + IsLocalActivity: false, // TODO cadence doesn't support this yet } } diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index 4da3144e..fd4c7731 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -16,8 +16,9 @@ type ActivityProvider interface { } type ActivityInfo struct { - ScheduledTime time.Time // Time of activity scheduled by a workflow - Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. + ScheduledTime time.Time // Time of activity scheduled by a workflow + Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. + IsLocalActivity bool // Whether the activity is at local activity } var activityProviderRegistry = make(map[service.BackendType]ActivityProvider) diff --git a/service/interpreter/temporal/activityProvider.go b/service/interpreter/temporal/activityProvider.go index 7d18a5e0..49152c41 100644 --- a/service/interpreter/temporal/activityProvider.go +++ b/service/interpreter/temporal/activityProvider.go @@ -25,7 +25,8 @@ func (a *activityProvider) NewApplicationError(errType string, details interface func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { info := activity.GetInfo(ctx) return interpreter.ActivityInfo{ - ScheduledTime: info.ScheduledTime, - Attempt: info.Attempt, + ScheduledTime: info.ScheduledTime, + Attempt: info.Attempt, + IsLocalActivity: info.IsLocalActivity, } }