diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index b2645b8a3..9ec831d7f 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -395,6 +395,9 @@ func (m *Module) Run(ctx context.Context, request *wasmpb.Request) (*wasmpb.Resp } func containsCode(err error, code int) bool { + if err == nil { + return false + } return strings.Contains(err.Error(), fmt.Sprintf("exit status %d", code)) } diff --git a/pkg/workflows/wasm/runner_test.go b/pkg/workflows/wasm/runner_test.go index 05aacdcda..800960240 100644 --- a/pkg/workflows/wasm/runner_test.go +++ b/pkg/workflows/wasm/runner_test.go @@ -507,6 +507,15 @@ func Test_createEmitFn(t *testing.T) { assert.NoError(t, err) }) + t.Run("success if no labels are given", func(t *testing.T) { + hostEmit := func(respptr, resplenptr, reqptr unsafe.Pointer, reqptrlen int32) int32 { + return 0 + } + runtimeEmit := createEmitFn(sdkConfig, l, hostEmit) + err := runtimeEmit(giveMsg, nil) + assert.NoError(t, err) + }) + t.Run("successfully read error message when emit fails", func(t *testing.T) { hostEmit := func(respptr, resplenptr, reqptr unsafe.Pointer, reqptrlen int32) int32 { // marshall the protobufs diff --git a/pkg/workflows/wasm/runner_wasip1.go b/pkg/workflows/wasm/runner_wasip1.go index c31aa7427..4c772a93a 100644 --- a/pkg/workflows/wasm/runner_wasip1.go +++ b/pkg/workflows/wasm/runner_wasip1.go @@ -58,7 +58,10 @@ func sendResponseFn(response *wasmpb.Response) { os.Exit(CodeInvalidRequest) } - ptr, ptrlen := bufferToPointerLen(pb) + ptr, ptrlen, err := bufferToPointerLen(pb) + if err != nil { + os.Exit(CodeInvalidResponse) + } errno := sendResponse(ptr, ptrlen) if errno != 0 { os.Exit(CodeHostErr) @@ -76,7 +79,10 @@ type wasmWriteSyncer struct{} // Write is used to proxy log requests from the WASM binary back to the host func (wws *wasmWriteSyncer) Write(p []byte) (n int, err error) { - ptr, ptrlen := bufferToPointerLen(p) + ptr, ptrlen, err := bufferToPointerLen(p) + if err != nil { + return int(ptrlen), err + } log(ptr, ptrlen) return int(ptrlen), nil } diff --git a/pkg/workflows/wasm/sdk.go b/pkg/workflows/wasm/sdk.go index f3d6b13ae..7ec9f89ab 100644 --- a/pkg/workflows/wasm/sdk.go +++ b/pkg/workflows/wasm/sdk.go @@ -97,6 +97,9 @@ func createEmitFn( return NewEmissionError(fmt.Errorf("metadata is required to emit")) } + if labels == nil { + labels = map[string]string{} + } labels, err := toEmitLabels(sdkConfig.Metadata, labels) if err != nil { return NewEmissionError(err) @@ -120,14 +123,23 @@ func createEmitFn( // Prepare the request to be sent to the host memory by allocating space for the // response and response length buffers. respBuffer := make([]byte, sdkConfig.MaxFetchResponseSizeBytes) - respptr, _ := bufferToPointerLen(respBuffer) + respptr, _, err := bufferToPointerLen(respBuffer) + if err != nil { + return err + } resplenBuffer := make([]byte, uint32Size) - resplenptr, _ := bufferToPointerLen(resplenBuffer) + resplenptr, _, err := bufferToPointerLen(resplenBuffer) + if err != nil { + return err + } // The request buffer is the wasm memory, get a pointer to the first element and the length // of the protobuf message. - reqptr, reqptrlen := bufferToPointerLen(b) + reqptr, reqptrlen, err := bufferToPointerLen(b) + if err != nil { + return err + } // Emit the message via the method imported from the host errno := emit(respptr, resplenptr, reqptr, reqptrlen) @@ -189,13 +201,22 @@ func createFetchFn( if err != nil { return sdk.FetchResponse{}, fmt.Errorf("failed to marshal fetch request: %w", err) } - reqptr, reqptrlen := bufferToPointerLen(b) + reqptr, reqptrlen, err := bufferToPointerLen(b) + if err != nil { + return sdk.FetchResponse{}, err + } respBuffer := make([]byte, sdkConfig.MaxFetchResponseSizeBytes) - respptr, _ := bufferToPointerLen(respBuffer) + respptr, _, err := bufferToPointerLen(respBuffer) + if err != nil { + return sdk.FetchResponse{}, err + } resplenBuffer := make([]byte, uint32Size) - resplenptr, _ := bufferToPointerLen(resplenBuffer) + resplenptr, _, err := bufferToPointerLen(resplenBuffer) + if err != nil { + return sdk.FetchResponse{}, err + } errno := fetch(respptr, resplenptr, reqptr, reqptrlen) if errno != 0 { @@ -230,8 +251,11 @@ func createFetchFn( } // bufferToPointerLen returns a pointer to the first element of the buffer and the length of the buffer. -func bufferToPointerLen(buf []byte) (unsafe.Pointer, int32) { - return unsafe.Pointer(&buf[0]), int32(len(buf)) +func bufferToPointerLen(buf []byte) (unsafe.Pointer, int32, error) { + if len(buf) == 0 { + return nil, 0, fmt.Errorf("buffer cannot be empty") + } + return unsafe.Pointer(&buf[0]), int32(len(buf)), nil } // toEmitLabels ensures that the required metadata is present in the labels map @@ -248,6 +272,10 @@ func toEmitLabels(md *capabilities.RequestMetadata, labels map[string]string) (m return nil, fmt.Errorf("must provide workflow owner to emit event") } + if md.WorkflowExecutionID == "" { + return nil, fmt.Errorf("must provide workflow execution id to emit event") + } + labels[events.LabelWorkflowExecutionID] = md.WorkflowExecutionID labels[events.LabelWorkflowOwner] = md.WorkflowOwner labels[events.LabelWorkflowID] = md.WorkflowID diff --git a/pkg/workflows/wasm/sdk_test.go b/pkg/workflows/wasm/sdk_test.go index 312dba7c7..9eb478c59 100644 --- a/pkg/workflows/wasm/sdk_test.go +++ b/pkg/workflows/wasm/sdk_test.go @@ -11,9 +11,10 @@ import ( func Test_toEmitLabels(t *testing.T) { t.Run("successfully transforms metadata", func(t *testing.T) { md := &capabilities.RequestMetadata{ - WorkflowID: "workflow-id", - WorkflowName: "workflow-name", - WorkflowOwner: "workflow-owner", + WorkflowID: "workflow-id", + WorkflowName: "workflow-name", + WorkflowOwner: "workflow-owner", + WorkflowExecutionID: "6e2a46e3b6ae611bdb9bcc36ed3f46bb9a30babc3aabdd4eae7f35dd9af0f244", } empty := make(map[string]string, 0) @@ -24,7 +25,7 @@ func Test_toEmitLabels(t *testing.T) { "workflow_id": "workflow-id", "workflow_name": "workflow-name", "workflow_owner_address": "workflow-owner", - "workflow_execution_id": "", + "workflow_execution_id": "6e2a46e3b6ae611bdb9bcc36ed3f46bb9a30babc3aabdd4eae7f35dd9af0f244", }, gotLabels) }) @@ -63,4 +64,25 @@ func Test_toEmitLabels(t *testing.T) { assert.Error(t, err) assert.ErrorContains(t, err, "workflow owner") }) + + t.Run("fails on missing workflow execution id", func(t *testing.T) { + md := &capabilities.RequestMetadata{ + WorkflowID: "workflow-id", + WorkflowName: "workflow-name", + WorkflowOwner: "workflow-owner", + } + empty := make(map[string]string, 0) + + _, err := toEmitLabels(md, empty) + assert.Error(t, err) + assert.ErrorContains(t, err, "workflow execution id") + }) +} + +func Test_bufferToPointerLen(t *testing.T) { + t.Run("fails when no buffer", func(t *testing.T) { + _, _, err := bufferToPointerLen([]byte{}) + assert.Error(t, err) + assert.ErrorContains(t, err, "buffer cannot be empty") + }) }