-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add isWorker flag to actor startup #62
base: master
Are you sure you want to change the base?
Changes from 5 commits
4aac376
0e3cef3
b3eb806
b7e0c05
53e9427
0bcdaa1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,12 @@ | ||
package main | ||
|
||
import ( | ||
"encoding/base64" | ||
"fmt" | ||
"strconv" | ||
|
||
"github.com/buger/jsonparser" | ||
"github.com/richardartoul/nola/virtual/types" | ||
"github.com/richardartoul/nola/wapcutils" | ||
wapc "github.com/wapc/wapc-guest-tinygo" | ||
) | ||
|
@@ -33,12 +37,14 @@ func main() { | |
|
||
var ( | ||
count int64 | ||
instantiatePayload []byte | ||
instantiatePayload types.InstantiatePayload | ||
startupWasCalled = false | ||
shutdownWasCalled = false | ||
) | ||
|
||
// getInstantiatePayload returns the payload provided to the Startup invocation. | ||
func getInstantiatePayload(payload []byte) ([]byte, error) { | ||
return instantiatePayload, nil | ||
return []byte(instantiatePayload.Payload), nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dont need this cast |
||
} | ||
|
||
// inc increments the actor's in-memory global counter. | ||
|
@@ -144,11 +150,29 @@ func invokeCustomHostFn(payload []byte) ([]byte, error) { | |
return wapc.HostCall("wapc", "nola", string(payload), payload) | ||
} | ||
|
||
var startupWasCalled = false | ||
|
||
func startup(payload []byte) ([]byte, error) { | ||
startupWasCalled = true | ||
instantiatePayload = append([]byte(nil), payload...) | ||
err := jsonparser.ObjectEach(payload, func(key, value []byte, dataType jsonparser.ValueType, offset int) error { | ||
switch string(key) { | ||
case "IsWorker": | ||
isWorker, err := strconv.ParseBool(string(value)) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse IsWorker bool: %w", err) | ||
} | ||
instantiatePayload.IsWorker = isWorker | ||
case "Payload": | ||
payload, err := base64.StdEncoding.DecodeString(string(value)) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse Payload: %w", err) | ||
} | ||
instantiatePayload.Payload = payload | ||
} | ||
return nil | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse InstantiatePayload: %w", err) | ||
} | ||
|
||
return nil, nil | ||
} | ||
|
||
|
@@ -160,11 +184,18 @@ func getStartupWasCalled(payload []byte) ([]byte, error) { | |
} | ||
|
||
func shutdown(payload []byte) ([]byte, error) { | ||
if instantiatePayload.IsWorker { | ||
shutdownWasCalled = true | ||
return nil, nil | ||
} | ||
_, err := wapc.HostCall("wapc", "nola", wapcutils.KVPutOperationName, wapcutils.EncodePutPayload(nil, []byte("shutdown"), []byte("true"))) | ||
return nil, err | ||
} | ||
|
||
func getShutdownValue(payload []byte) ([]byte, error) { | ||
if instantiatePayload.IsWorker { | ||
return []byte(strconv.FormatBool(shutdownWasCalled)), nil | ||
} | ||
res, err := wapc.HostCall("wapc", "nola", wapcutils.KVGetOperationName, []byte("shutdown")) | ||
if err != nil { | ||
return nil, err | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package virtual | |
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
|
@@ -530,7 +531,13 @@ func (r *environment) InvokeActorDirectStream( | |
heartbeatResult.ServerVersion, serverVersion) | ||
} | ||
|
||
return r.activations.invoke(ctx, reference, operation, create.InstantiatePayload, payload, false) | ||
// Wrap instantiation payload into a struct that provides metadata to the actor | ||
b, err := json.Marshal(types.InstantiatePayload{Payload: create.InstantiatePayload, IsWorker: false}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should push these down into |
||
if err != nil { | ||
return nil, fmt.Errorf("failed to marshal instantiation payload: %w", err) | ||
} | ||
|
||
return r.activations.invoke(ctx, reference, operation, b, payload, false) | ||
} | ||
|
||
func (r *environment) InvokeWorker( | ||
|
@@ -582,9 +589,15 @@ func (r *environment) InvokeWorkerStream( | |
return nil, fmt.Errorf("InvokeWorker: error creating actor reference: %w", err) | ||
} | ||
|
||
// Wrap instantiation payload into a struct that provides metadata to the actor | ||
b, err := json.Marshal(types.InstantiatePayload{Payload: create.InstantiatePayload, IsWorker: true}) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to marshal instantiation payload: %w", err) | ||
} | ||
|
||
// Workers provide none of the consistency / linearizability guarantees that actor's do, so we | ||
// can bypass the registry entirely and just immediately invoke the function. | ||
return r.activations.invoke(ctx, ref, operation, create.InstantiatePayload, payload, false) | ||
return r.activations.invoke(ctx, ref, operation, b, payload, false) | ||
} | ||
|
||
func (r *environment) Close(ctx context.Context) error { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,7 +168,7 @@ func TestCreateIfNotExistWithInstantiatePayload(t *testing.T) { | |
ctx, ns, "a", "test-module", | ||
"getInstantiatePayload", nil, types.CreateIfNotExist{}) | ||
require.NoError(t, err) | ||
require.Equal(t, []byte("abc"), result) | ||
require.Equal(t, "abc", string(result)) | ||
} | ||
} | ||
} | ||
|
@@ -978,9 +978,13 @@ func (tm testModule) Instantiate( | |
payload []byte, | ||
host HostCapabilities, | ||
) (Actor, error) { | ||
p := types.InstantiatePayload{} | ||
if err := json.Unmarshal(payload, &p); err != nil { | ||
return nil, fmt.Errorf("failed to unmarshal InstantiatePayload: %w", err) | ||
} | ||
return &testActor{ | ||
host: host, | ||
instantiatePayload: payload, | ||
instantiatePayload: p, | ||
}, nil | ||
} | ||
|
||
|
@@ -994,7 +998,7 @@ type testActor struct { | |
count int | ||
startupWasCalled bool | ||
shutdownWasCalled bool | ||
instantiatePayload []byte | ||
instantiatePayload types.InstantiatePayload | ||
} | ||
|
||
func (ta *testActor) Invoke( | ||
|
@@ -1009,18 +1013,18 @@ func (ta *testActor) Invoke( | |
return nil, nil | ||
case wapcutils.ShutdownOperationName: | ||
ta.shutdownWasCalled = true | ||
if _, ok := transaction.(noopTransaction); !ok { | ||
if !ta.instantiatePayload.IsWorker { | ||
return nil, transaction.Put(ctx, []byte("shutdown"), []byte("true")) | ||
} | ||
return nil, nil | ||
case "getShutdownValue": | ||
if _, ok := transaction.(noopTransaction); !ok { | ||
if !ta.instantiatePayload.IsWorker { | ||
result, _, err := transaction.Get(ctx, []byte("shutdown")) | ||
return result, err | ||
} | ||
return []byte(strconv.FormatBool(ta.shutdownWasCalled)), nil | ||
case "getInstantiatePayload": | ||
return ta.instantiatePayload, nil | ||
return []byte(ta.instantiatePayload.Payload), nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no byte cast here needed |
||
case "inc": | ||
ta.count++ | ||
return []byte(strconv.Itoa(ta.count)), nil | ||
|
@@ -1084,10 +1088,15 @@ func (tm testStreamModule) Instantiate( | |
streamInterfaceWasCalledMutex.Lock() | ||
defer streamInterfaceWasCalledMutex.Unlock() | ||
streamInterfaceWasCalled = true | ||
|
||
p := types.InstantiatePayload{} | ||
if err := json.Unmarshal(payload, &p); err != nil { | ||
return nil, fmt.Errorf("failed to unmarshal InstantiatePayload: %w", err) | ||
} | ||
return &testStreamActor{ | ||
a: &testActor{ | ||
host: host, | ||
instantiatePayload: payload, | ||
instantiatePayload: p, | ||
}, | ||
}, nil | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,17 @@ type CreateIfNotExist struct { | |
InstantiatePayload []byte | ||
} | ||
|
||
// InstantiatePayload provides the arguments for initialiazing actors on the STARTUP call. | ||
type InstantiatePayload struct { | ||
// IsWorker is a flag that is used to indicate whether the payload is intended for a worker or not | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. super nit: |
||
IsWorker bool | ||
// InstantiatePayload is the []byte that will be provided to the actor on | ||
// instantiation. It is generally used to provide any actor-specific constructor | ||
// arguments that are required to instantiate the actor in memory. | ||
// It is the value passed at CreateIfNotExist.InstantiatePayload | ||
Payload []byte | ||
} | ||
|
||
// ActorOptions contains the options for a given actor. | ||
type ActorOptions struct { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont need this cast