Skip to content

Commit

Permalink
fix(design): fix bad design of remoting call
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Nov 28, 2024
1 parent 4fda560 commit 2e01a5e
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 55 deletions.
9 changes: 6 additions & 3 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func TestActorSystem(t *testing.T) {
require.True(t, proto.Equal(remoteAddr, addr))

remoting := NewRemoting()
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
from := address.NoSender()
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)

Expand Down Expand Up @@ -624,7 +625,8 @@ func TestActorSystem(t *testing.T) {
Id: "",
},
)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
from := address.NoSender()
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second)
require.Error(t, err)
require.Nil(t, reply)

Expand Down Expand Up @@ -1454,7 +1456,8 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, addr)

// send the message to exchanger actor one using remote messaging
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
from := address.NoSender()
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second)

require.NoError(t, err)
require.NotNil(t, reply)
Expand Down
2 changes: 1 addition & 1 deletion actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,6 @@ func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async
default:
receiveContext := contextFromPool()
receiveContext.build(ctx, NoSender, to, message, async)
return receiveContext.withRemoteSender(address.From(address.NoSender)), nil
return receiveContext.withRemoteSender(address.NoSender()), nil
}
}
3 changes: 3 additions & 0 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ func (pid *PID) Name() string {

// Equals is a convenient method to compare two PIDs
func (pid *PID) Equals(to *PID) bool {
if to == nil {
return false
}
return strings.EqualFold(pid.ID(), to.ID())
}

Expand Down
22 changes: 18 additions & 4 deletions actors/receive_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,24 @@ func (rctx *ReceiveContext) ForwardTo(actorName string) {
// This method can only be used when remoting is enabled on the running actor system
func (rctx *ReceiveContext) RemoteForward(to *address.Address) {
sender := rctx.Sender()
message := rctx.Message()
ctx := context.WithoutCancel(rctx.ctx)
if err := sender.RemoteTell(ctx, to, message); err != nil {
rctx.Err(err)
remoteSender := rctx.RemoteSender()
remoting := rctx.Self().remoting

if !sender.Equals(NoSender) {
message := rctx.Message()
ctx := context.WithoutCancel(rctx.ctx)
if err := sender.RemoteTell(ctx, to, message); err != nil {
rctx.Err(err)
}
return
}

if !remoteSender.Equals(address.NoSender()) && remoting != nil {
ctx := context.WithoutCancel(rctx.ctx)
message, _ := anypb.New(rctx.Message())
if err := remoting.RemoteTell(ctx, remoteSender, to, message); err != nil {
rctx.Err(err)
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions actors/remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewRemoting() *Remoting {
}

// RemoteTell sends a message to an actor remotely without expecting any reply
func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error {
func (r *Remoting) RemoteTell(ctx context.Context, from, to *address.Address, message proto.Message) error {
marshaled, err := anypb.New(message)
if err != nil {
return ErrInvalidMessage(err)
Expand All @@ -71,7 +71,7 @@ func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message
remoteClient := r.Client(to.GetHost(), int(to.GetPort()))
request := &internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: marshaled,
},
Expand All @@ -97,7 +97,7 @@ func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (r *Remoting) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
marshaled, err := anypb.New(message)
if err != nil {
return nil, ErrInvalidMessage(err)
Expand All @@ -106,7 +106,7 @@ func (r *Remoting) RemoteAsk(ctx context.Context, to *address.Address, message p
remoteClient := r.Client(to.GetHost(), int(to.GetPort()))
request := &internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: marshaled,
},
Expand Down Expand Up @@ -173,7 +173,7 @@ func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name
}

// RemoteBatchTell sends bulk asynchronous messages to an actor
func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error {
func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error {
var requests []*internalpb.RemoteTellRequest
for _, message := range messages {
packed, err := anypb.New(message)
Expand All @@ -184,7 +184,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes
requests = append(
requests, &internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: packed,
},
Expand Down Expand Up @@ -217,7 +217,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes
}

// RemoteBatchAsk sends bulk messages to an actor with responses expected
func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) {
func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) {
var requests []*internalpb.RemoteAskRequest
for _, message := range messages {
packed, err := anypb.New(message)
Expand All @@ -228,7 +228,7 @@ func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, mess
requests = append(
requests, &internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Sender: from.Address,
Receiver: to.Address,
Message: packed,
},
Expand Down
59 changes: 38 additions & 21 deletions actors/remoting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func TestRemoteTell(t *testing.T) {
require.NoError(t, err)
// create a message to send to the test actor
message := new(testpb.TestSend)
from := address.NoSender()
// send the message to the actor
for i := 0; i < 10; i++ {
err = remoting.RemoteTell(ctx, addr, message)
err = remoting.RemoteTell(ctx, from, addr, message)
// perform some assertions
require.NoError(t, err)
}
Expand Down Expand Up @@ -133,7 +134,8 @@ func TestRemoteTell(t *testing.T) {
// get the address of the actor
addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)
err = remoting.RemoteTell(ctx, addr, nil)
from := address.NoSender()
err = remoting.RemoteTell(ctx, from, addr, nil)
// perform some assertions
require.Error(t, err)

Expand Down Expand Up @@ -189,7 +191,8 @@ func TestRemoteTell(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = remoting.RemoteTell(ctx, address.From(addr), message)
from := address.NoSender()
err = remoting.RemoteTell(ctx, from, address.From(addr), message)
// perform some assertions
require.Error(t, err)

Expand Down Expand Up @@ -244,7 +247,8 @@ func TestRemoteTell(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = remoting.RemoteTell(ctx, addr, message)
from := address.NoSender()
err = remoting.RemoteTell(ctx, from, addr, message)
// perform some assertions
require.Error(t, err)
require.EqualError(t, err, "failed_precondition: remoting is not enabled")
Expand Down Expand Up @@ -301,7 +305,8 @@ func TestRemoteTell(t *testing.T) {
messages[i] = new(testpb.TestSend)
}

err = remoting.RemoteBatchTell(ctx, addr, messages)
from := address.NoSender()
err = remoting.RemoteBatchTell(ctx, from, addr, messages)
require.NoError(t, err)

// wait for processing to complete on the actor side
Expand Down Expand Up @@ -357,10 +362,11 @@ func TestRemoteTell(t *testing.T) {
}

remoting := NewRemoting()
from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = remoting.RemoteBatchTell(ctx, address.From(addr), []proto.Message{message})
err = remoting.RemoteBatchTell(ctx, from, address.From(addr), []proto.Message{message})
// perform some assertions
require.Error(t, err)

Expand Down Expand Up @@ -407,15 +413,15 @@ func TestRemoteTell(t *testing.T) {
remoting := NewRemoting()
addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)

from := address.NoSender()
// let us disable remoting
actorsSystem := sys.(*actorSystem)
actorsSystem.remotingEnabled.Store(false)

// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = remoting.RemoteBatchTell(ctx, addr, []proto.Message{message})
err = remoting.RemoteBatchTell(ctx, from, addr, []proto.Message{message})
// perform some assertions
require.Error(t, err)
require.EqualError(t, err, "failed_precondition: remoting is not enabled")
Expand Down Expand Up @@ -469,10 +475,11 @@ func TestRemoteTell(t *testing.T) {
require.NoError(t, actorRef.Shutdown(ctx))
lib.Pause(time.Second)

from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = remoting.RemoteTell(ctx, addr, message)
err = remoting.RemoteTell(ctx, from, addr, message)
// perform some assertions
require.Error(t, err)
require.Contains(t, err.Error(), "not found")
Expand Down Expand Up @@ -526,10 +533,11 @@ func TestRemoteTell(t *testing.T) {
require.NoError(t, actorRef.Shutdown(ctx))
lib.Pause(time.Second)

from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = remoting.RemoteBatchTell(ctx, addr, []proto.Message{message})
err = remoting.RemoteBatchTell(ctx, from, addr, []proto.Message{message})
// perform some assertions
require.Error(t, err)
require.Contains(t, err.Error(), "not found")
Expand Down Expand Up @@ -578,14 +586,15 @@ func TestRemoteAsk(t *testing.T) {
assert.NotNil(t, actorRef)

remoting := NewRemoting()
from := address.NoSender()
// get the address of the actor
addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)

// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteAsk(ctx, addr, message, time.Minute)
reply, err := remoting.RemoteAsk(ctx, from, addr, message, time.Minute)
// perform some assertions
require.NoError(t, err)
require.NotNil(t, reply)
Expand Down Expand Up @@ -645,8 +654,9 @@ func TestRemoteAsk(t *testing.T) {
addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)

from := address.NoSender()
// send the message to the actor
reply, err := remoting.RemoteAsk(ctx, addr, nil, time.Minute)
reply, err := remoting.RemoteAsk(ctx, from, addr, nil, time.Minute)
// perform some assertions
require.Error(t, err)
require.Nil(t, reply)
Expand Down Expand Up @@ -701,10 +711,11 @@ func TestRemoteAsk(t *testing.T) {
}

remoting := NewRemoting()
from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteAsk(ctx, address.From(addr), message, time.Minute)
reply, err := remoting.RemoteAsk(ctx, from, address.From(addr), message, time.Minute)
// perform some assertions
require.Error(t, err)
require.Nil(t, reply)
Expand Down Expand Up @@ -762,10 +773,11 @@ func TestRemoteAsk(t *testing.T) {
actorsSystem := sys.(*actorSystem)
actorsSystem.remotingEnabled.Store(false)

from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteAsk(ctx, addr, message, time.Minute)
reply, err := remoting.RemoteAsk(ctx, from, addr, message, time.Minute)
// perform some assertions
require.Error(t, err)
require.EqualError(t, err, "failed_precondition: remoting is not enabled")
Expand Down Expand Up @@ -817,10 +829,11 @@ func TestRemoteAsk(t *testing.T) {
addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)

from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
replies, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute)
replies, err := remoting.RemoteBatchAsk(ctx, from, addr, []proto.Message{message}, time.Minute)
// perform some assertions
require.NoError(t, err)
require.Len(t, replies, 1)
Expand Down Expand Up @@ -885,10 +898,11 @@ func TestRemoteAsk(t *testing.T) {
}

remoting := NewRemoting()
from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteBatchAsk(ctx, address.From(addr), []proto.Message{message}, time.Minute)
reply, err := remoting.RemoteBatchAsk(ctx, from, address.From(addr), []proto.Message{message}, time.Minute)
// perform some assertions
require.Error(t, err)
require.Nil(t, reply)
Expand Down Expand Up @@ -946,10 +960,11 @@ func TestRemoteAsk(t *testing.T) {
actorsSystem := sys.(*actorSystem)
actorsSystem.remotingEnabled.Store(false)

from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute)
reply, err := remoting.RemoteBatchAsk(ctx, from, addr, []proto.Message{message}, time.Minute)
// perform some assertions
require.Error(t, err)
require.EqualError(t, err, "failed_precondition: remoting is not enabled")
Expand Down Expand Up @@ -1001,7 +1016,7 @@ func TestRemoteAsk(t *testing.T) {
assert.NotNil(t, actorRef)

remoting := NewRemoting()

from := address.NoSender()
// get the address of the actor
addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)
Expand All @@ -1013,7 +1028,7 @@ func TestRemoteAsk(t *testing.T) {
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteAsk(ctx, addr, message, time.Minute)
reply, err := remoting.RemoteAsk(ctx, from, addr, message, time.Minute)
// perform some assertions
require.Error(t, err)
require.Contains(t, err.Error(), "not found")
Expand Down Expand Up @@ -1071,10 +1086,11 @@ func TestRemoteAsk(t *testing.T) {
require.NoError(t, actorRef.Shutdown(ctx))
lib.Pause(time.Second)

from := address.NoSender()
// create a message to send to the test actor
message := new(testpb.TestReply)
// send the message to the actor
reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute)
reply, err := remoting.RemoteBatchAsk(ctx, from, addr, []proto.Message{message}, time.Minute)
// perform some assertions
require.Error(t, err)
require.Contains(t, err.Error(), "not found")
Expand Down Expand Up @@ -1392,8 +1408,9 @@ func TestAPIRemoteSpawn(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, addr)

from := address.NoSender()
// send the message to exchanger actor one using remote messaging
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), time.Minute)
reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), time.Minute)

require.NoError(t, err)
require.NotNil(t, reply)
Expand Down
Loading

0 comments on commit 2e01a5e

Please sign in to comment.