diff --git a/test/.gitignore b/example/.gitignore similarity index 100% rename from test/.gitignore rename to example/.gitignore diff --git a/example/checkout.go b/example/checkout.go new file mode 100644 index 0000000..4caed62 --- /dev/null +++ b/example/checkout.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + + "github.com/google/uuid" + "github.com/muhamadazmy/restate-sdk-go" + "github.com/rs/zerolog/log" +) + +type PaymentRequest struct { + UserID string + Tickets []string +} + +func payment(ctx restate.Context, request PaymentRequest) (bool, error) { + uuid, err := restate.SideEffectAs(ctx, func() (string, error) { + uuid := uuid.New() + return uuid.String(), nil + }) + + if err != nil { + return false, err + } + + // We are a uniform shop where everything costs 30 USD + // that is cheaper than the official example :P + price := len(request.Tickets) * 30 + + i := 0 + success, err := restate.SideEffectAs(ctx, func() (bool, error) { + log := log.With().Str("uuid", uuid).Int("price", price).Logger() + if i > 2 { + log.Info().Msg("payment succeeded") + return true, nil + } + + log.Error().Msg("payment failed") + i += 1 + return false, fmt.Errorf("failed to pay") + }) + + if err != nil { + return false, err + } + + // todo: send email + + return success, nil +} + +var ( + Checkout = restate.NewUnKeyedRouter(). + Handler("checkout", restate.NewUnKeyedHandler(payment)) +) diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..b1753f0 --- /dev/null +++ b/example/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "context" + "os" + + "github.com/muhamadazmy/restate-sdk-go/server" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const ( + UserSessionServiceName = "UserSession" + TicketServiceName = "TicketService" + CheckoutServiceName = "Checkout" +) + +func main() { + + zerolog.SetGlobalLevel(zerolog.DebugLevel) + + server := server.NewRestate(). + Bind(UserSessionServiceName, UserSession). + Bind(TicketServiceName, TicketService). + Bind(CheckoutServiceName, Checkout) + + if err := server.Start(context.Background(), ":9080"); err != nil { + log.Error().Err(err).Msg("application exited unexpectedly") + os.Exit(1) + } +} diff --git a/example/ticket_service.go b/example/ticket_service.go new file mode 100644 index 0000000..3b99a29 --- /dev/null +++ b/example/ticket_service.go @@ -0,0 +1,61 @@ +package main + +import ( + "errors" + + "github.com/muhamadazmy/restate-sdk-go" +) + +type TicketStatus int + +const ( + TicketAvailable TicketStatus = 0 + TicketReserved TicketStatus = 1 + TicketSold TicketStatus = 2 +) + +func reserve(ctx restate.Context, _ string, _ restate.Void) (bool, error) { + status, err := restate.GetAs[TicketStatus](ctx, "status") + if err != nil && !errors.Is(err, restate.ErrKeyNotFound) { + return false, err + } + + if status == TicketAvailable { + return true, restate.SetAs(ctx, "status", TicketReserved) + } + + return false, nil +} + +func unreserve(ctx restate.Context, _ string, _ restate.Void) (void restate.Void, err error) { + status, err := restate.GetAs[TicketStatus](ctx, "status") + if err != nil && !errors.Is(err, restate.ErrKeyNotFound) { + return void, err + } + + if status != TicketSold { + return void, ctx.Clear("status") + } + + return void, nil +} + +func markAsSold(ctx restate.Context, _ string, _ restate.Void) (void restate.Void, err error) { + status, err := restate.GetAs[TicketStatus](ctx, "status") + if err != nil && !errors.Is(err, restate.ErrKeyNotFound) { + return void, err + } + + if status == TicketReserved { + return void, restate.SetAs(ctx, "status", TicketSold) + } + + return void, nil +} + +var ( + TicketService = restate.NewKeyedRouter(). + Handler("reserve", restate.NewKeyedHandler(reserve)). + Handler("unreserve", restate.NewKeyedHandler(unreserve)). + Handler("markAsSold", restate.NewKeyedHandler(markAsSold)) +) diff --git a/example/user_session.go b/example/user_session.go new file mode 100644 index 0000000..8368f31 --- /dev/null +++ b/example/user_session.go @@ -0,0 +1,103 @@ +package main + +import ( + "errors" + "slices" + "time" + + "github.com/muhamadazmy/restate-sdk-go" +) + +func addTicket(ctx restate.Context, userId, ticketId string) (bool, error) { + + var success bool + if err := ctx.Service(TicketServiceName).Method("reserve").Do(ticketId, userId, &success); err != nil { + return false, err + } + + if !success { + return false, nil + } + + // add ticket to list of tickets + tickets, err := restate.GetAs[[]string](ctx, "tickets") + + if err != nil && errors.Is(err, restate.ErrKeyNotFound) { + return false, err + } + + tickets = append(tickets, ticketId) + + if err := restate.SetAs(ctx, "tickets", tickets); err != nil { + return false, err + } + + if err := ctx.Service(UserSessionServiceName).Method("expireTicket").Send(userId, ticketId, 15*time.Minute); err != nil { + return false, err + } + + return true, nil +} + +func expireTicket(ctx restate.Context, _, ticketId string) (void restate.Void, err error) { + tickets, err := restate.GetAs[[]string](ctx, "tickets") + if err != nil && !errors.Is(err, restate.ErrKeyNotFound) { + return void, err + } + + deleted := false + tickets = slices.DeleteFunc(tickets, func(ticket string) bool { + if ticket == ticketId { + deleted = true + return true + } + return false + }) + if !deleted { + return void, nil + } + + if err := restate.SetAs(ctx, "tickets", tickets); err != nil { + return void, err + } + + return void, ctx.Service(TicketServiceName).Method("unreserve").Send(ticketId, nil, 0) +} + +func checkout(ctx restate.Context, userId string, _ restate.Void) (bool, error) { + tickets, err := restate.GetAs[[]string](ctx, "tickets") + if err != nil && !errors.Is(err, restate.ErrKeyNotFound) { + return false, err + } + + if len(tickets) == 0 { + return false, nil + } + var success bool + + if err := ctx.Service(CheckoutServiceName). + Method("checkout"). + Do("", PaymentRequest{UserID: userId, Tickets: tickets}, &success); err != nil { + return false, err + } + + if !success { + return false, nil + } + + call := ctx.Service(TicketServiceName).Method("markAsSold") + for _, ticket := range tickets { + if err := call.Send(ticket, nil, 0); err != nil { + return false, err + } + } + + return true, ctx.Clear("tickets") +} + +var ( + UserSession = restate.NewKeyedRouter(). + Handler("addTicket", restate.NewKeyedHandler(addTicket)). + Handler("expireTicket", restate.NewKeyedHandler(expireTicket)). + Handler("checkout", restate.NewKeyedHandler(checkout)) +) diff --git a/go.mod b/go.mod index aa68cdf..289acd7 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 106e9c9..deb27be 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/internal/state/call.go b/internal/state/call.go index 8e9eacb..c452980 100644 --- a/internal/state/call.go +++ b/internal/state/call.go @@ -41,8 +41,8 @@ type serviceCall struct { } // Do makes a call and wait for the response -func (c *serviceCall) Do(key string, body any) ([]byte, error) { - return c.machine.doCall(c.service, c.method, key, body) +func (c *serviceCall) Do(key string, input any, output any) error { + return c.machine.doCall(c.service, c.method, key, input, output) } // Send runs a call in the background after delay duration @@ -69,13 +69,13 @@ func (c *Machine) makeRequest(key string, body any) ([]byte, error) { return proto.Marshal(params) } -func (c *Machine) doCall(service, method, key string, body any) ([]byte, error) { - params, err := c.makeRequest(key, body) +func (c *Machine) doCall(service, method, key string, input, output any) error { + params, err := c.makeRequest(key, input) if err != nil { - return nil, err + return err } - return replayOrNew( + bytes, err := replayOrNew( c, wire.InvokeEntryMessageType, func(entry *wire.InvokeEntryMessage) ([]byte, error) { @@ -96,6 +96,20 @@ func (c *Machine) doCall(service, method, key string, body any) ([]byte, error) }, func() ([]byte, error) { return c._doCall(service, method, params) }) + + if err != nil { + return err + } + + if output == nil { + return nil + } + + if err := json.Unmarshal(bytes, output); err != nil { + return restate.TerminalError(fmt.Errorf("failed to decode response: %w", err)) + } + + return nil } func (c *Machine) _doCall(service, method string, params []byte) ([]byte, error) { diff --git a/router.go b/router.go index ab9b94f..da71610 100644 --- a/router.go +++ b/router.go @@ -26,8 +26,8 @@ var ( type Call interface { // Do makes a call and wait for the response - Do(key string, body any) ([]byte, error) - // Send runs a call in the background after delay duration + Do(key string, input any, output any) error + // Send makes a call in the background (doesn't wait for response) after delay duration Send(key string, body any, delay time.Duration) error } diff --git a/test/main.go b/test/main.go deleted file mode 100644 index c8a5331..0000000 --- a/test/main.go +++ /dev/null @@ -1,123 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "math/rand" - "time" - - "github.com/muhamadazmy/restate-sdk-go" - "github.com/muhamadazmy/restate-sdk-go/server" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" -) - -type J = map[string]interface{} - -type Tickets struct{} - -func (t *Tickets) Reserve(ctx restate.Context, id string, _ restate.Void) (string, error) { - - random, err := restate.SideEffectAs(ctx, func() (float64, error) { - return rand.Float64(), nil - }) - - log.Info().Float64("rand", random).Msg("your random is") - - if err != nil { - return "", err - } - - return fmt.Sprintf("your random number is: %f", random), nil -} - -func (t *Tickets) UnReserve(ctx restate.Context, id string, _ restate.Void) (restate.Void, error) { - if err := ctx.Clear("reserved"); err != nil { - return restate.Void{}, err - } - - log.Info().Msg("tick unreserved") - - return restate.Void{}, nil -} - -func Echo(ctx restate.Context, name string) (string, error) { - response, err := ctx.Service("Keyed").Method("SayHi").Do(name, J{}) - if err != nil { - return "", err - } - - return fmt.Sprintf("echo: %s", string(response)), nil -} - -func SayHi(ctx restate.Context, key string, _ restate.Void) (string, error) { - data, err := ctx.Get("count") - if err != nil { - return "", err - } - - var count uint64 - if data != nil { - if err := json.Unmarshal(data, &count); err != nil { - return "", err - } - } - - if count > 5 { - ctx.Clear("count") - return "flushed", nil - } - count += 1 - if err := ctx.Set("count", []byte(fmt.Sprint(count))); err != nil { - return "", err - } - - return fmt.Sprintf("Hi: %s (%d)", key, count), nil -} - -func Keys(ctx restate.Context, key string, _ restate.Void) (restate.Void, error) { - - for i := 0; i < 100; i++ { - ctx.Set(fmt.Sprintf("key.%d", i), []byte("value")) - } - - return restate.Void{}, nil -} - -func Sleep(ctx restate.Context, seconds uint64) (restate.Void, error) { - log.Info().Uint64("seconds", seconds).Msg("sleeping for") - - return restate.Void{}, ctx.Sleep(time.Now().Add(time.Duration(seconds) * time.Second)) -} - -func main() { - - zerolog.SetGlobalLevel(zerolog.DebugLevel) - - var tickets Tickets - - r := server.NewRestate() - - ticketsService := restate.NewKeyedRouter(). - Handler("Reserve", restate.NewKeyedHandler(tickets.Reserve)). - Handler("UnReserve", restate.NewKeyedHandler(tickets.UnReserve)) - - r.Bind("Tickets", ticketsService) - - unKeyed := restate.NewUnKeyedRouter(). - Handler("Echo", restate.NewUnKeyedHandler(Echo)). - Handler("Sleep", restate.NewUnKeyedHandler(Sleep)) - - keyed := restate.NewKeyedRouter(). - Handler("SayHi", restate.NewKeyedHandler(SayHi)). - Handler("Keys", restate.NewKeyedHandler(Keys)) - - r. - Bind("UnKeyed", unKeyed). - Bind("Keyed", keyed) - - if err := r.Start(context.Background(), ":9080"); err != nil { - panic(err) - } -}