Skip to content

Commit

Permalink
refactor: code maintenance (#521)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Nov 17, 2024
1 parent 459b429 commit a0383fe
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 333 deletions.
11 changes: 6 additions & 5 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type actorSystem struct {
internalpbconnect.UnimplementedClusterServiceHandler

// map of actors in the system
actors *syncMap
actors *pidMap

// states whether the actor system has started or not
started atomic.Bool
Expand Down Expand Up @@ -200,6 +200,7 @@ type actorSystem struct {
clusterEventsChan <-chan *cluster.Event
clusterSyncStopSig chan types.Unit
partitionHasher hash.Hasher
clusterNode *discovery.Node

// help protect some the fields to set
locker sync.Mutex
Expand Down Expand Up @@ -240,7 +241,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}

system := &actorSystem{
actors: newSyncMap(),
actors: newMap(),
actorsChan: make(chan *internalpb.ActorRef, 10),
name: name,
logger: log.New(log.ErrorLevel, os.Stderr),
Expand Down Expand Up @@ -521,7 +522,7 @@ func (x *actorSystem) PeerAddress() string {
x.locker.Lock()
defer x.locker.Unlock()
if x.clusterEnabled.Load() {
return x.cluster.AdvertisedAddress()
return x.clusterNode.PeersAddress()
}
return ""
}
Expand Down Expand Up @@ -1064,7 +1065,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
return errors.New("clustering needs remoting to be enabled")
}

node := &discovery.Node{
x.clusterNode = &discovery.Node{
Name: x.Name(),
Host: x.host,
DiscoveryPort: x.clusterConfig.DiscoveryPort(),
Expand All @@ -1075,7 +1076,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
clusterEngine, err := cluster.NewEngine(
x.Name(),
x.clusterConfig.Discovery(),
node,
x.clusterNode,
cluster.WithLogger(x.logger),
cluster.WithPartitionsCount(x.clusterConfig.PartitionCount()),
cluster.WithHasher(x.partitionHasher),
Expand Down
10 changes: 5 additions & 5 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ type PID struct {
watchersList *slice.Safe[*watcher]

// hold the list of the children
children *syncMap
children *pidMap

// hold the list of watched actors
watchedList *syncMap
watchedList *pidMap

// the actor system
system ActorSystem
Expand Down Expand Up @@ -184,10 +184,10 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
latestReceiveTime: atomic.Time{},
haltPassivationLnr: make(chan types.Unit, 1),
logger: log.New(log.ErrorLevel, os.Stderr),
children: newSyncMap(),
children: newMap(),
supervisorDirective: DefaultSupervisoryStrategy,
watchersList: slice.NewSafe[*watcher](),
watchedList: newSyncMap(),
watchedList: newMap(),
address: address,
fieldsLocker: new(sync.RWMutex),
stopLocker: new(sync.Mutex),
Expand Down Expand Up @@ -1091,7 +1091,7 @@ func (pid *PID) watchers() *slice.Safe[*watcher] {
}

// watchees returns the list of actors watched by this actor
func (pid *PID) watchees() *syncMap {
func (pid *PID) watchees() *pidMap {
return pid.watchedList
}

Expand Down
18 changes: 9 additions & 9 deletions actors/sync_map.go → actors/pid_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,46 @@ import (
"github.com/tochemey/goakt/v2/address"
)

type syncMap struct {
type pidMap struct {
sync.Map
counter uint64
}

func newSyncMap() *syncMap {
return &syncMap{
func newMap() *pidMap {
return &pidMap{
counter: 0,
}
}

// Size returns the number of List
func (m *syncMap) Size() int {
func (m *pidMap) Size() int {
return int(atomic.LoadUint64(&m.counter))
}

// Get retrieves a pid by its address
func (m *syncMap) Get(address *address.Address) (pid *PID, ok bool) {
func (m *pidMap) Get(address *address.Address) (pid *PID, ok bool) {
if val, found := m.Load(address.String()); found {
return val.(*PID), found
}
return
}

// Set sets a pid in the map
func (m *syncMap) Set(pid *PID) {
func (m *pidMap) Set(pid *PID) {
if pid != nil {
m.Store(pid.Address().String(), pid)
atomic.AddUint64(&m.counter, 1)
}
}

// Remove removes a pid from the map
func (m *syncMap) Remove(addr *address.Address) {
func (m *pidMap) Remove(addr *address.Address) {
m.Delete(addr.String())
atomic.AddUint64(&m.counter, ^uint64(0))
}

// List returns all actors as a slice
func (m *syncMap) List() []*PID {
func (m *pidMap) List() []*PID {
var out []*PID
m.Range(func(_, value interface{}) bool {
out = append(out, value.(*PID))
Expand All @@ -81,7 +81,7 @@ func (m *syncMap) List() []*PID {

// Reset resets the pids map
// nolint
func (m *syncMap) Reset() {
func (m *pidMap) Reset() {
// TODO: remove this line when migrated to go 1.23
//m.Clear()
m.Range(func(key interface{}, value interface{}) bool {
Expand Down
2 changes: 1 addition & 1 deletion actors/sync_map_test.go → actors/pid_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestPIDMap(t *testing.T) {
// create the PID
actorRef := &PID{address: actorPath, fieldsLocker: &sync.RWMutex{}, stopLocker: &sync.Mutex{}}
// create a new PID map
pidMap := newSyncMap()
pidMap := newMap()
// add to the map
pidMap.Set(actorRef)
// assert the length of the map
Expand Down
4 changes: 2 additions & 2 deletions actors/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,14 @@ func (x *scheduler) RemoteScheduleWithCron(ctx context.Context, message proto.Me
func (x *scheduler) distributeJobKeyOrNot(ctx context.Context, job *quartz.JobDetail) error {
jobKey := job.JobKey().String()
if x.cluster != nil {
ok, err := x.cluster.KeyExists(ctx, jobKey)
ok, err := x.cluster.SchedulerJobKeyExists(ctx, jobKey)
if err != nil {
return err
}
if ok {
return errSkipJobScheduling
}
if err := x.cluster.SetKey(ctx, jobKey); err != nil {
if err := x.cluster.SetSchedulerJobKey(ctx, jobKey); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit a0383fe

Please sign in to comment.