From bbe3f2388d84e51a2374ab20da687ed2929d72ab Mon Sep 17 00:00:00 2001 From: Tochemey Date: Sat, 16 Nov 2024 12:40:44 +0000 Subject: [PATCH] refactor: refactor nats discovery BREAKING CHANGE: - Remove host node param --- README.md | 9 +- actors/helper_test.go | 12 +-- client/client_test.go | 4 +- discovery/nats/config.go | 6 ++ discovery/nats/config_test.go | 8 +- discovery/nats/discovery.go | 37 ++++----- discovery/nats/discovery_test.go | 136 ++++++++----------------------- internal/cluster/engine_test.go | 4 +- 8 files changed, 81 insertions(+), 135 deletions(-) diff --git a/README.md b/README.md index ec8320e6..68cff93f 100644 --- a/README.md +++ b/README.md @@ -543,6 +543,8 @@ To use the NATS discovery provider one needs to provide the following: - `Timeout`: the nodes discovery timeout - `MaxJoinAttempts`: the maximum number of attempts to connect an existing NATs server. Defaults to `5` - `ReconnectWait`: the time to backoff after attempting a reconnect to a server that we were already connected to previously. Default to `2 seconds` +- `Host`: the given node host address +- `DiscoveryPort`: the discovery port of the given node ```go package main @@ -562,13 +564,12 @@ config := nats.Config{ ActorSystemName: actorSystemName, NatsServer: natsServer, NatsSubject: natsSubject, + Host: "127.0.0.1", + DiscoveryPort: 2003 } -// define the host node instance -hostNode := discovery.Node{} - // instantiate the NATS discovery provider by passing the config and the hostNode -disco := nats.NewDiscovery(&config, &hostNode) +disco := nats.NewDiscovery(&config) // pass the service discovery when enabling cluster mode in the actor system ``` diff --git a/actors/helper_test.go b/actors/helper_test.go index 42055e37..fcc97149 100644 --- a/actors/helper_test.go +++ b/actors/helper_test.go @@ -432,18 +432,12 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", serverAddr), NatsSubject: natsSubject, - } - - hostNode := discovery.Node{ - Name: nodeName, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, + Host: host, + DiscoveryPort: gossipPort, } // create the instance of provider - provider := nats.NewDiscovery(&config, &hostNode, nats.WithLogger(log.DiscardLogger)) + provider := nats.NewDiscovery(&config, nats.WithLogger(log.DiscardLogger)) // create the actor system system, err := NewActorSystem( diff --git a/client/client_test.go b/client/client_test.go index 94ac68ec..9d8d9f2d 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -760,6 +760,8 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", serverAddr), NatsSubject: natsSubject, + Host: host, + DiscoveryPort: gossipPort, } hostNode := discovery.Node{ @@ -771,7 +773,7 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy } // create the instance of provider - natsProvider := nats.NewDiscovery(&config, &hostNode, nats.WithLogger(logger)) + natsProvider := nats.NewDiscovery(&config, nats.WithLogger(logger)) clusterConfig := actors. NewClusterConfig(). diff --git a/discovery/nats/config.go b/discovery/nats/config.go index a8207321..c6ea32d5 100644 --- a/discovery/nats/config.go +++ b/discovery/nats/config.go @@ -51,6 +51,10 @@ type Config struct { // to a server that we were already connected to previously. // Defaults to 2s. ReconnectWait time.Duration + // specifies the host address + Host string + // specifies the discovery port + DiscoveryPort int } // Validate checks whether the given discovery configuration is valid @@ -61,6 +65,8 @@ func (x Config) Validate() error { AddValidator(validation.NewEmptyStringValidator("NatsSubject", x.NatsSubject)). AddValidator(validation.NewEmptyStringValidator("ApplicationName", x.ApplicationName)). AddValidator(validation.NewEmptyStringValidator("ActorSystemName", x.ActorSystemName)). + AddValidator(validation.NewEmptyStringValidator("Host", x.Host)). + AddAssertion(x.DiscoveryPort > 0, "DiscoveryPort is invalid"). Validate() } diff --git a/discovery/nats/config_test.go b/discovery/nats/config_test.go index f2d64673..12192f56 100644 --- a/discovery/nats/config_test.go +++ b/discovery/nats/config_test.go @@ -37,6 +37,8 @@ func TestConfig(t *testing.T) { ApplicationName: "applicationName", ActorSystemName: "actorSys", NatsSubject: "nats-subject", + Host: "host", + DiscoveryPort: 123, } assert.NoError(t, config.Validate()) }) @@ -46,15 +48,19 @@ func TestConfig(t *testing.T) { ApplicationName: "applicationName", ActorSystemName: "actorSys", NatsSubject: "nats-subject", + Host: "host", + DiscoveryPort: 123, } assert.Error(t, config.Validate()) }) - t.Run("With invalid host", func(t *testing.T) { + t.Run("With invalid nats server address", func(t *testing.T) { config := &Config{ NatsServer: "nats://:2322", ApplicationName: "applicationName", ActorSystemName: "actorSys", NatsSubject: "nats-subject", + Host: "host", + DiscoveryPort: 123, } assert.Error(t, config.Validate()) }) diff --git a/discovery/nats/discovery.go b/discovery/nats/discovery.go index d7199c0c..ac047188 100644 --- a/discovery/nats/discovery.go +++ b/discovery/nats/discovery.go @@ -54,34 +54,33 @@ type Discovery struct { // define a slice of subscriptions subscriptions []*nats.Subscription - // defines the host node - hostNode *discovery.Node - // define a logger logger log.Logger + + address string } // enforce compilation error var _ discovery.Provider = &Discovery{} // NewDiscovery returns an instance of the kubernetes discovery provider -func NewDiscovery(config *Config, hostNode *discovery.Node, opts ...Option) *Discovery { +func NewDiscovery(config *Config, opts ...Option) *Discovery { // create an instance of - discovery := &Discovery{ + d := &Discovery{ mu: sync.Mutex{}, initialized: atomic.NewBool(false), registered: atomic.NewBool(false), config: config, logger: log.DefaultLogger, - hostNode: hostNode, } // apply the various options for _, opt := range opts { - opt.Apply(discovery) + opt.Apply(d) } - return discovery + d.address = net.JoinHostPort(config.Host, strconv.Itoa(config.DiscoveryPort)) + return d } // ID returns the discovery provider id @@ -117,7 +116,7 @@ func (d *Discovery) Initialize() error { // create the nats connection option opts := nats.GetDefaultOptions() opts.Url = d.config.NatsServer - opts.Name = d.hostNode.Name + opts.Name = d.address opts.ReconnectWait = d.config.ReconnectWait opts.MaxReconnect = -1 @@ -174,9 +173,9 @@ func (d *Discovery) Register() error { message.GetName(), message.GetHost(), message.GetPort()) response := &internalpb.NatsMessage{ - Host: d.hostNode.Host, - Port: int32(d.hostNode.DiscoveryPort), - Name: d.hostNode.Name, + Host: d.config.Host, + Port: int32(d.config.DiscoveryPort), + Name: d.address, MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_RESPONSE, } @@ -226,9 +225,9 @@ func (d *Discovery) Deregister() error { if d.connection != nil { // send a message to deregister stating we are out message := &internalpb.NatsMessage{ - Host: d.hostNode.Host, - Port: int32(d.hostNode.DiscoveryPort), - Name: d.hostNode.Name, + Host: d.config.Host, + Port: int32(d.config.DiscoveryPort), + Name: d.address, MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_DEREGISTER, } @@ -265,9 +264,9 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { } request := &internalpb.NatsMessage{ - Host: d.hostNode.Host, - Port: int32(d.hostNode.DiscoveryPort), - Name: d.hostNode.Name, + Host: d.config.Host, + Port: int32(d.config.DiscoveryPort), + Name: d.address, MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_REQUEST, } @@ -278,7 +277,7 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { var peers []string timeout := time.After(d.config.Timeout) - me := d.hostNode.DiscoveryAddress() + me := d.address for { select { case msg, ok := <-recv: diff --git a/discovery/nats/discovery_test.go b/discovery/nats/discovery_test.go index b2399c7e..39ac4963 100644 --- a/discovery/nats/discovery_test.go +++ b/discovery/nats/discovery_test.go @@ -64,10 +64,8 @@ func startNatsServer(t *testing.T) *natsserver.Server { func newPeer(t *testing.T, serverAddr string) *Discovery { // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -82,18 +80,11 @@ func newPeer(t *testing.T, serverAddr string) *Discovery { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", serverAddr), NatsSubject: natsSubject, + Host: host, + DiscoveryPort: gossipPort, } - - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, - } - // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) // initialize err := provider.Initialize() @@ -108,10 +99,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -126,18 +115,12 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", srv.Addr().String()), NatsSubject: natsSubject, - } - - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, + Host: host, + DiscoveryPort: gossipPort, } // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) require.NotNil(t, provider) // assert that provider implements the Discovery interface // this is a cheap test @@ -152,10 +135,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -170,17 +151,12 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", srv.Addr().String()), NatsSubject: natsSubject, + Host: host, + DiscoveryPort: gossipPort, } - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, - } // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) require.NotNil(t, provider) assert.Equal(t, "nats", provider.ID()) }) @@ -189,10 +165,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -209,18 +183,12 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", natsServer), NatsSubject: natsSubject, - } - - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, + Host: host, + DiscoveryPort: gossipPort, } // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) // initialize err := provider.Initialize() @@ -234,10 +202,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -254,18 +220,12 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", natsServer), NatsSubject: natsSubject, - } - - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, + Host: host, + DiscoveryPort: gossipPort, } // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) provider.initialized = atomic.NewBool(true) assert.Error(t, provider.Initialize()) }) @@ -274,10 +234,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -294,18 +252,12 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", natsServer), NatsSubject: natsSubject, - } - - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, + Host: host, + DiscoveryPort: gossipPort, } // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) provider.registered = atomic.NewBool(true) err := provider.Register() assert.Error(t, err) @@ -316,10 +268,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -336,18 +286,12 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", natsServer), NatsSubject: natsSubject, - } - - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, + Host: host, + DiscoveryPort: gossipPort, } // create the instance of provider - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) err := provider.Deregister() assert.Error(t, err) assert.EqualError(t, err, discovery.ErrNotRegistered.Error()) @@ -382,7 +326,7 @@ func TestDiscovery(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, peers) require.Len(t, peers, 1) - discoveredNodeAddr := client2.hostNode.DiscoveryAddress() + discoveredNodeAddr := client2.address require.Equal(t, peers[0], discoveredNodeAddr) // discover more peers from client 2 @@ -390,7 +334,7 @@ func TestDiscovery(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, peers) require.Len(t, peers, 1) - discoveredNodeAddr = client1.hostNode.DiscoveryAddress() + discoveredNodeAddr = client1.address require.Equal(t, peers[0], discoveredNodeAddr) // de-register client 2 but it can see client1 @@ -398,7 +342,7 @@ func TestDiscovery(t *testing.T) { peers, err = client2.DiscoverPeers() require.NoError(t, err) require.NotEmpty(t, peers) - discoveredNodeAddr = client1.hostNode.DiscoveryAddress() + discoveredNodeAddr = client1.address require.Equal(t, peers[0], discoveredNodeAddr) // client-1 cannot see the deregistered client @@ -417,10 +361,8 @@ func TestDiscovery(t *testing.T) { srv := startNatsServer(t) // generate the ports for the single node - nodePorts := dynaport.Get(3) + nodePorts := dynaport.Get(1) gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] // create a Cluster node host := "127.0.0.1" @@ -437,17 +379,11 @@ func TestDiscovery(t *testing.T) { ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", natsServer), NatsSubject: natsSubject, + Host: host, + DiscoveryPort: gossipPort, } - hostNode := discovery.Node{ - Name: host, - Host: host, - DiscoveryPort: gossipPort, - PeersPort: clusterPort, - RemotingPort: remotingPort, - } - - provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) peers, err := provider.DiscoverPeers() assert.Error(t, err) assert.Empty(t, peers) @@ -455,7 +391,7 @@ func TestDiscovery(t *testing.T) { }) t.Run("With Initialize: invalid config", func(t *testing.T) { config := &Config{} - provider := NewDiscovery(config, nil, WithLogger(log.DiscardLogger)) + provider := NewDiscovery(config, WithLogger(log.DiscardLogger)) // initialize err := provider.Initialize() diff --git a/internal/cluster/engine_test.go b/internal/cluster/engine_test.go index ce428106..b8a87fb2 100644 --- a/internal/cluster/engine_test.go +++ b/internal/cluster/engine_test.go @@ -451,6 +451,8 @@ func startEngine(t *testing.T, nodeName, serverAddr string) (*Engine, discovery. ActorSystemName: actorSystemName, NatsServer: fmt.Sprintf("nats://%s", serverAddr), NatsSubject: natsSubject, + Host: host, + DiscoveryPort: gossipPort, } hostNode := discovery.Node{ @@ -462,7 +464,7 @@ func startEngine(t *testing.T, nodeName, serverAddr string) (*Engine, discovery. } // create the instance of provider - provider := nats.NewDiscovery(&config, &hostNode) + provider := nats.NewDiscovery(&config) // create the startNode engine, err := NewEngine(nodeName, provider, &hostNode, WithLogger(log.DiscardLogger))