diff --git a/actors/actor_system.go b/actors/actor_system.go index de69822c..4e89420a 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -153,7 +153,7 @@ type actorSystem struct { internalpbconnect.UnimplementedClusterServiceHandler // map of actors in the system - actors *syncMap + actors *pidMap // states whether the actor system has started or not started atomic.Bool @@ -200,6 +200,7 @@ type actorSystem struct { clusterEventsChan <-chan *cluster.Event clusterSyncStopSig chan types.Unit partitionHasher hash.Hasher + clusterNode *discovery.Node // help protect some the fields to set locker sync.Mutex @@ -240,7 +241,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { } system := &actorSystem{ - actors: newSyncMap(), + actors: newMap(), actorsChan: make(chan *internalpb.ActorRef, 10), name: name, logger: log.New(log.ErrorLevel, os.Stderr), @@ -521,7 +522,7 @@ func (x *actorSystem) PeerAddress() string { x.locker.Lock() defer x.locker.Unlock() if x.clusterEnabled.Load() { - return x.cluster.AdvertisedAddress() + return x.clusterNode.PeersAddress() } return "" } @@ -1064,7 +1065,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error { return errors.New("clustering needs remoting to be enabled") } - node := &discovery.Node{ + x.clusterNode = &discovery.Node{ Name: x.Name(), Host: x.host, DiscoveryPort: x.clusterConfig.DiscoveryPort(), @@ -1075,7 +1076,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error { clusterEngine, err := cluster.NewEngine( x.Name(), x.clusterConfig.Discovery(), - node, + x.clusterNode, cluster.WithLogger(x.logger), cluster.WithPartitionsCount(x.clusterConfig.PartitionCount()), cluster.WithHasher(x.partitionHasher), diff --git a/actors/pid.go b/actors/pid.go index 2f067c78..13718a91 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -119,10 +119,10 @@ type PID struct { watchersList *slice.Safe[*watcher] // hold the list of the children - children *syncMap + children *pidMap // hold the list of watched actors - watchedList *syncMap + watchedList *pidMap // the actor system system ActorSystem @@ -184,10 +184,10 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... latestReceiveTime: atomic.Time{}, haltPassivationLnr: make(chan types.Unit, 1), logger: log.New(log.ErrorLevel, os.Stderr), - children: newSyncMap(), + children: newMap(), supervisorDirective: DefaultSupervisoryStrategy, watchersList: slice.NewSafe[*watcher](), - watchedList: newSyncMap(), + watchedList: newMap(), address: address, fieldsLocker: new(sync.RWMutex), stopLocker: new(sync.Mutex), @@ -1091,7 +1091,7 @@ func (pid *PID) watchers() *slice.Safe[*watcher] { } // watchees returns the list of actors watched by this actor -func (pid *PID) watchees() *syncMap { +func (pid *PID) watchees() *pidMap { return pid.watchedList } diff --git a/actors/sync_map.go b/actors/pid_map.go similarity index 87% rename from actors/sync_map.go rename to actors/pid_map.go index 3ef2162a..2695b850 100644 --- a/actors/sync_map.go +++ b/actors/pid_map.go @@ -31,24 +31,24 @@ import ( "github.com/tochemey/goakt/v2/address" ) -type syncMap struct { +type pidMap struct { sync.Map counter uint64 } -func newSyncMap() *syncMap { - return &syncMap{ +func newMap() *pidMap { + return &pidMap{ counter: 0, } } // Size returns the number of List -func (m *syncMap) Size() int { +func (m *pidMap) Size() int { return int(atomic.LoadUint64(&m.counter)) } // Get retrieves a pid by its address -func (m *syncMap) Get(address *address.Address) (pid *PID, ok bool) { +func (m *pidMap) Get(address *address.Address) (pid *PID, ok bool) { if val, found := m.Load(address.String()); found { return val.(*PID), found } @@ -56,7 +56,7 @@ func (m *syncMap) Get(address *address.Address) (pid *PID, ok bool) { } // Set sets a pid in the map -func (m *syncMap) Set(pid *PID) { +func (m *pidMap) Set(pid *PID) { if pid != nil { m.Store(pid.Address().String(), pid) atomic.AddUint64(&m.counter, 1) @@ -64,13 +64,13 @@ func (m *syncMap) Set(pid *PID) { } // Remove removes a pid from the map -func (m *syncMap) Remove(addr *address.Address) { +func (m *pidMap) Remove(addr *address.Address) { m.Delete(addr.String()) atomic.AddUint64(&m.counter, ^uint64(0)) } // List returns all actors as a slice -func (m *syncMap) List() []*PID { +func (m *pidMap) List() []*PID { var out []*PID m.Range(func(_, value interface{}) bool { out = append(out, value.(*PID)) @@ -81,7 +81,7 @@ func (m *syncMap) List() []*PID { // Reset resets the pids map // nolint -func (m *syncMap) Reset() { +func (m *pidMap) Reset() { // TODO: remove this line when migrated to go 1.23 //m.Clear() m.Range(func(key interface{}, value interface{}) bool { diff --git a/actors/sync_map_test.go b/actors/pid_map_test.go similarity index 98% rename from actors/sync_map_test.go rename to actors/pid_map_test.go index c9339cef..391c3247 100644 --- a/actors/sync_map_test.go +++ b/actors/pid_map_test.go @@ -41,7 +41,7 @@ func TestPIDMap(t *testing.T) { // create the PID actorRef := &PID{address: actorPath, fieldsLocker: &sync.RWMutex{}, stopLocker: &sync.Mutex{}} // create a new PID map - pidMap := newSyncMap() + pidMap := newMap() // add to the map pidMap.Set(actorRef) // assert the length of the map diff --git a/actors/scheduler.go b/actors/scheduler.go index 1d2a0dfa..5c0d1eab 100644 --- a/actors/scheduler.go +++ b/actors/scheduler.go @@ -343,14 +343,14 @@ func (x *scheduler) RemoteScheduleWithCron(ctx context.Context, message proto.Me func (x *scheduler) distributeJobKeyOrNot(ctx context.Context, job *quartz.JobDetail) error { jobKey := job.JobKey().String() if x.cluster != nil { - ok, err := x.cluster.KeyExists(ctx, jobKey) + ok, err := x.cluster.SchedulerJobKeyExists(ctx, jobKey) if err != nil { return err } if ok { return errSkipJobScheduling } - if err := x.cluster.SetKey(ctx, jobKey); err != nil { + if err := x.cluster.SetSchedulerJobKey(ctx, jobKey); err != nil { return err } } diff --git a/internal/cluster/engine.go b/internal/cluster/engine.go index e0805199..396d520a 100644 --- a/internal/cluster/engine.go +++ b/internal/cluster/engine.go @@ -27,6 +27,7 @@ package cluster import ( "context" "encoding/json" + "errors" "fmt" "net" "slices" @@ -38,7 +39,7 @@ import ( "github.com/buraksezer/olric/config" "github.com/buraksezer/olric/events" "github.com/buraksezer/olric/hasher" - "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -83,30 +84,23 @@ type Interface interface { Start(ctx context.Context) error // Stop stops the cluster engine Stop(ctx context.Context) error - // Host returns the cluster startNode host address - Host() string - // RemotingPort returns the cluster remoting port - RemotingPort() int // PutActor replicates onto the Node the metadata of an actor PutActor(ctx context.Context, actor *internalpb.ActorRef) error // GetActor fetches an actor from the Node GetActor(ctx context.Context, actorName string) (*internalpb.ActorRef, error) // GetPartition returns the partition where a given actor is stored GetPartition(actorName string) int - // SetKey sets a given key to the cluster - SetKey(ctx context.Context, key string) error - // KeyExists checks the existence of a given key - KeyExists(ctx context.Context, key string) (bool, error) - // UnsetKey unsets the already set given key in the cluster - UnsetKey(ctx context.Context, key string) error + // SetSchedulerJobKey sets a given key to the cluster + SetSchedulerJobKey(ctx context.Context, key string) error + // SchedulerJobKeyExists checks the existence of a given key + SchedulerJobKeyExists(ctx context.Context, key string) (bool, error) + // UnsetSchedulerJobKey unsets the already set given key in the cluster + UnsetSchedulerJobKey(ctx context.Context, key string) error // RemoveActor removes a given actor from the cluster. // An actor is removed from the cluster when this actor has been passivated. RemoveActor(ctx context.Context, actorName string) error // Events returns a channel where cluster events are published Events() <-chan *Event - // AdvertisedAddress returns the cluster node cluster address that is known by the - // peers in the cluster - AdvertisedAddress() string // Peers returns a channel containing the list of peers at a given time Peers(ctx context.Context) ([]*Peer, error) // GetState fetches a given peer state @@ -143,8 +137,8 @@ type Engine struct { // specifies the distributed key value store dmap olric.DMap - // specifies the Node host - host *discovery.Node + // specifies the Node node + node *discovery.Node // specifies the hasher hasher hash.Hasher @@ -193,8 +187,8 @@ func NewEngine(name string, disco discovery.Provider, host *discovery.Node, opts opt.Apply(engine) } - // set the host startNode - engine.host = host + // set the node startNode + engine.node = host return engine, nil } @@ -203,21 +197,21 @@ func NewEngine(name string, disco discovery.Provider, host *discovery.Node, opts func (n *Engine) Start(ctx context.Context) error { logger := n.logger - logger.Infof("Starting GoAkt cluster Engine service on host=(%s)....🤔", n.host.PeersAddress()) + logger.Infof("Starting GoAkt cluster Engine service on node=(%s)....🤔", n.node.PeersAddress()) conf := n.buildConfig() conf.Hasher = &hasherWrapper{n.hasher} m, err := config.NewMemberlistConfig("lan") if err != nil { - logger.Error(errors.Wrap(err, "failed to configure the cluster Engine members list.💥")) + logger.Errorf("failed to configure the cluster Engine members list.💥: %v", err) return err } // sets the bindings - m.BindAddr = n.host.Host - m.BindPort = n.host.DiscoveryPort - m.AdvertisePort = n.host.DiscoveryPort + m.BindAddr = n.node.Host + m.BindPort = n.node.DiscoveryPort + m.AdvertisePort = n.node.DiscoveryPort conf.MemberlistConfig = m // set the discovery provider @@ -240,7 +234,7 @@ func (n *Engine) Start(ctx context.Context) error { eng, err := olric.New(conf) if err != nil { - logger.Error(errors.Wrapf(err, "failed to start the cluster Engine on host=(%s)", n.name)) + logger.Error(fmt.Errorf("failed to start the cluster Engine on node=(%s): %w", n.name, err)) return err } @@ -252,7 +246,7 @@ func (n *Engine) Start(ctx context.Context) error { logger.Panic(e) } // the expectation is to exit the application - logger.Fatal(errors.Wrapf(err, "failed to start the cluster Engine on host=(%s)", n.name)) + logger.Error(fmt.Errorf("failed to start the cluster Engine on node=(%s): %w", n.name, err)) } }() @@ -261,26 +255,26 @@ func (n *Engine) Start(ctx context.Context) error { // set the client n.client = n.server.NewEmbeddedClient() - dmp, err := n.client.NewDMap(n.name) + newDMap, err := n.client.NewDMap(n.name) if err != nil { - logger.Error(errors.Wrapf(err, "failed to start the cluster Engine on host=(%s)", n.name)) + logger.Error(fmt.Errorf("failed to start the cluster Engine on node=(%s): %w", n.name, err)) return n.server.Shutdown(ctx) } - n.dmap = dmp + n.dmap = newDMap // create a subscriber to consume to cluster events - ps, err := n.client.NewPubSub(olric.ToAddress(n.host.PeersAddress())) + ps, err := n.client.NewPubSub(olric.ToAddress(n.node.PeersAddress())) if err != nil { - logger.Error(errors.Wrapf(err, "failed to start the cluster Engine on host=(%s)", n.name)) + logger.Error(fmt.Errorf("failed to start the cluster Engine on node=(%s): %w", n.name, err)) return n.server.Shutdown(ctx) } // set the peer state n.peerState = &internalpb.PeerState{ - Host: n.Host(), - RemotingPort: int32(n.host.RemotingPort), - PeersPort: int32(n.host.PeersPort), + Host: n.node.Host, + RemotingPort: int32(n.node.RemotingPort), + PeersPort: int32(n.node.PeersPort), Actors: []*internalpb.ActorRef{}, } @@ -306,19 +300,19 @@ func (n *Engine) Stop(ctx context.Context) error { // close the events listener if err := n.pubSub.Close(); err != nil { - logger.Error(errors.Wrap(err, "failed to shutdown the cluster events listener")) + logger.Errorf("failed to stop the cluster Engine on node=(%s): %w", n.node.PeersAddress(), err) return err } // close the Node client if err := n.client.Close(ctx); err != nil { - logger.Error(errors.Wrapf(err, "failed to shutdown the cluster engine=(%s)", n.name)) + logger.Errorf("failed to stop the cluster Engine on node=(%s): %w", n.node.PeersAddress(), err) return err } // let us stop the server if err := n.server.Shutdown(ctx); err != nil { - logger.Error(errors.Wrapf(err, "failed to shutdown the cluster engine=(%s)", n.name)) + logger.Errorf("failed to stop the cluster Engine on node=(%s): %w", n.node.PeersAddress(), err) return err } @@ -337,42 +331,17 @@ func (n *Engine) Stop(ctx context.Context) error { func (n *Engine) IsLeader(ctx context.Context) bool { n.Lock() client := n.client - host := n.host + host := n.node n.Unlock() stats, err := client.Stats(ctx, host.PeersAddress()) if err != nil { - n.logger.Errorf("failed to fetch the cluster node=(%s) stats: %v", n.host.PeersAddress(), err) + n.logger.Errorf("failed to fetch the cluster node=(%s) stats: %v", n.node.PeersAddress(), err) return false } return stats.ClusterCoordinator.String() == stats.Member.String() } -// Host returns the Node Host -func (n *Engine) Host() string { - n.Lock() - host := n.host - n.Unlock() - return host.Host -} - -// RemotingPort returns the Node remoting port -func (n *Engine) RemotingPort() int { - n.Lock() - host := n.host - n.Unlock() - return host.RemotingPort -} - -// AdvertisedAddress returns the cluster node cluster address that is known by the -// peers in the cluster -func (n *Engine) AdvertisedAddress() string { - n.Lock() - host := n.host - n.Unlock() - return host.PeersAddress() -} - // PutActor pushes to the cluster the peer sync request func (n *Engine) PutActor(ctx context.Context, actor *internalpb.ActorRef) error { ctx, cancelFn := context.WithTimeout(ctx, n.writeTimeout) @@ -383,7 +352,7 @@ func (n *Engine) PutActor(ctx context.Context, actor *internalpb.ActorRef) error logger := n.logger - logger.Infof("synchronization peer (%s)", n.host.PeersAddress()) + logger.Infof("synchronization peer (%s)", n.node.PeersAddress()) eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(2) @@ -391,7 +360,7 @@ func (n *Engine) PutActor(ctx context.Context, actor *internalpb.ActorRef) error eg.Go(func() error { encoded, _ := encode(actor) if err := n.dmap.Put(ctx, actor.GetActorAddress().GetName(), encoded); err != nil { - return fmt.Errorf("failed to sync actor=(%s) of peer=(%s): %v", actor.GetActorAddress().GetName(), n.host.PeersAddress(), err) + return fmt.Errorf("failed to sync actor=(%s) of peer=(%s): %v", actor.GetActorAddress().GetName(), n.node.PeersAddress(), err) } return nil @@ -407,19 +376,19 @@ func (n *Engine) PutActor(ctx context.Context, actor *internalpb.ActorRef) error n.peerState.Actors = compacted encoded, _ := proto.Marshal(n.peerState) - if err := n.dmap.Put(ctx, n.host.PeersAddress(), encoded); err != nil { - return fmt.Errorf("failed to sync peer=(%s) request: %v", n.host.PeersAddress(), err) + if err := n.dmap.Put(ctx, n.node.PeersAddress(), encoded); err != nil { + return fmt.Errorf("failed to sync peer=(%s) request: %v", n.node.PeersAddress(), err) } return nil }) if err := eg.Wait(); err != nil { - logger.Errorf("failed to synchronize peer=(%s): %v", n.host.PeersAddress(), err) + logger.Errorf("failed to synchronize peer=(%s): %v", n.node.PeersAddress(), err) return err } - logger.Infof("peer (%s) successfully synchronized in the cluster", n.host.PeersAddress()) + logger.Infof("peer (%s) successfully synchronized in the cluster", n.node.PeersAddress()) return nil } @@ -433,31 +402,31 @@ func (n *Engine) GetState(ctx context.Context, peerAddress string) (*internalpb. logger := n.logger - logger.Infof("[%s] retrieving peer (%s) sync record", n.host.PeersAddress(), peerAddress) + logger.Infof("[%s] retrieving peer (%s) sync record", n.node.PeersAddress(), peerAddress) resp, err := n.dmap.Get(ctx, peerAddress) if err != nil { if errors.Is(err, olric.ErrKeyNotFound) { - logger.Warnf("[%s] has not found peer=(%s) sync record", n.host.PeersAddress(), peerAddress) + logger.Warnf("[%s] has not found peer=(%s) sync record", n.node.PeersAddress(), peerAddress) return nil, ErrPeerSyncNotFound } - logger.Errorf("[%s] failed to find peer=(%s) sync record: %v", n.host.PeersAddress(), peerAddress, err) + logger.Errorf("[%s] failed to find peer=(%s) sync record: %v", n.node.PeersAddress(), peerAddress, err) return nil, err } bytea, err := resp.Byte() if err != nil { - logger.Errorf("[%s] failed to read peer=(%s) sync record: %v", n.host.PeersAddress(), peerAddress, err) + logger.Errorf("[%s] failed to read peer=(%s) sync record: %v", n.node.PeersAddress(), peerAddress, err) return nil, err } peerState := new(internalpb.PeerState) if err := proto.Unmarshal(bytea, peerState); err != nil { - logger.Errorf("[%s] failed to decode peer=(%s) sync record: %v", n.host.PeersAddress(), peerAddress, err) + logger.Errorf("[%s] failed to decode peer=(%s) sync record: %v", n.node.PeersAddress(), peerAddress, err) return nil, err } - logger.Infof("[%s] successfully retrieved peer (%s) sync record .🎉", n.host.PeersAddress(), peerAddress) + logger.Infof("[%s] successfully retrieved peer (%s) sync record .🎉", n.node.PeersAddress(), peerAddress) return peerState, nil } @@ -471,31 +440,31 @@ func (n *Engine) GetActor(ctx context.Context, actorName string) (*internalpb.Ac logger := n.logger - logger.Infof("[%s] retrieving actor (%s) from the cluster", n.host.PeersAddress(), actorName) + logger.Infof("[%s] retrieving actor (%s) from the cluster", n.node.PeersAddress(), actorName) resp, err := n.dmap.Get(ctx, actorName) if err != nil { if errors.Is(err, olric.ErrKeyNotFound) { - logger.Warnf("[%s] could not find actor=%s the cluster", n.host.PeersAddress(), actorName) + logger.Warnf("[%s] could not find actor=%s the cluster", n.node.PeersAddress(), actorName) return nil, ErrActorNotFound } - logger.Error(errors.Wrapf(err, "[%s] failed to get actor=%s record", n.host.PeersAddress(), actorName)) + logger.Errorf("[%s] failed to get actor=(%s) record from the cluster: %v", n.node.PeersAddress(), actorName, err) return nil, err } bytea, err := resp.Byte() if err != nil { - logger.Error(errors.Wrapf(err, "[%s] failed to read the record at:{%s}", n.host.PeersAddress(), actorName)) + logger.Errorf("[%s] failed to read actor=(%s) record: %v", n.node.PeersAddress(), actorName, err) return nil, err } actor, err := decode(bytea) if err != nil { - logger.Error(errors.Wrapf(err, "[%s] failed to decode actor=%s record", n.host.PeersAddress(), actorName)) + logger.Errorf("[%s] failed to decode actor=(%s) record: %v", n.node.PeersAddress(), actorName, err) return nil, err } - logger.Infof("[%s] successfully retrieved from the cluster actor (%s)", n.host.PeersAddress(), actor.GetActorAddress().GetName()) + logger.Infof("[%s] successfully retrieved from the cluster actor (%s)", n.node.PeersAddress(), actor.GetActorAddress().GetName()) return actor, nil } @@ -510,7 +479,7 @@ func (n *Engine) RemoveActor(ctx context.Context, actorName string) error { _, err := n.dmap.Delete(ctx, actorName) if err != nil { - logger.Error(errors.Wrapf(err, "failed to remove actor=%s record from the cluster", actorName)) + logger.Errorf("[%s] failed to remove actor=(%s) record from cluster: %v", n.node.PeersAddress(), actorName, err) return err } @@ -518,8 +487,8 @@ func (n *Engine) RemoveActor(ctx context.Context, actorName string) error { return nil } -// SetKey sets a given key to the cluster -func (n *Engine) SetKey(ctx context.Context, key string) error { +// SetSchedulerJobKey sets a given key to the cluster +func (n *Engine) SetSchedulerJobKey(ctx context.Context, key string) error { ctx, cancelFn := context.WithTimeout(ctx, n.writeTimeout) defer cancelFn() @@ -531,7 +500,7 @@ func (n *Engine) SetKey(ctx context.Context, key string) error { logger.Infof("replicating key (%s)", key) if err := n.dmap.Put(ctx, key, true); err != nil { - logger.Error(errors.Wrapf(err, "failed to replicate key=%s record.💥", key)) + logger.Errorf("failed to replicate scheduler job key (%s): %v", key, err) return err } @@ -539,8 +508,8 @@ func (n *Engine) SetKey(ctx context.Context, key string) error { return nil } -// KeyExists checks the existence of a given key -func (n *Engine) KeyExists(ctx context.Context, key string) (bool, error) { +// SchedulerJobKeyExists checks the existence of a given key +func (n *Engine) SchedulerJobKeyExists(ctx context.Context, key string) (bool, error) { ctx, cancelFn := context.WithTimeout(ctx, n.readTimeout) defer cancelFn() @@ -558,14 +527,14 @@ func (n *Engine) KeyExists(ctx context.Context, key string) (bool, error) { return false, nil } - logger.Error(errors.Wrapf(err, "failed to check key=%s existence", key)) + logger.Errorf("[%s] failed to check scheduler job key (%s) existence: %v", n.node.PeersAddress(), key, err) return false, err } return resp.Bool() } -// UnsetKey unsets the already set given key in the cluster -func (n *Engine) UnsetKey(ctx context.Context, key string) error { +// UnsetSchedulerJobKey unsets the already set given key in the cluster +func (n *Engine) UnsetSchedulerJobKey(ctx context.Context, key string) error { logger := n.logger n.Lock() @@ -574,7 +543,7 @@ func (n *Engine) UnsetKey(ctx context.Context, key string) error { logger.Infof("unsetting key (%s)", key) if _, err := n.dmap.Delete(ctx, key); err != nil { - logger.Error(errors.Wrapf(err, "failed to unset key=%s record.💥", key)) + logger.Errorf("failed to unset scheduler job key (%s): %v", key, err) return err } @@ -604,14 +573,14 @@ func (n *Engine) Peers(ctx context.Context) ([]*Peer, error) { members, err := client.Members(ctx) if err != nil { - n.logger.Error(errors.Wrap(err, "failed to read cluster peers")) + n.logger.Errorf("failed to read cluster peers: %v", err) return nil, err } peers := make([]*Peer, 0, len(members)) for _, member := range members { - if member.Name != n.AdvertisedAddress() { - n.logger.Debugf("node=(%s) has found peer=(%s)", n.AdvertisedAddress(), member.Name) + if member.Name != n.node.PeersAddress() { + n.logger.Debugf("node=(%s) has found peer=(%s)", n.node.PeersAddress(), member.Name) peerHost, port, _ := net.SplitHostPort(member.Name) peerPort, _ := strconv.Atoi(port) peers = append(peers, &Peer{Host: peerHost, Port: peerPort, Coordinator: member.Coordinator}) @@ -634,7 +603,7 @@ func (n *Engine) consume() { payload := message.Payload var event map[string]any if err := json.Unmarshal([]byte(payload), &event); err != nil { - n.logger.Error(errors.Wrap(err, "failed to decode cluster event")) + n.logger.Errorf("failed to unmarshal cluster event: %v", err) // TODO: should we continue or not continue } @@ -644,12 +613,12 @@ func (n *Engine) consume() { case events.KindNodeJoinEvent: nodeJoined := new(events.NodeJoinEvent) if err := json.Unmarshal([]byte(payload), &nodeJoined); err != nil { - n.logger.Error(errors.Wrap(err, "failed to decode NodeJoined cluster event")) + n.logger.Errorf("failed to unmarshal node join cluster event: %v", err) // TODO: should we continue or not continue } - if n.AdvertisedAddress() == nodeJoined.NodeJoin { + if n.node.PeersAddress() == nodeJoined.NodeJoin { n.logger.Debug("skipping self") continue } @@ -669,7 +638,7 @@ func (n *Engine) consume() { case events.KindNodeLeftEvent: nodeLeft := new(events.NodeLeftEvent) if err := json.Unmarshal([]byte(payload), &nodeLeft); err != nil { - n.logger.Error(errors.Wrap(err, "failed to decode NodeLeft cluster event")) + n.logger.Errorf("failed to unmarshal node left cluster event: %v", err) // TODO: should we continue or not continue } @@ -710,8 +679,8 @@ func (n *Engine) buildConfig() *config.Config { // create the config and return it conf := &config.Config{ - BindAddr: n.host.Host, - BindPort: n.host.PeersPort, + BindAddr: n.node.Host, + BindPort: n.node.PeersPort, ReadRepair: true, ReplicaCount: int(n.replicaCount), WriteQuorum: config.DefaultWriteQuorum, diff --git a/internal/cluster/engine_test.go b/internal/cluster/engine_test.go index b8a87fb2..e9614cc5 100644 --- a/internal/cluster/engine_test.go +++ b/internal/cluster/engine_test.go @@ -94,7 +94,7 @@ func TestSingleNode(t *testing.T) { err = cluster.Start(ctx) require.NoError(t, err) - hostNodeAddr := cluster.Host() + hostNodeAddr := cluster.node.Host assert.Equal(t, host, hostNodeAddr) // shutdown the Node startNode @@ -220,18 +220,18 @@ func TestSingleNode(t *testing.T) { // set key key := "my-key" - require.NoError(t, cluster.SetKey(ctx, key)) + require.NoError(t, cluster.SetSchedulerJobKey(ctx, key)) - isSet, err := cluster.KeyExists(ctx, key) + isSet, err := cluster.SchedulerJobKeyExists(ctx, key) require.NoError(t, err) assert.True(t, isSet) // unset the key - err = cluster.UnsetKey(ctx, key) + err = cluster.UnsetSchedulerJobKey(ctx, key) require.NoError(t, err) // check the key existence - isSet, err = cluster.KeyExists(ctx, key) + isSet, err = cluster.SchedulerJobKeyExists(ctx, key) require.NoError(t, err) assert.False(t, isSet) @@ -328,7 +328,7 @@ func TestMultipleNodes(t *testing.T) { // create a cluster node1 node2, sd2 := startEngine(t, "node2", srv.Addr().String()) require.NotNil(t, node2) - node2Addr := node2.AdvertisedAddress() + node2Addr := node2.node.PeersAddress() // wait for the node to start properly lib.Pause(time.Second) diff --git a/mocks/cluster/interface.go b/mocks/cluster/interface.go index 9c25f55b..2602ee0d 100644 --- a/mocks/cluster/interface.go +++ b/mocks/cluster/interface.go @@ -24,51 +24,6 @@ func (_m *Interface) EXPECT() *Interface_Expecter { return &Interface_Expecter{mock: &_m.Mock} } -// AdvertisedAddress provides a mock function with given fields: -func (_m *Interface) AdvertisedAddress() string { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for AdvertisedAddress") - } - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// Interface_AdvertisedAddress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AdvertisedAddress' -type Interface_AdvertisedAddress_Call struct { - *mock.Call -} - -// AdvertisedAddress is a helper method to define mock.On call -func (_e *Interface_Expecter) AdvertisedAddress() *Interface_AdvertisedAddress_Call { - return &Interface_AdvertisedAddress_Call{Call: _e.mock.On("AdvertisedAddress")} -} - -func (_c *Interface_AdvertisedAddress_Call) Run(run func()) *Interface_AdvertisedAddress_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Interface_AdvertisedAddress_Call) Return(_a0 string) *Interface_AdvertisedAddress_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Interface_AdvertisedAddress_Call) RunAndReturn(run func() string) *Interface_AdvertisedAddress_Call { - _c.Call.Return(run) - return _c -} - // Events provides a mock function with given fields: func (_m *Interface) Events() <-chan *internalcluster.Event { ret := _m.Called() @@ -280,51 +235,6 @@ func (_c *Interface_GetState_Call) RunAndReturn(run func(context.Context, string return _c } -// Host provides a mock function with given fields: -func (_m *Interface) Host() string { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Host") - } - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// Interface_Host_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Host' -type Interface_Host_Call struct { - *mock.Call -} - -// Host is a helper method to define mock.On call -func (_e *Interface_Expecter) Host() *Interface_Host_Call { - return &Interface_Host_Call{Call: _e.mock.On("Host")} -} - -func (_c *Interface_Host_Call) Run(run func()) *Interface_Host_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Interface_Host_Call) Return(_a0 string) *Interface_Host_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Interface_Host_Call) RunAndReturn(run func() string) *Interface_Host_Call { - _c.Call.Return(run) - return _c -} - // IsLeader provides a mock function with given fields: ctx func (_m *Interface) IsLeader(ctx context.Context) bool { ret := _m.Called(ctx) @@ -371,63 +281,6 @@ func (_c *Interface_IsLeader_Call) RunAndReturn(run func(context.Context) bool) return _c } -// KeyExists provides a mock function with given fields: ctx, key -func (_m *Interface) KeyExists(ctx context.Context, key string) (bool, error) { - ret := _m.Called(ctx, key) - - if len(ret) == 0 { - panic("no return value specified for KeyExists") - } - - var r0 bool - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { - return rf(ctx, key) - } - if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { - r0 = rf(ctx, key) - } else { - r0 = ret.Get(0).(bool) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, key) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Interface_KeyExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'KeyExists' -type Interface_KeyExists_Call struct { - *mock.Call -} - -// KeyExists is a helper method to define mock.On call -// - ctx context.Context -// - key string -func (_e *Interface_Expecter) KeyExists(ctx interface{}, key interface{}) *Interface_KeyExists_Call { - return &Interface_KeyExists_Call{Call: _e.mock.On("KeyExists", ctx, key)} -} - -func (_c *Interface_KeyExists_Call) Run(run func(ctx context.Context, key string)) *Interface_KeyExists_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *Interface_KeyExists_Call) Return(_a0 bool, _a1 error) *Interface_KeyExists_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Interface_KeyExists_Call) RunAndReturn(run func(context.Context, string) (bool, error)) *Interface_KeyExists_Call { - _c.Call.Return(run) - return _c -} - // Peers provides a mock function with given fields: ctx func (_m *Interface) Peers(ctx context.Context) ([]*internalcluster.Peer, error) { ret := _m.Called(ctx) @@ -533,104 +386,116 @@ func (_c *Interface_PutActor_Call) RunAndReturn(run func(context.Context, *inter return _c } -// RemotingPort provides a mock function with given fields: -func (_m *Interface) RemotingPort() int { - ret := _m.Called() +// RemoveActor provides a mock function with given fields: ctx, actorName +func (_m *Interface) RemoveActor(ctx context.Context, actorName string) error { + ret := _m.Called(ctx, actorName) if len(ret) == 0 { - panic("no return value specified for RemotingPort") + panic("no return value specified for RemoveActor") } - var r0 int - if rf, ok := ret.Get(0).(func() int); ok { - r0 = rf() + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, actorName) } else { - r0 = ret.Get(0).(int) + r0 = ret.Error(0) } return r0 } -// Interface_RemotingPort_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemotingPort' -type Interface_RemotingPort_Call struct { +// Interface_RemoveActor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveActor' +type Interface_RemoveActor_Call struct { *mock.Call } -// RemotingPort is a helper method to define mock.On call -func (_e *Interface_Expecter) RemotingPort() *Interface_RemotingPort_Call { - return &Interface_RemotingPort_Call{Call: _e.mock.On("RemotingPort")} +// RemoveActor is a helper method to define mock.On call +// - ctx context.Context +// - actorName string +func (_e *Interface_Expecter) RemoveActor(ctx interface{}, actorName interface{}) *Interface_RemoveActor_Call { + return &Interface_RemoveActor_Call{Call: _e.mock.On("RemoveActor", ctx, actorName)} } -func (_c *Interface_RemotingPort_Call) Run(run func()) *Interface_RemotingPort_Call { +func (_c *Interface_RemoveActor_Call) Run(run func(ctx context.Context, actorName string)) *Interface_RemoveActor_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *Interface_RemotingPort_Call) Return(_a0 int) *Interface_RemotingPort_Call { +func (_c *Interface_RemoveActor_Call) Return(_a0 error) *Interface_RemoveActor_Call { _c.Call.Return(_a0) return _c } -func (_c *Interface_RemotingPort_Call) RunAndReturn(run func() int) *Interface_RemotingPort_Call { +func (_c *Interface_RemoveActor_Call) RunAndReturn(run func(context.Context, string) error) *Interface_RemoveActor_Call { _c.Call.Return(run) return _c } -// RemoveActor provides a mock function with given fields: ctx, actorName -func (_m *Interface) RemoveActor(ctx context.Context, actorName string) error { - ret := _m.Called(ctx, actorName) +// SchedulerJobKeyExists provides a mock function with given fields: ctx, key +func (_m *Interface) SchedulerJobKeyExists(ctx context.Context, key string) (bool, error) { + ret := _m.Called(ctx, key) if len(ret) == 0 { - panic("no return value specified for RemoveActor") + panic("no return value specified for SchedulerJobKeyExists") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, actorName) + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return rf(ctx, key) + } + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, key) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(bool) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// Interface_RemoveActor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveActor' -type Interface_RemoveActor_Call struct { +// Interface_SchedulerJobKeyExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SchedulerJobKeyExists' +type Interface_SchedulerJobKeyExists_Call struct { *mock.Call } -// RemoveActor is a helper method to define mock.On call +// SchedulerJobKeyExists is a helper method to define mock.On call // - ctx context.Context -// - actorName string -func (_e *Interface_Expecter) RemoveActor(ctx interface{}, actorName interface{}) *Interface_RemoveActor_Call { - return &Interface_RemoveActor_Call{Call: _e.mock.On("RemoveActor", ctx, actorName)} +// - key string +func (_e *Interface_Expecter) SchedulerJobKeyExists(ctx interface{}, key interface{}) *Interface_SchedulerJobKeyExists_Call { + return &Interface_SchedulerJobKeyExists_Call{Call: _e.mock.On("SchedulerJobKeyExists", ctx, key)} } -func (_c *Interface_RemoveActor_Call) Run(run func(ctx context.Context, actorName string)) *Interface_RemoveActor_Call { +func (_c *Interface_SchedulerJobKeyExists_Call) Run(run func(ctx context.Context, key string)) *Interface_SchedulerJobKeyExists_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *Interface_RemoveActor_Call) Return(_a0 error) *Interface_RemoveActor_Call { - _c.Call.Return(_a0) +func (_c *Interface_SchedulerJobKeyExists_Call) Return(_a0 bool, _a1 error) *Interface_SchedulerJobKeyExists_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *Interface_RemoveActor_Call) RunAndReturn(run func(context.Context, string) error) *Interface_RemoveActor_Call { +func (_c *Interface_SchedulerJobKeyExists_Call) RunAndReturn(run func(context.Context, string) (bool, error)) *Interface_SchedulerJobKeyExists_Call { _c.Call.Return(run) return _c } -// SetKey provides a mock function with given fields: ctx, key -func (_m *Interface) SetKey(ctx context.Context, key string) error { +// SetSchedulerJobKey provides a mock function with given fields: ctx, key +func (_m *Interface) SetSchedulerJobKey(ctx context.Context, key string) error { ret := _m.Called(ctx, key) if len(ret) == 0 { - panic("no return value specified for SetKey") + panic("no return value specified for SetSchedulerJobKey") } var r0 error @@ -643,31 +508,31 @@ func (_m *Interface) SetKey(ctx context.Context, key string) error { return r0 } -// Interface_SetKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetKey' -type Interface_SetKey_Call struct { +// Interface_SetSchedulerJobKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSchedulerJobKey' +type Interface_SetSchedulerJobKey_Call struct { *mock.Call } -// SetKey is a helper method to define mock.On call +// SetSchedulerJobKey is a helper method to define mock.On call // - ctx context.Context // - key string -func (_e *Interface_Expecter) SetKey(ctx interface{}, key interface{}) *Interface_SetKey_Call { - return &Interface_SetKey_Call{Call: _e.mock.On("SetKey", ctx, key)} +func (_e *Interface_Expecter) SetSchedulerJobKey(ctx interface{}, key interface{}) *Interface_SetSchedulerJobKey_Call { + return &Interface_SetSchedulerJobKey_Call{Call: _e.mock.On("SetSchedulerJobKey", ctx, key)} } -func (_c *Interface_SetKey_Call) Run(run func(ctx context.Context, key string)) *Interface_SetKey_Call { +func (_c *Interface_SetSchedulerJobKey_Call) Run(run func(ctx context.Context, key string)) *Interface_SetSchedulerJobKey_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *Interface_SetKey_Call) Return(_a0 error) *Interface_SetKey_Call { +func (_c *Interface_SetSchedulerJobKey_Call) Return(_a0 error) *Interface_SetSchedulerJobKey_Call { _c.Call.Return(_a0) return _c } -func (_c *Interface_SetKey_Call) RunAndReturn(run func(context.Context, string) error) *Interface_SetKey_Call { +func (_c *Interface_SetSchedulerJobKey_Call) RunAndReturn(run func(context.Context, string) error) *Interface_SetSchedulerJobKey_Call { _c.Call.Return(run) return _c } @@ -764,12 +629,12 @@ func (_c *Interface_Stop_Call) RunAndReturn(run func(context.Context) error) *In return _c } -// UnsetKey provides a mock function with given fields: ctx, key -func (_m *Interface) UnsetKey(ctx context.Context, key string) error { +// UnsetSchedulerJobKey provides a mock function with given fields: ctx, key +func (_m *Interface) UnsetSchedulerJobKey(ctx context.Context, key string) error { ret := _m.Called(ctx, key) if len(ret) == 0 { - panic("no return value specified for UnsetKey") + panic("no return value specified for UnsetSchedulerJobKey") } var r0 error @@ -782,31 +647,31 @@ func (_m *Interface) UnsetKey(ctx context.Context, key string) error { return r0 } -// Interface_UnsetKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnsetKey' -type Interface_UnsetKey_Call struct { +// Interface_UnsetSchedulerJobKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnsetSchedulerJobKey' +type Interface_UnsetSchedulerJobKey_Call struct { *mock.Call } -// UnsetKey is a helper method to define mock.On call +// UnsetSchedulerJobKey is a helper method to define mock.On call // - ctx context.Context // - key string -func (_e *Interface_Expecter) UnsetKey(ctx interface{}, key interface{}) *Interface_UnsetKey_Call { - return &Interface_UnsetKey_Call{Call: _e.mock.On("UnsetKey", ctx, key)} +func (_e *Interface_Expecter) UnsetSchedulerJobKey(ctx interface{}, key interface{}) *Interface_UnsetSchedulerJobKey_Call { + return &Interface_UnsetSchedulerJobKey_Call{Call: _e.mock.On("UnsetSchedulerJobKey", ctx, key)} } -func (_c *Interface_UnsetKey_Call) Run(run func(ctx context.Context, key string)) *Interface_UnsetKey_Call { +func (_c *Interface_UnsetSchedulerJobKey_Call) Run(run func(ctx context.Context, key string)) *Interface_UnsetSchedulerJobKey_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *Interface_UnsetKey_Call) Return(_a0 error) *Interface_UnsetKey_Call { +func (_c *Interface_UnsetSchedulerJobKey_Call) Return(_a0 error) *Interface_UnsetSchedulerJobKey_Call { _c.Call.Return(_a0) return _c } -func (_c *Interface_UnsetKey_Call) RunAndReturn(run func(context.Context, string) error) *Interface_UnsetKey_Call { +func (_c *Interface_UnsetSchedulerJobKey_Call) RunAndReturn(run func(context.Context, string) error) *Interface_UnsetSchedulerJobKey_Call { _c.Call.Return(run) return _c }