From fdd4717450b042271b738f25db6c679e9fb75202 Mon Sep 17 00:00:00 2001 From: Muhamad Azamy Date: Sun, 17 Mar 2024 12:14:08 +0100 Subject: [PATCH] implement side effects Also implement helper accessor functions --- error.go | 8 + generated/proto/javascript/javascript.pb.go | 373 ++++++++++++++++++++ go.mod | 3 + go.sum | 6 + internal/state/state.go | 14 + internal/state/sys.go | 65 ++++ internal/wire/wire.go | 18 + proto/javascript/javascript.proto | 37 ++ router.go | 97 ++++- test/main.go | 29 +- 10 files changed, 622 insertions(+), 28 deletions(-) create mode 100644 generated/proto/javascript/javascript.pb.go create mode 100644 proto/javascript/javascript.proto diff --git a/error.go b/error.go index a280ada..eb3a395 100644 --- a/error.go +++ b/error.go @@ -171,6 +171,10 @@ func (e *terminalError) Unwrap() error { // WithErrorCode returns an error with specific func WithErrorCode(err error, code Code) error { + if err == nil { + return nil + } + return &codeError{ inner: err, code: code, @@ -180,6 +184,10 @@ func WithErrorCode(err error, code Code) error { // TerminalError returns a terminal error with optional code. // code is optional but only one code is allowed. func TerminalError(err error, code ...Code) error { + if err == nil { + return nil + } + if len(code) > 1 { panic("only single code is allowed") } diff --git a/generated/proto/javascript/javascript.pb.go b/generated/proto/javascript/javascript.pb.go new file mode 100644 index 0000000..99c5ee4 --- /dev/null +++ b/generated/proto/javascript/javascript.pb.go @@ -0,0 +1,373 @@ +// +// Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK for Node.js/TypeScript, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc (unknown) +// source: proto/javascript/javascript.proto + +package javascript + +import ( + protocol "github.com/muhamadazmy/restate-sdk-go/generated/proto/protocol" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type FailureWithTerminal struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Failure *protocol.Failure `protobuf:"bytes,1,opt,name=failure,proto3" json:"failure,omitempty"` + Terminal bool `protobuf:"varint,2,opt,name=terminal,proto3" json:"terminal,omitempty"` +} + +func (x *FailureWithTerminal) Reset() { + *x = FailureWithTerminal{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_javascript_javascript_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FailureWithTerminal) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FailureWithTerminal) ProtoMessage() {} + +func (x *FailureWithTerminal) ProtoReflect() protoreflect.Message { + mi := &file_proto_javascript_javascript_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FailureWithTerminal.ProtoReflect.Descriptor instead. +func (*FailureWithTerminal) Descriptor() ([]byte, []int) { + return file_proto_javascript_javascript_proto_rawDescGZIP(), []int{0} +} + +func (x *FailureWithTerminal) GetFailure() *protocol.Failure { + if x != nil { + return x.Failure + } + return nil +} + +func (x *FailureWithTerminal) GetTerminal() bool { + if x != nil { + return x.Terminal + } + return false +} + +// Type: 0xFC00 + 1 +// Flag: RequiresRuntimeAck +type SideEffectEntryMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Result: + // + // *SideEffectEntryMessage_Value + // *SideEffectEntryMessage_Failure + Result isSideEffectEntryMessage_Result `protobuf_oneof:"result"` +} + +func (x *SideEffectEntryMessage) Reset() { + *x = SideEffectEntryMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_javascript_javascript_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SideEffectEntryMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SideEffectEntryMessage) ProtoMessage() {} + +func (x *SideEffectEntryMessage) ProtoReflect() protoreflect.Message { + mi := &file_proto_javascript_javascript_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SideEffectEntryMessage.ProtoReflect.Descriptor instead. +func (*SideEffectEntryMessage) Descriptor() ([]byte, []int) { + return file_proto_javascript_javascript_proto_rawDescGZIP(), []int{1} +} + +func (m *SideEffectEntryMessage) GetResult() isSideEffectEntryMessage_Result { + if m != nil { + return m.Result + } + return nil +} + +func (x *SideEffectEntryMessage) GetValue() []byte { + if x, ok := x.GetResult().(*SideEffectEntryMessage_Value); ok { + return x.Value + } + return nil +} + +func (x *SideEffectEntryMessage) GetFailure() *FailureWithTerminal { + if x, ok := x.GetResult().(*SideEffectEntryMessage_Failure); ok { + return x.Failure + } + return nil +} + +type isSideEffectEntryMessage_Result interface { + isSideEffectEntryMessage_Result() +} + +type SideEffectEntryMessage_Value struct { + Value []byte `protobuf:"bytes,14,opt,name=value,proto3,oneof"` +} + +type SideEffectEntryMessage_Failure struct { + Failure *FailureWithTerminal `protobuf:"bytes,15,opt,name=failure,proto3,oneof"` +} + +func (*SideEffectEntryMessage_Value) isSideEffectEntryMessage_Result() {} + +func (*SideEffectEntryMessage_Failure) isSideEffectEntryMessage_Result() {} + +// Type: 0xFC00 + 2 +type CombinatorEntryMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CombinatorId int32 `protobuf:"varint,1,opt,name=combinator_id,json=combinatorId,proto3" json:"combinator_id,omitempty"` + JournalEntriesOrder []int32 `protobuf:"varint,2,rep,packed,name=journal_entries_order,json=journalEntriesOrder,proto3" json:"journal_entries_order,omitempty"` +} + +func (x *CombinatorEntryMessage) Reset() { + *x = CombinatorEntryMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_javascript_javascript_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CombinatorEntryMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CombinatorEntryMessage) ProtoMessage() {} + +func (x *CombinatorEntryMessage) ProtoReflect() protoreflect.Message { + mi := &file_proto_javascript_javascript_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CombinatorEntryMessage.ProtoReflect.Descriptor instead. +func (*CombinatorEntryMessage) Descriptor() ([]byte, []int) { + return file_proto_javascript_javascript_proto_rawDescGZIP(), []int{2} +} + +func (x *CombinatorEntryMessage) GetCombinatorId() int32 { + if x != nil { + return x.CombinatorId + } + return 0 +} + +func (x *CombinatorEntryMessage) GetJournalEntriesOrder() []int32 { + if x != nil { + return x.JournalEntriesOrder + } + return nil +} + +var File_proto_javascript_javascript_proto protoreflect.FileDescriptor + +var file_proto_javascript_javascript_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, + 0x70, 0x74, 0x2f, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x1a, 0x64, 0x65, 0x76, 0x2e, 0x72, 0x65, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x1a, + 0x1d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x72, + 0x0a, 0x13, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x57, 0x69, 0x74, 0x68, 0x54, 0x65, 0x72, + 0x6d, 0x69, 0x6e, 0x61, 0x6c, 0x12, 0x3f, 0x0a, 0x07, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x72, 0x65, 0x73, + 0x74, 0x61, 0x74, 0x65, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x52, 0x07, 0x66, + 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, + 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, + 0x61, 0x6c, 0x22, 0x87, 0x01, 0x0a, 0x16, 0x53, 0x69, 0x64, 0x65, 0x45, 0x66, 0x66, 0x65, 0x63, + 0x74, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x4b, 0x0a, 0x07, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, + 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x72, 0x65, 0x73, + 0x74, 0x61, 0x74, 0x65, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, + 0x69, 0x70, 0x74, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x57, 0x69, 0x74, 0x68, 0x54, + 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x6c, 0x48, 0x00, 0x52, 0x07, 0x66, 0x61, 0x69, 0x6c, 0x75, + 0x72, 0x65, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x71, 0x0a, 0x16, + 0x43, 0x6f, 0x6d, 0x62, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x62, 0x69, 0x6e, + 0x61, 0x74, 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x63, + 0x6f, 0x6d, 0x62, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x12, 0x32, 0x0a, 0x15, 0x6a, + 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x5f, 0x6f, + 0x72, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x05, 0x52, 0x13, 0x6a, 0x6f, 0x75, 0x72, + 0x6e, 0x61, 0x6c, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x42, + 0xff, 0x01, 0x0a, 0x1e, 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x72, 0x65, 0x73, 0x74, + 0x61, 0x74, 0x65, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, + 0x70, 0x74, 0x42, 0x0f, 0x4a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6d, 0x75, 0x68, 0x61, 0x6d, 0x61, 0x64, 0x61, 0x7a, 0x6d, 0x79, 0x2f, 0x72, 0x65, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x67, 0x65, 0x6e, + 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6a, 0x61, 0x76, + 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0xa2, 0x02, 0x04, 0x44, 0x52, 0x53, 0x4a, 0xaa, 0x02, + 0x1a, 0x44, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x64, 0x6b, + 0x2e, 0x4a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0xca, 0x02, 0x1a, 0x44, 0x65, + 0x76, 0x5c, 0x52, 0x65, 0x73, 0x74, 0x61, 0x74, 0x65, 0x5c, 0x53, 0x64, 0x6b, 0x5c, 0x4a, 0x61, + 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0xe2, 0x02, 0x26, 0x44, 0x65, 0x76, 0x5c, 0x52, + 0x65, 0x73, 0x74, 0x61, 0x74, 0x65, 0x5c, 0x53, 0x64, 0x6b, 0x5c, 0x4a, 0x61, 0x76, 0x61, 0x73, + 0x63, 0x72, 0x69, 0x70, 0x74, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0xea, 0x02, 0x1d, 0x44, 0x65, 0x76, 0x3a, 0x3a, 0x52, 0x65, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x3a, 0x3a, 0x53, 0x64, 0x6b, 0x3a, 0x3a, 0x4a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, 0x69, 0x70, + 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_javascript_javascript_proto_rawDescOnce sync.Once + file_proto_javascript_javascript_proto_rawDescData = file_proto_javascript_javascript_proto_rawDesc +) + +func file_proto_javascript_javascript_proto_rawDescGZIP() []byte { + file_proto_javascript_javascript_proto_rawDescOnce.Do(func() { + file_proto_javascript_javascript_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_javascript_javascript_proto_rawDescData) + }) + return file_proto_javascript_javascript_proto_rawDescData +} + +var file_proto_javascript_javascript_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_proto_javascript_javascript_proto_goTypes = []interface{}{ + (*FailureWithTerminal)(nil), // 0: dev.restate.sdk.javascript.FailureWithTerminal + (*SideEffectEntryMessage)(nil), // 1: dev.restate.sdk.javascript.SideEffectEntryMessage + (*CombinatorEntryMessage)(nil), // 2: dev.restate.sdk.javascript.CombinatorEntryMessage + (*protocol.Failure)(nil), // 3: dev.restate.service.protocol.Failure +} +var file_proto_javascript_javascript_proto_depIdxs = []int32{ + 3, // 0: dev.restate.sdk.javascript.FailureWithTerminal.failure:type_name -> dev.restate.service.protocol.Failure + 0, // 1: dev.restate.sdk.javascript.SideEffectEntryMessage.failure:type_name -> dev.restate.sdk.javascript.FailureWithTerminal + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_proto_javascript_javascript_proto_init() } +func file_proto_javascript_javascript_proto_init() { + if File_proto_javascript_javascript_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_javascript_javascript_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FailureWithTerminal); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_javascript_javascript_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SideEffectEntryMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_javascript_javascript_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CombinatorEntryMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_javascript_javascript_proto_msgTypes[1].OneofWrappers = []interface{}{ + (*SideEffectEntryMessage_Value)(nil), + (*SideEffectEntryMessage_Failure)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_javascript_javascript_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_javascript_javascript_proto_goTypes, + DependencyIndexes: file_proto_javascript_javascript_proto_depIdxs, + MessageInfos: file_proto_javascript_javascript_proto_msgTypes, + }.Build() + File_proto_javascript_javascript_proto = out.File + file_proto_javascript_javascript_proto_rawDesc = nil + file_proto_javascript_javascript_proto_goTypes = nil + file_proto_javascript_javascript_proto_depIdxs = nil +} diff --git a/go.mod b/go.mod index fa81076..aa68cdf 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,15 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/posener/h2conn v0.0.0-20231204025407-3997deeca0f0 // indirect github.com/stretchr/testify v1.9.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 2787314..106e9c9 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -17,6 +19,10 @@ github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/state/state.go b/internal/state/state.go index 08abd51..fcff304 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/muhamadazmy/restate-sdk-go" "github.com/muhamadazmy/restate-sdk-go/generated/proto/dynrpc" "github.com/muhamadazmy/restate-sdk-go/generated/proto/protocol" @@ -74,6 +75,19 @@ func (c *Context) Service(service string) restate.Service { } } +func (c *Context) SideEffect(fn func() ([]byte, error), bo ...backoff.BackOff) ([]byte, error) { + var back backoff.BackOff + if len(bo) == 0 { + back = &restate.DefaultBackoffPolicy + } else if len(bo) == 1 { + back = bo[0] + } else { + panic("only single backoff policy is allowed") + } + + return c.machine.sideEffect(fn, back) +} + func newContext(inner context.Context, machine *Machine) *Context { // state := make(map[string][]byte) diff --git a/internal/state/sys.go b/internal/state/sys.go index e64543c..ce1831c 100644 --- a/internal/state/sys.go +++ b/internal/state/sys.go @@ -5,7 +5,9 @@ import ( "fmt" "time" + "github.com/cenkalti/backoff/v4" "github.com/muhamadazmy/restate-sdk-go" + "github.com/muhamadazmy/restate-sdk-go/generated/proto/javascript" "github.com/muhamadazmy/restate-sdk-go/generated/proto/protocol" "github.com/muhamadazmy/restate-sdk-go/internal/wire" "github.com/rs/zerolog/log" @@ -277,3 +279,66 @@ func (c *Machine) _sleep(until time.Time) error { return nil } + +func (c *Machine) sideEffect(fn func() ([]byte, error), bo backoff.BackOff) ([]byte, error) { + return replayOrNew( + c, + wire.SideEffectEntryMessageType, + func(entry *wire.SideEffectEntryMessage) ([]byte, error) { + switch result := entry.Payload.Result.(type) { + case *javascript.SideEffectEntryMessage_Failure: + err := fmt.Errorf("[%d] %s", result.Failure.Failure.Code, result.Failure.Failure.Message) + if result.Failure.Terminal { + err = restate.TerminalError(err) + } + return nil, err + case *javascript.SideEffectEntryMessage_Value: + return result.Value, nil + } + + return nil, errUnreachable + }, + func() ([]byte, error) { + return c._sideEffect(fn, bo) + }, + ) +} + +func (c *Machine) _sideEffect(fn func() ([]byte, error), bo backoff.BackOff) ([]byte, error) { + var bytes []byte + err := backoff.Retry(func() error { + var err error + bytes, err = fn() + + if restate.IsTerminalError(err) { + // if inner function returned a terminal error + // we need to wrap it in permanent to break + // the retries + return backoff.Permanent(err) + } + return err + }, bo) + + var msg javascript.SideEffectEntryMessage + if err != nil { + msg.Result = &javascript.SideEffectEntryMessage_Failure{ + Failure: &javascript.FailureWithTerminal{ + Failure: &protocol.Failure{ + Code: uint32(restate.ErrorCode(err)), + Message: err.Error(), + }, + Terminal: restate.IsTerminalError(err), + }, + } + } else { + msg.Result = &javascript.SideEffectEntryMessage_Value{ + Value: bytes, + } + } + + if err := c.protocol.Write(&msg); err != nil { + return nil, err + } + + return bytes, err +} diff --git a/internal/wire/wire.go b/internal/wire/wire.go index 408ceed..2a087c7 100644 --- a/internal/wire/wire.go +++ b/internal/wire/wire.go @@ -9,6 +9,7 @@ import ( "io" "math" + "github.com/muhamadazmy/restate-sdk-go/generated/proto/javascript" "github.com/muhamadazmy/restate-sdk-go/generated/proto/protocol" "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" @@ -43,6 +44,9 @@ const ( SleepEntryMessageType Type = 0x0C00 InvokeEntryMessageType Type = 0x0C00 + 1 BackgroundInvokeEntryMessageType Type = 0x0C00 + 2 + + // SideEffect + SideEffectEntryMessageType Type = 0xFC00 + 1 ) type Type uint16 @@ -190,6 +194,8 @@ func (s *Protocol) Write(message proto.Message, flags ...Flag) error { typ = BackgroundInvokeEntryMessageType case *protocol.GetStateKeysEntryMessage: typ = GetStateKeysEntryMessageType + case *javascript.SideEffectEntryMessage: + typ = SideEffectEntryMessageType default: return fmt.Errorf("can not send message of unknown message type") } @@ -310,6 +316,13 @@ var ( Header: header, } + return msg, proto.Unmarshal(bytes, &msg.Payload) + }, + SideEffectEntryMessageType: func(header Header, bytes []byte) (Message, error) { + msg := &SideEffectEntryMessage{ + Header: header, + } + return msg, proto.Unmarshal(bytes, &msg.Payload) }, } @@ -375,3 +388,8 @@ type BackgroundInvokeEntryMessage struct { Header Payload protocol.BackgroundInvokeEntryMessage } + +type SideEffectEntryMessage struct { + Header + Payload javascript.SideEffectEntryMessage +} diff --git a/proto/javascript/javascript.proto b/proto/javascript/javascript.proto new file mode 100644 index 0000000..c78b9d6 --- /dev/null +++ b/proto/javascript/javascript.proto @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + +syntax = "proto3"; + +package dev.restate.sdk.javascript; + +import "proto/protocol/protocol.proto"; + +message FailureWithTerminal { + dev.restate.service.protocol.Failure failure = 1; + bool terminal = 2; +} + +// Type: 0xFC00 + 1 +// Flag: RequiresRuntimeAck +message SideEffectEntryMessage { + oneof result { + bytes value = 14; + FailureWithTerminal failure = 15; + }; +} + +// Type: 0xFC00 + 2 +message CombinatorEntryMessage { + int32 combinator_id = 1; + + repeated int32 journal_entries_order = 2; +} diff --git a/router.go b/router.go index 25f4df7..ab9b94f 100644 --- a/router.go +++ b/router.go @@ -2,9 +2,26 @@ package restate import ( "context" + "fmt" "time" + "github.com/cenkalti/backoff/v4" "github.com/muhamadazmy/restate-sdk-go/generated/proto/dynrpc" + "github.com/vmihailenco/msgpack/v5" +) + +var ( + ErrKeyNotFound = fmt.Errorf("key not found") + //DefaultBackoffPolicy is an infinite exponential backoff + DefaultBackoffPolicy = backoff.ExponentialBackOff{ + InitialInterval: 10 * time.Microsecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: backoff.DefaultMaxInterval, + MaxElapsedTime: 0, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } ) type Call interface { @@ -15,26 +32,40 @@ type Call interface { } type Service interface { - // Method creates a call to method + // Method creates a call to method with name Method(method string) Call } type Context interface { Ctx() context.Context - // Set stores state value + // Set sets key value to bytes array. You can + // Note: Use SetAs helper function to seamlessly store + // a value of specific type. Set(key string, value []byte) error - // Get a state value associated with key + // Get gets value (bytes array) associated with key + // If key does not exist, this function return a nil bytes array + // and a nil error + // Note: Use GetAs helper function to seamlessly get value + // as specific type. Get(key string) ([]byte, error) // Clear deletes a key Clear(key string) error // ClearAll drops all stored state associated with key ClearAll() error - + // Keys returns a list of all associated key Keys() ([]string, error) - + // Sleep sleep during the execution until time is reached Sleep(until time.Time) error - + // Service gets a Service accessor by name where service + // must be another service known by restate runtime Service(service string) Service + + // SideEffects runs the function (fn) with backoff strategy bo until it succeeds + // or permanently fail. + // this stores the results of the function inside restate runtime so a replay + // will produce the same value (think generating a unique id for example) + // Note: use the SideEffectAs helper function + SideEffect(fn func() ([]byte, error), bo ...backoff.BackOff) ([]byte, error) } // UnKeyedHandlerFn signature of `un-keyed` handler function @@ -99,3 +130,57 @@ func (r *KeyedRouter) Keyed() bool { func (r *KeyedRouter) Handlers() map[string]Handler { return r.handlers } + +// GetAs helper function to get a key as specific type. Note that +// if there is no associated value with key, an error ErrKeyNotFound is +// returned +// it does encoding/decoding of bytes automatically using msgpack +func GetAs[T any](ctx Context, key string) (output T, err error) { + + bytes, err := ctx.Get(key) + if err != nil { + return output, err + } + + if bytes == nil { + // key does not exit. + return output, ErrKeyNotFound + } + + err = msgpack.Unmarshal(bytes, &output) + + return +} + +// SetAs helper function to set a key value with a generic type T. +// it does encoding/decoding of bytes automatically using msgpack +func SetAs[T any](ctx Context, key string, value T) error { + bytes, err := msgpack.Marshal(value) + if err != nil { + return err + } + + return ctx.Set(key, bytes) +} + +// SideEffectAs helper function runs a side effect function with specific concrete type as a result +// it does encoding/decoding of bytes automatically using msgpack +func SideEffectAs[T any](ctx Context, fn func() (T, error), bo ...backoff.BackOff) (output T, err error) { + bytes, err := ctx.SideEffect(func() ([]byte, error) { + out, err := fn() + if err != nil { + return nil, err + } + + bytes, err := msgpack.Marshal(out) + return bytes, TerminalError(err) + }, bo...) + + if err != nil { + return output, err + } + + err = msgpack.Unmarshal(bytes, &output) + + return output, TerminalError(err) +} diff --git a/test/main.go b/test/main.go index 140faab..c8a5331 100644 --- a/test/main.go +++ b/test/main.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "time" "github.com/muhamadazmy/restate-sdk-go" @@ -18,33 +19,17 @@ type Tickets struct{} func (t *Tickets) Reserve(ctx restate.Context, id string, _ restate.Void) (string, error) { - fmt.Println(ctx.Keys()) + random, err := restate.SideEffectAs(ctx, func() (float64, error) { + return rand.Float64(), nil + }) - count, err := ctx.Get("reserved") - if err != nil { - return "", err - } - - if len(count) == 0 { - count = make([]byte, 1) - } - count[0] += 1 - if err := ctx.Set("reserved", count); err != nil { - return "", err - } + log.Info().Float64("rand", random).Msg("your random is") - if err := ctx.Set("another key", []byte{}); err != nil { + if err != nil { return "", err } - if err := ctx.Service("Tickets").Method("UnReserve").Send(id, nil, 30*time.Second); err != nil { - return "", fmt.Errorf("failed to schedule 'unreserve': %w", err) - } - - //return "", fmt.Errorf("something went wrong") - // // i wanna return a non terminal error - // //return restate.Void{}, fmt.Errorf("not terminal error") - return fmt.Sprint(count[0]), nil + return fmt.Sprintf("your random number is: %f", random), nil } func (t *Tickets) UnReserve(ctx restate.Context, id string, _ restate.Void) (restate.Void, error) {