Skip to content

Commit

Permalink
refactor: refactor nats discovery
Browse files Browse the repository at this point in the history
BREAKING CHANGE: - Remove host node param
  • Loading branch information
Tochemey committed Nov 16, 2024
1 parent 556cc47 commit bbe3f23
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 135 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ To use the NATS discovery provider one needs to provide the following:
- `Timeout`: the nodes discovery timeout
- `MaxJoinAttempts`: the maximum number of attempts to connect an existing NATs server. Defaults to `5`
- `ReconnectWait`: the time to backoff after attempting a reconnect to a server that we were already connected to previously. Default to `2 seconds`
- `Host`: the given node host address
- `DiscoveryPort`: the discovery port of the given node

```go
package main
Expand All @@ -562,13 +564,12 @@ config := nats.Config{
ActorSystemName: actorSystemName,
NatsServer: natsServer,
NatsSubject: natsSubject,
Host: "127.0.0.1",
DiscoveryPort: 2003
}

// define the host node instance
hostNode := discovery.Node{}

// instantiate the NATS discovery provider by passing the config and the hostNode
disco := nats.NewDiscovery(&config, &hostNode)
disco := nats.NewDiscovery(&config)

// pass the service discovery when enabling cluster mode in the actor system
```
Expand Down
12 changes: 3 additions & 9 deletions actors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,12 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
ActorSystemName: actorSystemName,
NatsServer: fmt.Sprintf("nats://%s", serverAddr),
NatsSubject: natsSubject,
}

hostNode := discovery.Node{
Name: nodeName,
Host: host,
DiscoveryPort: gossipPort,
PeersPort: clusterPort,
RemotingPort: remotingPort,
Host: host,
DiscoveryPort: gossipPort,
}

// create the instance of provider
provider := nats.NewDiscovery(&config, &hostNode, nats.WithLogger(log.DiscardLogger))
provider := nats.NewDiscovery(&config, nats.WithLogger(log.DiscardLogger))

// create the actor system
system, err := NewActorSystem(
Expand Down
4 changes: 3 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,8 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy
ActorSystemName: actorSystemName,
NatsServer: fmt.Sprintf("nats://%s", serverAddr),
NatsSubject: natsSubject,
Host: host,
DiscoveryPort: gossipPort,
}

hostNode := discovery.Node{
Expand All @@ -771,7 +773,7 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy
}

// create the instance of provider
natsProvider := nats.NewDiscovery(&config, &hostNode, nats.WithLogger(logger))
natsProvider := nats.NewDiscovery(&config, nats.WithLogger(logger))

clusterConfig := actors.
NewClusterConfig().
Expand Down
6 changes: 6 additions & 0 deletions discovery/nats/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type Config struct {
// to a server that we were already connected to previously.
// Defaults to 2s.
ReconnectWait time.Duration
// specifies the host address
Host string
// specifies the discovery port
DiscoveryPort int
}

// Validate checks whether the given discovery configuration is valid
Expand All @@ -61,6 +65,8 @@ func (x Config) Validate() error {
AddValidator(validation.NewEmptyStringValidator("NatsSubject", x.NatsSubject)).
AddValidator(validation.NewEmptyStringValidator("ApplicationName", x.ApplicationName)).
AddValidator(validation.NewEmptyStringValidator("ActorSystemName", x.ActorSystemName)).
AddValidator(validation.NewEmptyStringValidator("Host", x.Host)).
AddAssertion(x.DiscoveryPort > 0, "DiscoveryPort is invalid").
Validate()
}

Expand Down
8 changes: 7 additions & 1 deletion discovery/nats/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func TestConfig(t *testing.T) {
ApplicationName: "applicationName",
ActorSystemName: "actorSys",
NatsSubject: "nats-subject",
Host: "host",
DiscoveryPort: 123,
}
assert.NoError(t, config.Validate())
})
Expand All @@ -46,15 +48,19 @@ func TestConfig(t *testing.T) {
ApplicationName: "applicationName",
ActorSystemName: "actorSys",
NatsSubject: "nats-subject",
Host: "host",
DiscoveryPort: 123,
}
assert.Error(t, config.Validate())
})
t.Run("With invalid host", func(t *testing.T) {
t.Run("With invalid nats server address", func(t *testing.T) {
config := &Config{
NatsServer: "nats://:2322",
ApplicationName: "applicationName",
ActorSystemName: "actorSys",
NatsSubject: "nats-subject",
Host: "host",
DiscoveryPort: 123,
}
assert.Error(t, config.Validate())
})
Expand Down
37 changes: 18 additions & 19 deletions discovery/nats/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,33 @@ type Discovery struct {
// define a slice of subscriptions
subscriptions []*nats.Subscription

// defines the host node
hostNode *discovery.Node

// define a logger
logger log.Logger

address string
}

// enforce compilation error
var _ discovery.Provider = &Discovery{}

// NewDiscovery returns an instance of the kubernetes discovery provider
func NewDiscovery(config *Config, hostNode *discovery.Node, opts ...Option) *Discovery {
func NewDiscovery(config *Config, opts ...Option) *Discovery {
// create an instance of
discovery := &Discovery{
d := &Discovery{
mu: sync.Mutex{},
initialized: atomic.NewBool(false),
registered: atomic.NewBool(false),
config: config,
logger: log.DefaultLogger,
hostNode: hostNode,
}

// apply the various options
for _, opt := range opts {
opt.Apply(discovery)
opt.Apply(d)
}

return discovery
d.address = net.JoinHostPort(config.Host, strconv.Itoa(config.DiscoveryPort))
return d
}

// ID returns the discovery provider id
Expand Down Expand Up @@ -117,7 +116,7 @@ func (d *Discovery) Initialize() error {
// create the nats connection option
opts := nats.GetDefaultOptions()
opts.Url = d.config.NatsServer
opts.Name = d.hostNode.Name
opts.Name = d.address
opts.ReconnectWait = d.config.ReconnectWait
opts.MaxReconnect = -1

Expand Down Expand Up @@ -174,9 +173,9 @@ func (d *Discovery) Register() error {
message.GetName(), message.GetHost(), message.GetPort())

response := &internalpb.NatsMessage{
Host: d.hostNode.Host,
Port: int32(d.hostNode.DiscoveryPort),
Name: d.hostNode.Name,
Host: d.config.Host,
Port: int32(d.config.DiscoveryPort),
Name: d.address,
MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_RESPONSE,
}

Expand Down Expand Up @@ -226,9 +225,9 @@ func (d *Discovery) Deregister() error {
if d.connection != nil {
// send a message to deregister stating we are out
message := &internalpb.NatsMessage{
Host: d.hostNode.Host,
Port: int32(d.hostNode.DiscoveryPort),
Name: d.hostNode.Name,
Host: d.config.Host,
Port: int32(d.config.DiscoveryPort),
Name: d.address,
MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_DEREGISTER,
}

Expand Down Expand Up @@ -265,9 +264,9 @@ func (d *Discovery) DiscoverPeers() ([]string, error) {
}

request := &internalpb.NatsMessage{
Host: d.hostNode.Host,
Port: int32(d.hostNode.DiscoveryPort),
Name: d.hostNode.Name,
Host: d.config.Host,
Port: int32(d.config.DiscoveryPort),
Name: d.address,
MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_REQUEST,
}

Expand All @@ -278,7 +277,7 @@ func (d *Discovery) DiscoverPeers() ([]string, error) {

var peers []string
timeout := time.After(d.config.Timeout)
me := d.hostNode.DiscoveryAddress()
me := d.address
for {
select {
case msg, ok := <-recv:
Expand Down
Loading

0 comments on commit bbe3f23

Please sign in to comment.