From 2e01a5e54e90ce54768de2bda633ac2a30efe4a5 Mon Sep 17 00:00:00 2001 From: Tochemey Date: Thu, 28 Nov 2024 19:41:47 +0000 Subject: [PATCH] fix(design): fix bad design of remoting call --- actors/actor_system_test.go | 9 ++++-- actors/api.go | 2 +- actors/pid.go | 3 ++ actors/receive_context.go | 22 +++++++++++--- actors/remoting.go | 16 +++++----- actors/remoting_test.go | 59 ++++++++++++++++++++++++------------- actors/scheduler.go | 21 +++++++------ address/address.go | 14 ++++----- address/address_test.go | 4 +-- 9 files changed, 95 insertions(+), 55 deletions(-) diff --git a/actors/actor_system_test.go b/actors/actor_system_test.go index f1cdae45..d34023a2 100644 --- a/actors/actor_system_test.go +++ b/actors/actor_system_test.go @@ -220,7 +220,8 @@ func TestActorSystem(t *testing.T) { require.True(t, proto.Equal(remoteAddr, addr)) remoting := NewRemoting() - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second) + from := address.NoSender() + reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second) require.NoError(t, err) require.NotNil(t, reply) @@ -624,7 +625,8 @@ func TestActorSystem(t *testing.T) { Id: "", }, ) - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second) + from := address.NoSender() + reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second) require.Error(t, err) require.Nil(t, reply) @@ -1454,7 +1456,8 @@ func TestActorSystem(t *testing.T) { require.NotNil(t, addr) // send the message to exchanger actor one using remote messaging - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second) + from := address.NoSender() + reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), 20*time.Second) require.NoError(t, err) require.NotNil(t, reply) diff --git a/actors/api.go b/actors/api.go index a12a5569..f164a74b 100644 --- a/actors/api.go +++ b/actors/api.go @@ -117,6 +117,6 @@ func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async default: receiveContext := contextFromPool() receiveContext.build(ctx, NoSender, to, message, async) - return receiveContext.withRemoteSender(address.From(address.NoSender)), nil + return receiveContext.withRemoteSender(address.NoSender()), nil } } diff --git a/actors/pid.go b/actors/pid.go index cf9d8b4e..2a5b8bae 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -243,6 +243,9 @@ func (pid *PID) Name() string { // Equals is a convenient method to compare two PIDs func (pid *PID) Equals(to *PID) bool { + if to == nil { + return false + } return strings.EqualFold(pid.ID(), to.ID()) } diff --git a/actors/receive_context.go b/actors/receive_context.go index 9cefeb7b..3b4443e8 100644 --- a/actors/receive_context.go +++ b/actors/receive_context.go @@ -365,10 +365,24 @@ func (rctx *ReceiveContext) ForwardTo(actorName string) { // This method can only be used when remoting is enabled on the running actor system func (rctx *ReceiveContext) RemoteForward(to *address.Address) { sender := rctx.Sender() - message := rctx.Message() - ctx := context.WithoutCancel(rctx.ctx) - if err := sender.RemoteTell(ctx, to, message); err != nil { - rctx.Err(err) + remoteSender := rctx.RemoteSender() + remoting := rctx.Self().remoting + + if !sender.Equals(NoSender) { + message := rctx.Message() + ctx := context.WithoutCancel(rctx.ctx) + if err := sender.RemoteTell(ctx, to, message); err != nil { + rctx.Err(err) + } + return + } + + if !remoteSender.Equals(address.NoSender()) && remoting != nil { + ctx := context.WithoutCancel(rctx.ctx) + message, _ := anypb.New(rctx.Message()) + if err := remoting.RemoteTell(ctx, remoteSender, to, message); err != nil { + rctx.Err(err) + } } } diff --git a/actors/remoting.go b/actors/remoting.go index 980915f6..6ab8f376 100644 --- a/actors/remoting.go +++ b/actors/remoting.go @@ -62,7 +62,7 @@ func NewRemoting() *Remoting { } // RemoteTell sends a message to an actor remotely without expecting any reply -func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error { +func (r *Remoting) RemoteTell(ctx context.Context, from, to *address.Address, message proto.Message) error { marshaled, err := anypb.New(message) if err != nil { return ErrInvalidMessage(err) @@ -71,7 +71,7 @@ func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message remoteClient := r.Client(to.GetHost(), int(to.GetPort())) request := &internalpb.RemoteTellRequest{ RemoteMessage: &internalpb.RemoteMessage{ - Sender: address.NoSender, + Sender: from.Address, Receiver: to.Address, Message: marshaled, }, @@ -97,7 +97,7 @@ func (r *Remoting) RemoteTell(ctx context.Context, to *address.Address, message } // RemoteAsk sends a synchronous message to another actor remotely and expect a response. -func (r *Remoting) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) { +func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) { marshaled, err := anypb.New(message) if err != nil { return nil, ErrInvalidMessage(err) @@ -106,7 +106,7 @@ func (r *Remoting) RemoteAsk(ctx context.Context, to *address.Address, message p remoteClient := r.Client(to.GetHost(), int(to.GetPort())) request := &internalpb.RemoteAskRequest{ RemoteMessage: &internalpb.RemoteMessage{ - Sender: address.NoSender, + Sender: from.Address, Receiver: to.Address, Message: marshaled, }, @@ -173,7 +173,7 @@ func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name } // RemoteBatchTell sends bulk asynchronous messages to an actor -func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error { +func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error { var requests []*internalpb.RemoteTellRequest for _, message := range messages { packed, err := anypb.New(message) @@ -184,7 +184,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes requests = append( requests, &internalpb.RemoteTellRequest{ RemoteMessage: &internalpb.RemoteMessage{ - Sender: address.NoSender, + Sender: from.Address, Receiver: to.Address, Message: packed, }, @@ -217,7 +217,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes } // RemoteBatchAsk sends bulk messages to an actor with responses expected -func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) { +func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) { var requests []*internalpb.RemoteAskRequest for _, message := range messages { packed, err := anypb.New(message) @@ -228,7 +228,7 @@ func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, mess requests = append( requests, &internalpb.RemoteAskRequest{ RemoteMessage: &internalpb.RemoteMessage{ - Sender: address.NoSender, + Sender: from.Address, Receiver: to.Address, Message: packed, }, diff --git a/actors/remoting_test.go b/actors/remoting_test.go index 1860cb02..8b446632 100644 --- a/actors/remoting_test.go +++ b/actors/remoting_test.go @@ -82,9 +82,10 @@ func TestRemoteTell(t *testing.T) { require.NoError(t, err) // create a message to send to the test actor message := new(testpb.TestSend) + from := address.NoSender() // send the message to the actor for i := 0; i < 10; i++ { - err = remoting.RemoteTell(ctx, addr, message) + err = remoting.RemoteTell(ctx, from, addr, message) // perform some assertions require.NoError(t, err) } @@ -133,7 +134,8 @@ func TestRemoteTell(t *testing.T) { // get the address of the actor addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName) require.NoError(t, err) - err = remoting.RemoteTell(ctx, addr, nil) + from := address.NoSender() + err = remoting.RemoteTell(ctx, from, addr, nil) // perform some assertions require.Error(t, err) @@ -189,7 +191,8 @@ func TestRemoteTell(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestSend) // send the message to the actor - err = remoting.RemoteTell(ctx, address.From(addr), message) + from := address.NoSender() + err = remoting.RemoteTell(ctx, from, address.From(addr), message) // perform some assertions require.Error(t, err) @@ -244,7 +247,8 @@ func TestRemoteTell(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestSend) // send the message to the actor - err = remoting.RemoteTell(ctx, addr, message) + from := address.NoSender() + err = remoting.RemoteTell(ctx, from, addr, message) // perform some assertions require.Error(t, err) require.EqualError(t, err, "failed_precondition: remoting is not enabled") @@ -301,7 +305,8 @@ func TestRemoteTell(t *testing.T) { messages[i] = new(testpb.TestSend) } - err = remoting.RemoteBatchTell(ctx, addr, messages) + from := address.NoSender() + err = remoting.RemoteBatchTell(ctx, from, addr, messages) require.NoError(t, err) // wait for processing to complete on the actor side @@ -357,10 +362,11 @@ func TestRemoteTell(t *testing.T) { } remoting := NewRemoting() + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestSend) // send the message to the actor - err = remoting.RemoteBatchTell(ctx, address.From(addr), []proto.Message{message}) + err = remoting.RemoteBatchTell(ctx, from, address.From(addr), []proto.Message{message}) // perform some assertions require.Error(t, err) @@ -407,7 +413,7 @@ func TestRemoteTell(t *testing.T) { remoting := NewRemoting() addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName) require.NoError(t, err) - + from := address.NoSender() // let us disable remoting actorsSystem := sys.(*actorSystem) actorsSystem.remotingEnabled.Store(false) @@ -415,7 +421,7 @@ func TestRemoteTell(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestSend) // send the message to the actor - err = remoting.RemoteBatchTell(ctx, addr, []proto.Message{message}) + err = remoting.RemoteBatchTell(ctx, from, addr, []proto.Message{message}) // perform some assertions require.Error(t, err) require.EqualError(t, err, "failed_precondition: remoting is not enabled") @@ -469,10 +475,11 @@ func TestRemoteTell(t *testing.T) { require.NoError(t, actorRef.Shutdown(ctx)) lib.Pause(time.Second) + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestSend) // send the message to the actor - err = remoting.RemoteTell(ctx, addr, message) + err = remoting.RemoteTell(ctx, from, addr, message) // perform some assertions require.Error(t, err) require.Contains(t, err.Error(), "not found") @@ -526,10 +533,11 @@ func TestRemoteTell(t *testing.T) { require.NoError(t, actorRef.Shutdown(ctx)) lib.Pause(time.Second) + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestSend) // send the message to the actor - err = remoting.RemoteBatchTell(ctx, addr, []proto.Message{message}) + err = remoting.RemoteBatchTell(ctx, from, addr, []proto.Message{message}) // perform some assertions require.Error(t, err) require.Contains(t, err.Error(), "not found") @@ -578,6 +586,7 @@ func TestRemoteAsk(t *testing.T) { assert.NotNil(t, actorRef) remoting := NewRemoting() + from := address.NoSender() // get the address of the actor addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName) require.NoError(t, err) @@ -585,7 +594,7 @@ func TestRemoteAsk(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteAsk(ctx, addr, message, time.Minute) + reply, err := remoting.RemoteAsk(ctx, from, addr, message, time.Minute) // perform some assertions require.NoError(t, err) require.NotNil(t, reply) @@ -645,8 +654,9 @@ func TestRemoteAsk(t *testing.T) { addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName) require.NoError(t, err) + from := address.NoSender() // send the message to the actor - reply, err := remoting.RemoteAsk(ctx, addr, nil, time.Minute) + reply, err := remoting.RemoteAsk(ctx, from, addr, nil, time.Minute) // perform some assertions require.Error(t, err) require.Nil(t, reply) @@ -701,10 +711,11 @@ func TestRemoteAsk(t *testing.T) { } remoting := NewRemoting() + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteAsk(ctx, address.From(addr), message, time.Minute) + reply, err := remoting.RemoteAsk(ctx, from, address.From(addr), message, time.Minute) // perform some assertions require.Error(t, err) require.Nil(t, reply) @@ -762,10 +773,11 @@ func TestRemoteAsk(t *testing.T) { actorsSystem := sys.(*actorSystem) actorsSystem.remotingEnabled.Store(false) + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteAsk(ctx, addr, message, time.Minute) + reply, err := remoting.RemoteAsk(ctx, from, addr, message, time.Minute) // perform some assertions require.Error(t, err) require.EqualError(t, err, "failed_precondition: remoting is not enabled") @@ -817,10 +829,11 @@ func TestRemoteAsk(t *testing.T) { addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName) require.NoError(t, err) + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - replies, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute) + replies, err := remoting.RemoteBatchAsk(ctx, from, addr, []proto.Message{message}, time.Minute) // perform some assertions require.NoError(t, err) require.Len(t, replies, 1) @@ -885,10 +898,11 @@ func TestRemoteAsk(t *testing.T) { } remoting := NewRemoting() + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteBatchAsk(ctx, address.From(addr), []proto.Message{message}, time.Minute) + reply, err := remoting.RemoteBatchAsk(ctx, from, address.From(addr), []proto.Message{message}, time.Minute) // perform some assertions require.Error(t, err) require.Nil(t, reply) @@ -946,10 +960,11 @@ func TestRemoteAsk(t *testing.T) { actorsSystem := sys.(*actorSystem) actorsSystem.remotingEnabled.Store(false) + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute) + reply, err := remoting.RemoteBatchAsk(ctx, from, addr, []proto.Message{message}, time.Minute) // perform some assertions require.Error(t, err) require.EqualError(t, err, "failed_precondition: remoting is not enabled") @@ -1001,7 +1016,7 @@ func TestRemoteAsk(t *testing.T) { assert.NotNil(t, actorRef) remoting := NewRemoting() - + from := address.NoSender() // get the address of the actor addr, err := remoting.RemoteLookup(ctx, host, remotingPort, actorName) require.NoError(t, err) @@ -1013,7 +1028,7 @@ func TestRemoteAsk(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteAsk(ctx, addr, message, time.Minute) + reply, err := remoting.RemoteAsk(ctx, from, addr, message, time.Minute) // perform some assertions require.Error(t, err) require.Contains(t, err.Error(), "not found") @@ -1071,10 +1086,11 @@ func TestRemoteAsk(t *testing.T) { require.NoError(t, actorRef.Shutdown(ctx)) lib.Pause(time.Second) + from := address.NoSender() // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute) + reply, err := remoting.RemoteBatchAsk(ctx, from, addr, []proto.Message{message}, time.Minute) // perform some assertions require.Error(t, err) require.Contains(t, err.Error(), "not found") @@ -1392,8 +1408,9 @@ func TestAPIRemoteSpawn(t *testing.T) { require.NoError(t, err) require.NotNil(t, addr) + from := address.NoSender() // send the message to exchanger actor one using remote messaging - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), time.Minute) + reply, err := remoting.RemoteAsk(ctx, from, addr, new(testpb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) diff --git a/actors/scheduler.go b/actors/scheduler.go index 5c0d1eab..1d159bd8 100644 --- a/actors/scheduler.go +++ b/actors/scheduler.go @@ -228,7 +228,7 @@ func (x *scheduler) ScheduleWithCron(ctx context.Context, message proto.Message, // This requires remoting to be enabled on the actor system. // This will send the given message to the actor after the given interval specified // The message will be sent once -func (x *scheduler) RemoteScheduleOnce(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error { +func (x *scheduler) RemoteScheduleOnce(ctx context.Context, message proto.Message, to *address.Address, interval time.Duration) error { x.mu.Lock() defer x.mu.Unlock() @@ -242,14 +242,15 @@ func (x *scheduler) RemoteScheduleOnce(ctx context.Context, message proto.Messag job := job.NewFunctionJob[bool]( func(ctx context.Context) (bool, error) { - if err := x.remoting.RemoteTell(ctx, address, message); err != nil { + from := address.NoSender() + if err := x.remoting.RemoteTell(ctx, from, to, message); err != nil { return false, err } return true, nil }, ) - key := fmt.Sprintf("%s@%s", address.GetName(), net.JoinHostPort(address.GetHost(), strconv.Itoa(int(address.GetPort())))) + key := fmt.Sprintf("%s@%s", to.GetName(), net.JoinHostPort(to.GetHost(), strconv.Itoa(int(to.GetPort())))) jobDetails := quartz.NewJobDetail(job, quartz.NewJobKey(key)) if err := x.distributeJobKeyOrNot(ctx, jobDetails); err != nil { if errors.Is(err, errSkipJobScheduling) { @@ -264,7 +265,7 @@ func (x *scheduler) RemoteScheduleOnce(ctx context.Context, message proto.Messag // RemoteSchedule schedules a message to be sent to a remote actor in the future. // This requires remoting to be enabled on the actor system. // This will send the given message to the actor at the given interval specified -func (x *scheduler) RemoteSchedule(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error { +func (x *scheduler) RemoteSchedule(ctx context.Context, message proto.Message, to *address.Address, interval time.Duration) error { x.mu.Lock() defer x.mu.Unlock() @@ -278,14 +279,15 @@ func (x *scheduler) RemoteSchedule(ctx context.Context, message proto.Message, a job := job.NewFunctionJob[bool]( func(ctx context.Context) (bool, error) { - if err := x.remoting.RemoteTell(ctx, address, message); err != nil { + from := address.NoSender() + if err := x.remoting.RemoteTell(ctx, from, to, message); err != nil { return false, err } return true, nil }, ) - key := fmt.Sprintf("%s@%s", address.GetName(), net.JoinHostPort(address.GetHost(), strconv.Itoa(int(address.GetPort())))) + key := fmt.Sprintf("%s@%s", to.GetName(), net.JoinHostPort(to.GetHost(), strconv.Itoa(int(to.GetPort())))) jobDetails := quartz.NewJobDetail(job, quartz.NewJobKey(key)) if err := x.distributeJobKeyOrNot(ctx, jobDetails); err != nil { if errors.Is(err, errSkipJobScheduling) { @@ -298,7 +300,7 @@ func (x *scheduler) RemoteSchedule(ctx context.Context, message proto.Message, a } // RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. -func (x *scheduler) RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *address.Address, cronExpression string) error { +func (x *scheduler) RemoteScheduleWithCron(ctx context.Context, message proto.Message, to *address.Address, cronExpression string) error { x.mu.Lock() defer x.mu.Unlock() @@ -312,14 +314,15 @@ func (x *scheduler) RemoteScheduleWithCron(ctx context.Context, message proto.Me job := job.NewFunctionJob[bool]( func(ctx context.Context) (bool, error) { - if err := x.remoting.RemoteTell(ctx, address, message); err != nil { + from := address.NoSender() + if err := x.remoting.RemoteTell(ctx, from, to, message); err != nil { return false, err } return true, nil }, ) - key := fmt.Sprintf("%s@%s", address.GetName(), net.JoinHostPort(address.GetHost(), strconv.Itoa(int(address.GetPort())))) + key := fmt.Sprintf("%s@%s", to.GetName(), net.JoinHostPort(to.GetHost(), strconv.Itoa(int(to.GetPort())))) jobDetails := quartz.NewJobDetail(job, quartz.NewJobKey(key)) if err := x.distributeJobKeyOrNot(ctx, jobDetails); err != nil { diff --git a/address/address.go b/address/address.go index 94df202c..95e42ca6 100644 --- a/address/address.go +++ b/address/address.go @@ -41,8 +41,8 @@ import ( // protocol defines the Go-Akt addressing protocol const protocol = "goakt" -// NoSender means that there is no sender -var NoSender = new(goaktpb.Address) +// nilAddress means that there is no sender +var nilAddress = new(goaktpb.Address) type Address struct { *goaktpb.Address @@ -59,7 +59,7 @@ func New(name, system string, host string, port int) *Address { Name: name, Id: uuid.NewString(), System: system, - Parent: NoSender, + Parent: nilAddress, }, } } @@ -71,10 +71,10 @@ func From(addr *goaktpb.Address) *Address { } } -// Default creates an instance of Address with default values. -func Default() *Address { +// NoSender creates an instance of Address with default values. +func NoSender() *Address { return &Address{ - NoSender, + nilAddress, } } @@ -194,7 +194,7 @@ func (a *Address) UnmarshalBinary(data []byte) error { // Validate returns an error when the address is not valid func (a *Address) Validate() error { - if proto.Equal(a.Address, NoSender) { + if proto.Equal(a.Address, nilAddress) { return nil } pattern := "^[a-zA-Z0-9][a-zA-Z0-9-_\\.]*$" diff --git a/address/address_test.go b/address/address_test.go index cf6551ca..4a9cc69f 100644 --- a/address/address_test.go +++ b/address/address_test.go @@ -41,7 +41,7 @@ func TestAddress(t *testing.T) { assert.EqualValues(t, 1234, addr.Port()) assert.NotEmpty(t, addr.ID()) assert.NotNil(t, addr.Parent()) - assert.True(t, proto.Equal(addr.Parent(), NoSender)) + assert.True(t, proto.Equal(addr.Parent(), nilAddress)) assert.Equal(t, expected, addr.String()) }) @@ -82,7 +82,7 @@ func TestAddress(t *testing.T) { assert.Equal(t, expected, actual.String()) }) t.Run("With Default", func(t *testing.T) { - addr := Default() + addr := NoSender() assert.NoError(t, addr.Validate()) addr.WithHost("localhost").WithPort(123).WithSystem("system").WithName("name")