Skip to content

Commit

Permalink
fix: enhance shutdown process (#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Nov 24, 2024
1 parent d09571a commit c07cce7
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 71 deletions.
3 changes: 3 additions & 0 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func WithCluster(config *ClusterConfig) Option {
}

// WithShutdownTimeout sets the shutdown timeout
// The timeout needs to be reasonably large, based upon the total number of actors
// the system will probably need. The graceful timeout is shared amongst all actors and child
// actors created in the system, which shut down gracefully via a cancellation context.
func WithShutdownTimeout(timeout time.Duration) Option {
return OptionFunc(
func(a *actorSystem) {
Expand Down
70 changes: 38 additions & 32 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ type PID struct {
// the default initialization timeout is 1s
initTimeout atomic.Duration

// shutdownTimeout specifies the graceful shutdown timeout
// the default value is 5 seconds
shutdownTimeout atomic.Duration

// specifies the actor mailbox
mailbox Mailbox

Expand Down Expand Up @@ -202,7 +198,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
}

pid.initMaxRetries.Store(DefaultInitMaxRetries)
pid.shutdownTimeout.Store(DefaultShutdownTimeout)
pid.latestReceiveDuration.Store(0)
pid.started.Store(false)
pid.stopping.Store(false)
Expand Down Expand Up @@ -446,7 +441,6 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
withActorSystem(pid.system),
withEventsStream(pid.eventsStream),
withInitTimeout(pid.initTimeout.Load()),
withShutdownTimeout(pid.shutdownTimeout.Load()),
withRemoting(pid.remoting),
withParent(pid),
}
Expand Down Expand Up @@ -1209,7 +1203,6 @@ func (pid *PID) init(ctx context.Context) error {
func (pid *PID) reset() {
pid.latestReceiveTime.Store(time.Time{})
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.shutdownTimeout.Store(DefaultShutdownTimeout)
pid.initMaxRetries.Store(DefaultInitMaxRetries)
pid.latestReceiveDuration.Store(0)
pid.initTimeout.Store(DefaultInitTimeout)
Expand All @@ -1223,7 +1216,8 @@ func (pid *PID) reset() {

// freeWatchers releases all the actors watching this actor
func (pid *PID) freeWatchers(ctx context.Context) error {
pid.logger.Debugf("%s freeing all watcher actors...", pid.ID())
logger := pid.logger
logger.Debugf("%s freeing all watcher actors...", pid.ID())
watchers := pid.watchers()
if watchers.Len() > 0 {
for _, watcher := range watchers.Items() {
Expand All @@ -1232,26 +1226,30 @@ func (pid *PID) freeWatchers(ctx context.Context) error {
}

if watcher.IsRunning() {
pid.logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID())
logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID())
if err := pid.Tell(ctx, watcher, terminated); err != nil {
return err
}

watcher.UnWatch(pid)
pid.logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID())
logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID())
}
}
logger.Debugf("%s successfully frees all watcher actors...", pid.ID())
return nil
}
pid.logger.Debugf("%s does not have any watcher actors", pid.ID())
logger.Debugf("%s does not have any watcher actors", pid.ID())
return nil
}

// freeWatchees releases all actors that have been watched by this actor
func (pid *PID) freeWatchees(ctx context.Context) error {
pid.logger.Debugf("%s freeing all watched actors...", pid.ID())
if pid.watcheesMap.Size() > 0 {
logger := pid.logger
logger.Debugf("%s freeing all watched actors...", pid.ID())
size := pid.watcheesMap.Size()
if size > 0 {
for _, watched := range pid.watcheesMap.List() {
pid.logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID())
logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID())
pid.UnWatch(watched)
if err := watched.Shutdown(ctx); err != nil {
errwrap := fmt.Errorf(
Expand All @@ -1260,29 +1258,38 @@ func (pid *PID) freeWatchees(ctx context.Context) error {
)
return errwrap
}
pid.logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID())
logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID())
}
logger.Debugf("%s successfully unwatch all watched actors...", pid.ID())
return nil
}
pid.logger.Debugf("%s does not have any watched actors", pid.ID())
logger.Debugf("%s does not have any watched actors", pid.ID())
return nil
}

// freeChildren releases all child actors
func (pid *PID) freeChildren(ctx context.Context) error {
pid.logger.Debug("%s freeing all child actors...", pid.ID())
for _, child := range pid.Children() {
pid.logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID())
pid.UnWatch(child)
pid.childrenMap.Remove(child.Address())
if err := child.Shutdown(ctx); err != nil {
errwrap := fmt.Errorf(
"parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(),
err,
)
return errwrap
logger := pid.logger
logger.Debugf("%s freeing all child actors...", pid.ID())
size := pid.childrenMap.Size()
if size > 0 {
for _, child := range pid.Children() {
logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID())
pid.UnWatch(child)
pid.childrenMap.Remove(child.Address())
if err := child.Shutdown(ctx); err != nil {
errwrap := fmt.Errorf(
"parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(),
err,
)
return errwrap
}
logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID())
}
pid.logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID())
logger.Debugf("%s successfully free all child actors...", pid.ID())
return nil
}
pid.logger.Debugf("%s does not have any children", pid.ID())
return nil
}

Expand Down Expand Up @@ -1382,11 +1389,10 @@ func (pid *PID) doStop(ctx context.Context) error {

// wait for all messages in the mailbox to be processed
// init a ticker that runs every 10 ms to make sure we process all messages in the
// mailbox.
// mailbox within two seconds
// TODO: revisit this timeout or discard all remaining messages in the mailbox
ticker := time.NewTicker(10 * time.Millisecond)
timer := time.After(pid.shutdownTimeout.Load())
tickerStopSig := make(chan types.Unit)
// start ticking
go func() {
for {
select {
Expand All @@ -1395,7 +1401,7 @@ func (pid *PID) doStop(ctx context.Context) error {
close(tickerStopSig)
return
}
case <-timer:
case <-time.After(2 * time.Second):
close(tickerStopSig)
return
}
Expand Down
7 changes: 0 additions & 7 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ func withSupervisorDirective(directive SupervisorDirective) pidOption {
}
}

// withShutdownTimeout sets the shutdown timeout
func withShutdownTimeout(duration time.Duration) pidOption {
return func(pid *PID) {
pid.shutdownTimeout.Store(duration)
}
}

// withPassivationDisabled disables passivation
func withPassivationDisabled() pidOption {
return func(pid *PID) {
Expand Down
5 changes: 0 additions & 5 deletions actors/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ func TestPIDOptions(t *testing.T) {
option: withSupervisorDirective(resumeDirective),
expected: &PID{supervisorDirective: resumeDirective},
},
{
name: "WithShutdownTimeout",
option: withShutdownTimeout(time.Second),
expected: &PID{shutdownTimeout: atomicDuration},
},
{
name: "WithPassivationDisabled",
option: withPassivationDisabled(),
Expand Down
76 changes: 52 additions & 24 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,10 @@ func TestSupervisorStrategy(t *testing.T) {
assert.NotNil(t, parent)

// create the child actor
child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised())
child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised(), WithSupervisor(NewStopDirective()))
assert.NoError(t, err)
assert.NotNil(t, child)
assert.Equal(t, parent.ID(), child.Parent().ID())

assert.Len(t, parent.Children(), 1)
// let us send 10 public to the actors
Expand Down Expand Up @@ -787,6 +788,36 @@ func TestMessaging(t *testing.T) {
lib.Pause(time.Second)
assert.False(t, pid2.IsRunning())
})
t.Run("With Ask invalid timeout", func(t *testing.T) {
ctx := context.TODO()
// create a Ping actor
opts := []pidOption{
withInitMaxRetries(1),
withCustomLogger(log.DefaultLogger),
}
ports := dynaport.Get(1)
// create the actor path
actor1 := &exchanger{}
actorPath1 := address.New("Exchange1", "sys", "host", ports[0])
pid1, err := newPID(ctx, actorPath1, actor1, opts...)

require.NoError(t, err)
require.NotNil(t, pid1)

actor2 := &exchanger{}
actorPath2 := address.New("Exchange2", "sys", "host", ports[0])
pid2, err := newPID(ctx, actorPath2, actor2, opts...)
require.NoError(t, err)
require.NotNil(t, pid2)

// send an ask
_, err = pid1.Ask(ctx, pid2, new(testpb.TestReply), 0)
require.Error(t, err)
require.EqualError(t, err, ErrInvalidTimeout.Error())

require.NoError(t, pid1.Shutdown(ctx))
require.NoError(t, pid2.Shutdown(ctx))
})
t.Run("With Ask when not ready", func(t *testing.T) {
ctx := context.TODO()
// create a Ping actor
Expand Down Expand Up @@ -2344,25 +2375,23 @@ func TestPipeTo(t *testing.T) {
t.Run("With is a dead actor: case 2", func(t *testing.T) {
askTimeout := time.Minute
ctx := context.TODO()

opts := []pidOption{
withInitMaxRetries(1),
withPassivationDisabled(),
withCustomLogger(log.DiscardLogger),
}
ports := dynaport.Get(1)
actorSystem, err := NewActorSystem("sys",
WithActorInitMaxRetries(1),
WithLogger(log.DiscardLogger),
WithRemoting("127.0.0.1", int32(ports[0])),
WithPassivationDisabled())

require.NoError(t, actorSystem.Start(ctx))
lib.Pause(time.Second)

// create actor1
actor1 := &exchanger{}
actorPath1 := address.New("Exchange1", "sys", "host", ports[0])
pid1, err := newPID(ctx, actorPath1, actor1, opts...)
pid1, err := actorSystem.Spawn(ctx, "Exchange1", &exchanger{})
require.NoError(t, err)
require.NotNil(t, pid1)

// create actor2
actor2 := &exchanger{}
actorPath2 := address.New("Exchange2", "sys", "host", ports[0])
pid2, err := newPID(ctx, actorPath2, actor2, opts...)
pid2, err := actorSystem.Spawn(ctx, "Exchange2", &exchanger{})
require.NoError(t, err)
require.NotNil(t, pid2)

Expand All @@ -2379,28 +2408,27 @@ func TestPipeTo(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Wait for some time and during that period send some messages to the actor
// send three messages while waiting for the future to complete
_, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout)
_, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout)
_, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout)

task <- new(testspb.TaskComplete)

lib.Pause(time.Second)

close(task)
wg.Done()
}()

// now we complete the Task
task <- new(testspb.TaskComplete)
_ = Tell(ctx, pid2, new(testpb.TestBye))

wg.Wait()

lib.Pause(2 * time.Second)

require.EqualValues(t, 3, pid1.ProcessedCount()-1)
require.NotZero(t, pid2.ProcessedCount())

_ = Tell(ctx, pid2, new(testpb.TestBye))
lib.Pause(2 * time.Second)
require.Zero(t, pid2.ProcessedCount())

lib.Pause(time.Second)
assert.NoError(t, pid1.Shutdown(ctx))
assert.NoError(t, actorSystem.Stop(ctx))
})
t.Run("With undefined task", func(t *testing.T) {
ctx := context.TODO()
Expand Down
4 changes: 2 additions & 2 deletions actors/spawn_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type SpawnOption interface {

var _ SpawnOption = spawnOption(nil)

// funcOption implements the FuncOption interface.
// spawnOption implements the SpawnOption interface.
type spawnOption func(config *spawnConfig)

// Apply implementation
// Apply sets the Option value of a config.
func (f spawnOption) Apply(c *spawnConfig) {
// spawnOption is itself a function type, so applying the option is just
// invoking the underlying function with the given config.
f(c)
}
Expand Down
2 changes: 1 addition & 1 deletion actors/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
// DefaultInitMaxRetries defines the default value for retrying actor initialization
DefaultInitMaxRetries = 5
// DefaultShutdownTimeout defines the default shutdown timeout
DefaultShutdownTimeout = 2 * time.Second
DefaultShutdownTimeout = time.Minute
// DefaultInitTimeout defines the default init timeout
DefaultInitTimeout = time.Second
// DefaultPeerStateLoopInterval defines the default peer state loop interval
Expand Down

0 comments on commit c07cce7

Please sign in to comment.