Skip to content

Commit

Permalink
fix(design): fix bad design of remoting call (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Nov 28, 2024
1 parent 4fda560 commit a4e7ad8
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 78 deletions.
9 changes: 6 additions & 3 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func TestActorSystem(t *testing.T) {
require.True(t, proto.Equal(remoteAddr, addr))

remoting := NewRemoting()
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
from := address.NoSender()
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)

Expand Down Expand Up @@ -624,7 +625,8 @@ func TestActorSystem(t *testing.T) {
Id: "",
},
)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
from := address.NoSender()
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second)
require.Error(t, err)
require.Nil(t, reply)

Expand Down Expand Up @@ -1454,7 +1456,8 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, addr)

// send the message to exchanger actor one using remote messaging
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
from := address.NoSender()
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second)

require.NoError(t, err)
require.NotNil(t, reply)
Expand Down
2 changes: 1 addition & 1 deletion actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,6 @@ func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async
default:
receiveContext := contextFromPool()
receiveContext.build(ctx, NoSender, to, message, async)
return receiveContext.withRemoteSender(address.From(address.NoSender)), nil
return receiveContext.withRemoteSender(address.NoSender()), nil
}
}
8 changes: 8 additions & 0 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,14 @@ func (pid *PID) Name() string {

// Equals is a convenient method to compare two PIDs
func (pid *PID) Equals(to *PID) bool {
if pid == nil && to == nil {
return true
}

if pid == nil || to == nil {
return false
}

return strings.EqualFold(pid.ID(), to.ID())
}

Expand Down
41 changes: 24 additions & 17 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2015,28 +2015,35 @@ func TestID(t *testing.T) {
assert.NoError(t, err)
}
func TestEquals(t *testing.T) {
ctx := context.TODO()
logger := log.DiscardLogger
sys, err := NewActorSystem("test",
WithLogger(logger),
WithPassivationDisabled())
t.Run("case 1", func(t *testing.T) {
ctx := context.TODO()
logger := log.DiscardLogger
sys, err := NewActorSystem("test",
WithLogger(logger),
WithPassivationDisabled())

require.NoError(t, err)
err = sys.Start(ctx)
assert.NoError(t, err)
require.NoError(t, err)
err = sys.Start(ctx)
assert.NoError(t, err)

pid1, err := sys.Spawn(ctx, "test", newActor())
require.NoError(t, err)
assert.NotNil(t, pid1)
pid1, err := sys.Spawn(ctx, "test", newActor())
require.NoError(t, err)
assert.NotNil(t, pid1)

pid2, err := sys.Spawn(ctx, "exchange", &exchanger{})
require.NoError(t, err)
assert.NotNil(t, pid2)
pid2, err := sys.Spawn(ctx, "exchange", &exchanger{})
require.NoError(t, err)
assert.NotNil(t, pid2)

assert.False(t, pid1.Equals(pid2))
assert.False(t, pid1.Equals(NoSender))
assert.False(t, pid2.Equals(NoSender))
assert.False(t, pid1.Equals(pid2))

err = sys.Stop(ctx)
assert.NoError(t, err)
err = sys.Stop(ctx)
assert.NoError(t, err)
})
t.Run("case 2", func(t *testing.T) {
assert.True(t, NoSender.Equals(NoSender))
})
}
func TestRemoteSpawn(t *testing.T) {
t.Run("When remoting is enabled", func(t *testing.T) {
Expand Down
22 changes: 18 additions & 4 deletions actors/receive_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,24 @@ func (rctx *ReceiveContext) ForwardTo(actorName string) {
// This method can only be used when remoting is enabled on the running actor system
func (rctx *ReceiveContext) RemoteForward(to *address.Address) {
sender := rctx.Sender()
message := rctx.Message()
ctx := context.WithoutCancel(rctx.ctx)
if err := sender.RemoteTell(ctx, to, message); err != nil {
rctx.Err(err)
remoteSender := rctx.RemoteSender()
remoting := rctx.Self().remoting

if !sender.Equals(NoSender) {
message := rctx.Message()
ctx := context.WithoutCancel(rctx.ctx)
if err := sender.RemoteTell(ctx, to, message); err != nil {
rctx.Err(err)
}
return
}

if !remoteSender.Equals(address.NoSender()) && remoting != nil {
message := rctx.Message()
ctx := context.WithoutCancel(rctx.ctx)
if err := remoting.RemoteTell(ctx, remoteSender, to, message); err != nil {
rctx.Err(err)
}
}
}

Expand Down
54 changes: 54 additions & 0 deletions actors/receive_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2547,6 +2547,60 @@ func TestReceiveContext(t *testing.T) {
require.NoError(t, pidA.Shutdown(ctx))
require.NoError(t, pidB.Shutdown(ctx))
})
t.Run("With successful RemoteForward case 2", func(t *testing.T) {
ctx := context.TODO()
ports := dynaport.Get(2)
host := "127.0.0.1"

actorSystem, err := NewActorSystem("testSys",
WithRemoting(host, int32(ports[0])),
WithLogger(log.DiscardLogger))
require.NoError(t, err)
require.NotNil(t, actorSystem)

require.NoError(t, actorSystem.Start(ctx))

actorSystem2, err := NewActorSystem("testSys", WithRemoting(host, int32(ports[1])),
WithLogger(log.DiscardLogger))

require.NoError(t, err)
require.NotNil(t, actorSystem2)

require.NoError(t, actorSystem2.Start(ctx))

// create actorA
pidA, err := actorSystem2.Spawn(ctx, "ExchangeA", &exchanger{})
require.NoError(t, err)
require.NotNil(t, pidA)

pidC, err := actorSystem2.Spawn(ctx, "ExchangeC", &remoteQA{})
require.NoError(t, err)
require.NotNil(t, pidC)

// create actorB
actorB := &forwardQA{
remoteRef: pidC,
}

pidB, err := actorSystem2.Spawn(ctx, "ExchangeB", actorB)
require.NoError(t, err)
require.NotNil(t, pidB)

// actor A is killing actor C using a forward pattern
// actorA tell actorB forward actorC
err = pidA.RemoteTell(ctx, pidB.Address(), new(testpb.RemoteForward))
require.NoError(t, err)

// wait for the async call to properly complete
lib.Pause(time.Second)
require.True(t, pidA.IsRunning())
require.True(t, pidB.IsRunning())
require.False(t, pidC.IsRunning())

// let us shutdown the rest
require.NoError(t, pidA.Shutdown(ctx))
require.NoError(t, pidB.Shutdown(ctx))
})
t.Run("With successful ForwardTo", func(t *testing.T) {
// create a context
ctx := context.TODO()
Expand Down
16 changes: 8 additions & 8 deletions actors/remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewRemoting() *Remoting {
}

// RemoteTell sends a message to an actor remotely without expecting any reply
func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error {
func (r *Remoting) RemoteTell(ctx context.Context, from, to *address.Address, message proto.Message) error {
marshaled, err := anypb.New(message)
if err != nil {
return ErrInvalidMessage(err)
Expand All @@ -71,7 +71,7 @@ func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message
remoteClient := r.Client(to.GetHost(), int(to.GetPort()))
request := &internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: marshaled,
},
Expand All @@ -97,7 +97,7 @@ func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (r *Remoting) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
marshaled, err := anypb.New(message)
if err != nil {
return nil, ErrInvalidMessage(err)
Expand All @@ -106,7 +106,7 @@ func (r *Remoting) RemoteAsk(ctx context.Context, to *address.Address, message p
remoteClient := r.Client(to.GetHost(), int(to.GetPort()))
request := &internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: marshaled,
},
Expand Down Expand Up @@ -173,7 +173,7 @@ func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name
}

// RemoteBatchTell sends bulk asynchronous messages to an actor
func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error {
func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error {
var requests []*internalpb.RemoteTellRequest
for _, message := range messages {
packed, err := anypb.New(message)
Expand All @@ -184,7 +184,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes
requests = append(
requests, &internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: packed,
},
Expand Down Expand Up @@ -217,7 +217,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes
}

// RemoteBatchAsk sends bulk messages to an actor with responses expected
func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) {
func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) {
var requests []*internalpb.RemoteAskRequest
for _, message := range messages {
packed, err := anypb.New(message)
Expand All @@ -228,7 +228,7 @@ func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, mess
requests = append(
requests, &internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: packed,
},
Expand Down
Loading

0 comments on commit a4e7ad8

Please sign in to comment.