From 4f81bad4108860ab9eec141791a4056e4126c71b Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Wed, 3 Jan 2024 17:32:38 -0800 Subject: [PATCH 1/6] WIP: Plumb error info to x-ray --- lambda/invoke_loop.go | 34 ++++++++++++++++++++++++++-- lambda/invoke_loop_test.go | 43 ++++++++++++++++++++++++++++++++++++ lambda/runtime_api_client.go | 24 +++++++++++++++++--- 3 files changed, 96 insertions(+), 5 deletions(-) diff --git a/lambda/invoke_loop.go b/lambda/invoke_loop.go index 9e2d6598..0d4397a6 100644 --- a/lambda/invoke_loop.go +++ b/lambda/invoke_loop.go @@ -102,9 +102,16 @@ func handleInvoke(invoke *invoke, handler *handlerOptions) error { } func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) error { - errorPayload := safeMarshal(invokeErr) + errorForXRay := makeErrorForXRay(invokeErr) + errorPayload := errorForXRay.Exceptions[0] log.Printf("%s", errorPayload) - if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON); err != nil { + + causeForXRay, err := json.Marshal(errorForXRay) + if err != nil { + return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err) + } + + if err := invoke.xfailure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil { return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err) } return nil @@ -166,3 +173,26 @@ func safeMarshal(v interface{}) []byte { } return payload } + +type errorForXRay struct { + WorkingDirectory string `json:"working_directory"` + Exceptions []json.RawMessage `json:"exceptions"` // returned as bytes to avoid double-serializing + Paths []string `json:"paths"` +} + +func makeErrorForXRay(invokeResponseError *messages.InvokeResponse_Error) *errorForXRay { + pathSet := make(map[string]struct{}, len(invokeResponseError.StackTrace)) + for _, frame := range invokeResponseError.StackTrace { + pathSet[frame.Path] = struct{}{} + } + paths := make([]string, 0, len(pathSet)) + for path := range pathSet { + paths = append(paths, path) + } + cwd, _ := os.Getwd() + return &errorForXRay{ + WorkingDirectory: cwd, + Paths: paths, + Exceptions: []json.RawMessage{safeMarshal(invokeResponseError)}, + } +} diff --git a/lambda/invoke_loop_test.go b/lambda/invoke_loop_test.go index fab800b9..ab110c50 100644 --- a/lambda/invoke_loop_test.go +++ b/lambda/invoke_loop_test.go @@ -90,6 +90,47 @@ func TestCustomErrorMarshaling(t *testing.T) { } } +func TestXRayCausePlumbing(t *testing.T) { + errors := []error{ + messages.InvokeResponse_Error{ + Type: "yolo", + Message: "hello", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{ + {Label: "yolo", Path: "yolo", Line: 2}, + }, + }, + } + wd, _ := os.Getwd() + expected := []string{ + `{ + "working_directory":"` + wd + `", + "paths": ["yolo"], + "exceptions": [{ + "errorType": "yolo", + "errorMessage": "hello", + "stackTrace": [ + {"label": "yolo", "path": "yolo", "line": 2} + ] + }] + }`, + } + require.Equal(t, len(errors), len(expected)) + ts, record := runtimeAPIServer(``, len(errors)) + defer ts.Close() + n := 0 + handler := NewHandler(func() error { + defer func() { n++ }() + return errors[n] + }) + endpoint := strings.Split(ts.URL, "://")[1] + expectedError := fmt.Sprintf("failed to GET http://%s/2018-06-01/runtime/invocation/next: got unexpected status code: 410", endpoint) + assert.EqualError(t, startRuntimeAPILoop(endpoint, handler), expectedError) + for i := range errors { + assert.JSONEq(t, expected[i], string(record.xrayCauses[i])) + } + +} + func TestRuntimeAPIContextPlumbing(t *testing.T) { handler := NewHandler(func(ctx context.Context) (interface{}, error) { lc, _ := lambdacontext.FromContext(ctx) @@ -271,6 +312,7 @@ type requestRecord struct { nPosts int responses [][]byte contentTypes []string + xrayCauses []string } type eventMetadata struct { @@ -336,6 +378,7 @@ func runtimeAPIServer(eventPayload string, failAfter int, overrides ...eventMeta w.WriteHeader(http.StatusAccepted) record.responses = append(record.responses, response.Bytes()) record.contentTypes = append(record.contentTypes, r.Header.Get("Content-Type")) + record.xrayCauses = append(record.xrayCauses, r.Header.Get(headerXRayErrorCause)) default: w.WriteHeader(http.StatusBadRequest) } diff --git a/lambda/runtime_api_client.go b/lambda/runtime_api_client.go index 84384c29..298323da 100644 --- a/lambda/runtime_api_client.go +++ b/lambda/runtime_api_client.go @@ -13,6 +13,8 @@ import ( "log" "net/http" "runtime" + + "github.com/aws/aws-lambda-go/lambda/messages" ) const ( @@ -22,11 +24,13 @@ const ( headerCognitoIdentity = "Lambda-Runtime-Cognito-Identity" headerClientContext = "Lambda-Runtime-Client-Context" headerInvokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn" + headerXRayErrorCause = "Lambda-Runtime-Function-Xray-Error-Cause" trailerLambdaErrorType = "Lambda-Runtime-Function-Error-Type" trailerLambdaErrorBody = "Lambda-Runtime-Function-Error-Body" contentTypeJSON = "application/json" contentTypeBytes = "application/octet-stream" apiVersion = "2018-06-01" + xrayErrorCauseMaxSize = 1024 * 1024 ) type runtimeAPIClient struct { @@ -52,12 +56,18 @@ type invoke struct { client *runtimeAPIClient } +type failure struct { + WorkingDirectory string `json:"working_directory"` + Exceptions []messages.InvokeResponse_Error `json:"exceptions"` + Paths []string `json:"paths"` +} + // success sends the response payload for an in-progress invocation. // Notes: // - An invoke is not complete until next() is called again! func (i *invoke) success(body io.Reader, contentType string) error { url := i.client.baseURL + i.id + "/response" - return i.client.post(url, body, contentType) + return i.client.post(url, body, contentType, nil) } // failure sends the payload to the Runtime API. This marks the function's invoke as a failure. @@ -66,8 +76,12 @@ func (i *invoke) success(body io.Reader, contentType string) error { // - A Lambda Function continues to be re-used for future invokes even after a failure. // If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure() func (i *invoke) failure(body io.Reader, contentType string) error { + return i.xfailure(body, contentType, nil) +} + +func (i *invoke) xfailure(body io.Reader, contentType string, causeForXRay []byte) error { url := i.client.baseURL + i.id + "/error" - return i.client.post(url, body, contentType) + return i.client.post(url, body, contentType, causeForXRay) } // next connects to the Runtime API and waits for a new invoke Request to be available. @@ -108,7 +122,7 @@ func (c *runtimeAPIClient) next() (*invoke, error) { }, nil } -func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) error { +func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string, xrayErrorCause []byte) error { b := newErrorCapturingReader(body) req, err := http.NewRequest(http.MethodPost, url, b) if err != nil { @@ -118,6 +132,10 @@ func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) req.Header.Set("User-Agent", c.userAgent) req.Header.Set("Content-Type", contentType) + if xrayErrorCause != nil && len(xrayErrorCause) < xrayErrorCauseMaxSize { + req.Header.Set(headerXRayErrorCause, string(xrayErrorCause)) + } + resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to POST to %s: %v", url, err) From 55f2e0a2d21ff84ee3570f1f9c1c019f3c08e9fc Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Tue, 23 Jan 2024 16:51:39 -0800 Subject: [PATCH 2/6] cleanup --- lambda/invoke_loop.go | 2 +- lambda/runtime_api_client.go | 14 +------------- lambda/runtime_api_client_test.go | 8 ++++---- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/lambda/invoke_loop.go b/lambda/invoke_loop.go index 0d4397a6..a121773d 100644 --- a/lambda/invoke_loop.go +++ b/lambda/invoke_loop.go @@ -111,7 +111,7 @@ func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) err return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err) } - if err := invoke.xfailure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil { + if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil { return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err) } return nil diff --git a/lambda/runtime_api_client.go b/lambda/runtime_api_client.go index 298323da..158bd6b4 100644 --- a/lambda/runtime_api_client.go +++ b/lambda/runtime_api_client.go @@ -13,8 +13,6 @@ import ( "log" "net/http" "runtime" - - "github.com/aws/aws-lambda-go/lambda/messages" ) const ( @@ -56,12 +54,6 @@ type invoke struct { client *runtimeAPIClient } -type failure struct { - WorkingDirectory string `json:"working_directory"` - Exceptions []messages.InvokeResponse_Error `json:"exceptions"` - Paths []string `json:"paths"` -} - // success sends the response payload for an in-progress invocation. // Notes: // - An invoke is not complete until next() is called again! @@ -75,11 +67,7 @@ func (i *invoke) success(body io.Reader, contentType string) error { // - The execution of the function process continues, and is billed, until next() is called again! // - A Lambda Function continues to be re-used for future invokes even after a failure. // If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure() -func (i *invoke) failure(body io.Reader, contentType string) error { - return i.xfailure(body, contentType, nil) -} - -func (i *invoke) xfailure(body io.Reader, contentType string, causeForXRay []byte) error { +func (i *invoke) failure(body io.Reader, contentType string, causeForXRay []byte) error { url := i.client.baseURL + i.id + "/error" return i.client.post(url, body, contentType, causeForXRay) } diff --git a/lambda/runtime_api_client_test.go b/lambda/runtime_api_client_test.go index 7ccd47fb..3f41403f 100644 --- a/lambda/runtime_api_client_test.go +++ b/lambda/runtime_api_client_test.go @@ -92,7 +92,7 @@ func TestClientDoneAndError(t *testing.T) { assert.NoError(t, err) }) t.Run(fmt.Sprintf("happy Error with payload[%d]", i), func(t *testing.T) { - err := invoke.failure(bytes.NewReader(payload), contentTypeJSON) + err := invoke.failure(bytes.NewReader(payload), contentTypeJSON, nil) assert.NoError(t, err) }) } @@ -105,7 +105,7 @@ func TestInvalidRequestsForMalformedEndpoint(t *testing.T) { require.Error(t, err) err = (&invoke{client: newRuntimeAPIClient("🚨")}).success(nil, "") require.Error(t, err) - err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "") + err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "", nil) require.Error(t, err) } @@ -145,7 +145,7 @@ func TestStatusCodes(t *testing.T) { require.NoError(t, err) }) t.Run("failure should not error", func(t *testing.T) { - err := invoke.failure(nil, "") + err := invoke.failure(nil, "", nil) require.NoError(t, err) }) } else { @@ -158,7 +158,7 @@ func TestStatusCodes(t *testing.T) { } }) t.Run("failure should error", func(t *testing.T) { - err := invoke.failure(nil, "") + err := invoke.failure(nil, "", nil) require.Error(t, err) if i != 301 && i != 302 && i != 303 { assert.Contains(t, err.Error(), "unexpected status code") From 1aaad14b05939697fa0c678d4c90cdfa92a6005c Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Wed, 24 Jan 2024 09:39:38 -0800 Subject: [PATCH 3/6] fix xrayException shape - the json tags differ from InvokeResponse_Error --- lambda/invoke_loop.go | 29 +++++++++++++++++++---------- lambda/invoke_loop_test.go | 16 +++++++++------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/lambda/invoke_loop.go b/lambda/invoke_loop.go index a121773d..e558ea45 100644 --- a/lambda/invoke_loop.go +++ b/lambda/invoke_loop.go @@ -102,11 +102,10 @@ func handleInvoke(invoke *invoke, handler *handlerOptions) error { } func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) error { - errorForXRay := makeErrorForXRay(invokeErr) - errorPayload := errorForXRay.Exceptions[0] + errorPayload := safeMarshal(invokeErr) log.Printf("%s", errorPayload) - causeForXRay, err := json.Marshal(errorForXRay) + causeForXRay, err := json.Marshal(makeXRayError(invokeErr)) if err != nil { return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err) } @@ -174,13 +173,19 @@ func safeMarshal(v interface{}) []byte { return payload } -type errorForXRay struct { - WorkingDirectory string `json:"working_directory"` - Exceptions []json.RawMessage `json:"exceptions"` // returned as bytes to avoid double-serializing - Paths []string `json:"paths"` +type xrayException struct { + Type string `json:"type"` + Message string `json:"message"` + Stack []*messages.InvokeResponse_Error_StackFrame `json:"stack"` } -func makeErrorForXRay(invokeResponseError *messages.InvokeResponse_Error) *errorForXRay { +type xrayError struct { + WorkingDirectory string `json:"working_directory"` + Exceptions []xrayException `json:"exceptions"` + Paths []string `json:"paths"` +} + +func makeXRayError(invokeResponseError *messages.InvokeResponse_Error) *xrayError { pathSet := make(map[string]struct{}, len(invokeResponseError.StackTrace)) for _, frame := range invokeResponseError.StackTrace { pathSet[frame.Path] = struct{}{} @@ -190,9 +195,13 @@ func makeErrorForXRay(invokeResponseError *messages.InvokeResponse_Error) *error paths = append(paths, path) } cwd, _ := os.Getwd() - return &errorForXRay{ + return &xrayError{ WorkingDirectory: cwd, Paths: paths, - Exceptions: []json.RawMessage{safeMarshal(invokeResponseError)}, + Exceptions: []xrayException{{ + Type: invokeResponseError.Type, + Message: invokeResponseError.Message, + Stack: invokeResponseError.StackTrace, + }}, } } diff --git a/lambda/invoke_loop_test.go b/lambda/invoke_loop_test.go index ab110c50..a3a2fcd0 100644 --- a/lambda/invoke_loop_test.go +++ b/lambda/invoke_loop_test.go @@ -93,10 +93,11 @@ func TestCustomErrorMarshaling(t *testing.T) { func TestXRayCausePlumbing(t *testing.T) { errors := []error{ messages.InvokeResponse_Error{ - Type: "yolo", - Message: "hello", + Type: "yoloError", + Message: "hello yolo", StackTrace: []*messages.InvokeResponse_Error_StackFrame{ {Label: "yolo", Path: "yolo", Line: 2}, + {Label: "hi", Path: "hello/hello", Line: 12}, }, }, } @@ -104,12 +105,13 @@ func TestXRayCausePlumbing(t *testing.T) { expected := []string{ `{ "working_directory":"` + wd + `", - "paths": ["yolo"], + "paths": ["yolo", "hello/hello"], "exceptions": [{ - "errorType": "yolo", - "errorMessage": "hello", - "stackTrace": [ - {"label": "yolo", "path": "yolo", "line": 2} + "type": "yoloError", + "message": "hello yolo", + "stack": [ + {"label": "yolo", "path": "yolo", "line": 2}, + {"label": "hi", "path": "hello/hello", "line": 12} ] }] }`, From b96de21b7103e467eb00049e7c5476247651ea39 Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Wed, 24 Jan 2024 10:00:31 -0800 Subject: [PATCH 4/6] no stack should be empty list instead of nil --- lambda/invoke_loop.go | 14 +++++++++----- lambda/invoke_loop_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/lambda/invoke_loop.go b/lambda/invoke_loop.go index e558ea45..ab39147e 100644 --- a/lambda/invoke_loop.go +++ b/lambda/invoke_loop.go @@ -195,13 +195,17 @@ func makeXRayError(invokeResponseError *messages.InvokeResponse_Error) *xrayErro paths = append(paths, path) } cwd, _ := os.Getwd() + exceptions := []xrayException{{ + Type: invokeResponseError.Type, + Message: invokeResponseError.Message, + Stack: invokeResponseError.StackTrace, + }} + if exceptions[0].Stack == nil { + exceptions[0].Stack = []*messages.InvokeResponse_Error_StackFrame{} + } return &xrayError{ WorkingDirectory: cwd, Paths: paths, - Exceptions: []xrayException{{ - Type: invokeResponseError.Type, - Message: invokeResponseError.Message, - Stack: invokeResponseError.StackTrace, - }}, + Exceptions: exceptions, } } diff --git a/lambda/invoke_loop_test.go b/lambda/invoke_loop_test.go index a3a2fcd0..f0e236b9 100644 --- a/lambda/invoke_loop_test.go +++ b/lambda/invoke_loop_test.go @@ -92,6 +92,7 @@ func TestCustomErrorMarshaling(t *testing.T) { func TestXRayCausePlumbing(t *testing.T) { errors := []error{ + errors.New("barf"), messages.InvokeResponse_Error{ Type: "yoloError", Message: "hello yolo", @@ -100,9 +101,24 @@ func TestXRayCausePlumbing(t *testing.T) { {Label: "hi", Path: "hello/hello", Line: 12}, }, }, + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{ + }, + }, } wd, _ := os.Getwd() expected := []string{ + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "errorString", + "message": "barf", + "stack": [] + }] + }`, `{ "working_directory":"` + wd + `", "paths": ["yolo", "hello/hello"], @@ -115,6 +131,17 @@ func TestXRayCausePlumbing(t *testing.T) { ] }] }`, + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [ + ] + }] + }`, + } require.Equal(t, len(errors), len(expected)) ts, record := runtimeAPIServer(``, len(errors)) From 9f77144be7536bf9f2c8e51f4a51dc67949ff9d2 Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Wed, 24 Jan 2024 10:03:57 -0800 Subject: [PATCH 5/6] gofmt --- lambda/invoke_loop_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lambda/invoke_loop_test.go b/lambda/invoke_loop_test.go index f0e236b9..0ce0db0c 100644 --- a/lambda/invoke_loop_test.go +++ b/lambda/invoke_loop_test.go @@ -102,15 +102,14 @@ func TestXRayCausePlumbing(t *testing.T) { }, }, messages.InvokeResponse_Error{ - Type: "yoloError", - Message: "hello yolo", - StackTrace: []*messages.InvokeResponse_Error_StackFrame{ - }, + Type: "yoloError", + Message: "hello yolo", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{}, }, } wd, _ := os.Getwd() expected := []string{ - `{ + `{ "working_directory":"` + wd + `", "paths": [], "exceptions": [{ @@ -141,7 +140,6 @@ func TestXRayCausePlumbing(t *testing.T) { ] }] }`, - } require.Equal(t, len(errors), len(expected)) ts, record := runtimeAPIServer(``, len(errors)) From b8a6cfc4b41b9b3d573ab672929bbdbede4d95f7 Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Wed, 24 Jan 2024 10:38:01 -0800 Subject: [PATCH 6/6] stable order of the paths array --- lambda/invoke_loop.go | 13 +++++++------ lambda/invoke_loop_test.go | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/lambda/invoke_loop.go b/lambda/invoke_loop.go index ab39147e..338237ea 100644 --- a/lambda/invoke_loop.go +++ b/lambda/invoke_loop.go @@ -186,14 +186,15 @@ type xrayError struct { } func makeXRayError(invokeResponseError *messages.InvokeResponse_Error) *xrayError { - pathSet := make(map[string]struct{}, len(invokeResponseError.StackTrace)) + paths := make([]string, 0, len(invokeResponseError.StackTrace)) + visitedPaths := make(map[string]struct{}, len(invokeResponseError.StackTrace)) for _, frame := range invokeResponseError.StackTrace { - pathSet[frame.Path] = struct{}{} - } - paths := make([]string, 0, len(pathSet)) - for path := range pathSet { - paths = append(paths, path) + if _, exists := visitedPaths[frame.Path]; !exists { + visitedPaths[frame.Path] = struct{}{} + paths = append(paths, frame.Path) + } } + cwd, _ := os.Getwd() exceptions := []xrayException{{ Type: invokeResponseError.Type, diff --git a/lambda/invoke_loop_test.go b/lambda/invoke_loop_test.go index 0ce0db0c..3374dc2a 100644 --- a/lambda/invoke_loop_test.go +++ b/lambda/invoke_loop_test.go @@ -101,11 +101,25 @@ func TestXRayCausePlumbing(t *testing.T) { {Label: "hi", Path: "hello/hello", Line: 12}, }, }, + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{ + {Label: "hi", Path: "hello/hello", Line: 12}, + {Label: "hihi", Path: "hello/hello", Line: 13}, + {Label: "yolo", Path: "yolo", Line: 2}, + {Label: "hi", Path: "hello/hello", Line: 14}, + }, + }, messages.InvokeResponse_Error{ Type: "yoloError", Message: "hello yolo", StackTrace: []*messages.InvokeResponse_Error_StackFrame{}, }, + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + }, } wd, _ := os.Getwd() expected := []string{ @@ -132,14 +146,36 @@ func TestXRayCausePlumbing(t *testing.T) { }`, `{ "working_directory":"` + wd + `", - "paths": [], + "paths": ["hello/hello", "yolo"], "exceptions": [{ "type": "yoloError", "message": "hello yolo", "stack": [ + {"label": "hi", "path": "hello/hello", "line": 12}, + {"label": "hihi", "path": "hello/hello", "line": 13}, + {"label": "yolo", "path": "yolo", "line": 2}, + {"label": "hi", "path": "hello/hello", "line": 14} ] }] }`, + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [] + }] + }`, + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [] + }] + }`, } require.Equal(t, len(errors), len(expected)) ts, record := runtimeAPIServer(``, len(errors))