Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add TLS support #501

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 211 additions & 87 deletions actors/actor_system.go

Large diffs are not rendered by default.

2,915 changes: 1,554 additions & 1,361 deletions actors/actor_system_test.go

Large diffs are not rendered by default.

312 changes: 0 additions & 312 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,12 @@ package actors

import (
"context"
"strings"
"time"

"connectrpc.com/connect"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/tochemey/goakt/v2/address"
"github.com/tochemey/goakt/v2/internal/http"
"github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/internal/internalpb/internalpbconnect"
)

// Ask sends a synchronous message to another actor and expect a response.
Expand Down Expand Up @@ -109,312 +103,6 @@ func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...p
return
}

// RemoteTell sends a message to an actor remotely without expecting any reply
func RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error {
marshaled, err := anypb.New(message)
if err != nil {
return ErrInvalidRemoteMessage(err)
}

remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(to.GetHost(), int(to.GetPort())),
)

request := &internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Receiver: to.Address,
Message: marshaled,
},
}

stream := remoteClient.RemoteTell(ctx)
if err := stream.Send(request); err != nil {
if eof(err) {
if _, err := stream.CloseAndReceive(); err != nil {
return err
}
return nil
}
return err
}

// close the connection
if _, err := stream.CloseAndReceive(); err != nil {
return err
}

return nil
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
marshaled, err := anypb.New(message)
if err != nil {
return nil, ErrInvalidRemoteMessage(err)
}

remotingService := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(to.GetHost(), int(to.GetPort())),
)

request := &internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Receiver: to.Address,
Message: marshaled,
},
Timeout: durationpb.New(timeout),
}
stream := remotingService.RemoteAsk(ctx)
errc := make(chan error, 1)

go func() {
defer close(errc)
for {
resp, err := stream.Receive()
if err != nil {
errc <- err
return
}

response = resp.GetMessage()
}
}()

err = stream.Send(request)
if err != nil {
return nil, err
}

if err := stream.CloseRequest(); err != nil {
return nil, err
}

err = <-errc
if eof(err) {
return response, nil
}

if err != nil {
return nil, err
}

return
}

// RemoteLookup look for an actor address on a remote node.
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error) {
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(host, port),
)

request := connect.NewRequest(&internalpb.RemoteLookupRequest{
Host: host,
Port: int32(port),
Name: name,
})

response, err := remoteClient.RemoteLookup(ctx, request)
if err != nil {
code := connect.CodeOf(err)
if code == connect.CodeNotFound {
return nil, nil
}
return nil, err
}

return address.From(response.Msg.GetAddress()), nil
}

// RemoteBatchTell sends bulk asynchronous messages to an actor
func RemoteBatchTell(ctx context.Context, to *address.Address, messages ...proto.Message) error {
var requests []*internalpb.RemoteTellRequest
for _, message := range messages {
packed, err := anypb.New(message)
if err != nil {
return ErrInvalidRemoteMessage(err)
}

requests = append(requests, &internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Receiver: to.Address,
Message: packed,
},
})
}

remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(to.GetHost(), int(to.GetPort())),
)

stream := remoteClient.RemoteTell(ctx)
for _, request := range requests {
err := stream.Send(request)
if eof(err) {
if _, err := stream.CloseAndReceive(); err != nil {
return err
}
return nil
}

if err != nil {
return err
}
}

// close the connection
if _, err := stream.CloseAndReceive(); err != nil {
return err
}

return nil
}

// RemoteBatchAsk sends bulk messages to an actor with responses expected
func RemoteBatchAsk(ctx context.Context, to *address.Address, messages ...proto.Message) (responses []*anypb.Any, err error) {
var requests []*internalpb.RemoteAskRequest
for _, message := range messages {
packed, err := anypb.New(message)
if err != nil {
return nil, ErrInvalidRemoteMessage(err)
}

requests = append(requests, &internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: address.NoSender,
Receiver: to.Address,
Message: packed,
},
})
}

remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(to.GetHost(), int(to.GetPort())),
)

stream := remoteClient.RemoteAsk(ctx)
errc := make(chan error, 1)

go func() {
defer close(errc)
for {
resp, err := stream.Receive()
if err != nil {
errc <- err
return
}

responses = append(responses, resp.GetMessage())
}
}()

for _, request := range requests {
err := stream.Send(request)
if err != nil {
return nil, err
}
}

if err := stream.CloseRequest(); err != nil {
return nil, err
}

err = <-errc
if eof(err) {
return responses, nil
}

if err != nil {
return nil, err
}

return
}

// RemoteReSpawn restarts actor on a remote node.
func RemoteReSpawn(ctx context.Context, host string, port int, name string) error {
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(host, port),
)

request := connect.NewRequest(&internalpb.RemoteReSpawnRequest{
Host: host,
Port: int32(port),
Name: name,
})

if _, err := remoteClient.RemoteReSpawn(ctx, request); err != nil {
code := connect.CodeOf(err)
if code == connect.CodeNotFound {
return nil
}
return err
}

return nil
}

// RemoteStop stops an actor on a remote node.
func RemoteStop(ctx context.Context, host string, port int, name string) error {
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(host, port),
)

request := connect.NewRequest(&internalpb.RemoteStopRequest{
Host: host,
Port: int32(port),
Name: name,
})

if _, err := remoteClient.RemoteStop(ctx, request); err != nil {
code := connect.CodeOf(err)
if code == connect.CodeNotFound {
return nil
}
return err
}

return nil
}

// RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error {
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.NewClient(),
http.URL(host, port),
)

request := connect.NewRequest(&internalpb.RemoteSpawnRequest{
Host: host,
Port: int32(port),
ActorName: name,
ActorType: actorType,
})

if _, err := remoteClient.RemoteSpawn(ctx, request); err != nil {
code := connect.CodeOf(err)
if code == connect.CodeFailedPrecondition {
connectErr := err.(*connect.Error)
e := connectErr.Unwrap()
// TODO: find a better way to use errors.Is with connect.Error
if strings.Contains(e.Error(), ErrTypeNotRegistered.Error()) {
return ErrTypeNotRegistered
}
}
return err
}
return nil
}

// toReceiveContext creates a ReceiveContext provided a message and a receiver
func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async bool) (*ReceiveContext, error) {
switch msg := message.(type) {
Expand Down
Loading
Loading