From c07cce718530133c316d4416ab3ba03736de95c3 Mon Sep 17 00:00:00 2001 From: Arsene Date: Sun, 24 Nov 2024 23:03:54 +0000 Subject: [PATCH] fix: enhance shutdown process (#530) --- actors/option.go | 3 ++ actors/pid.go | 70 +++++++++++++++++++----------------- actors/pid_option.go | 7 ---- actors/pid_option_test.go | 5 --- actors/pid_test.go | 76 ++++++++++++++++++++++++++------------- actors/spawn_option.go | 4 +-- actors/types.go | 2 +- 7 files changed, 96 insertions(+), 71 deletions(-) diff --git a/actors/option.go b/actors/option.go index 2f59b79a..9811ea40 100644 --- a/actors/option.go +++ b/actors/option.go @@ -140,6 +140,9 @@ func WithCluster(config *ClusterConfig) Option { } // WithShutdownTimeout sets the shutdown timeout +// The timeout needs to be considerable reasonable based upon the total number of actors +// the system will probably needs. The graceful timeout is shared amongst all actors and children +// actors created in the system to graceful shutdown via a cancellation context. func WithShutdownTimeout(timeout time.Duration) Option { return OptionFunc( func(a *actorSystem) { diff --git a/actors/pid.go b/actors/pid.go index ee9d532e..fbc12821 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -99,10 +99,6 @@ type PID struct { // the default initialization timeout is 1s initTimeout atomic.Duration - // shutdownTimeout specifies the graceful shutdown timeout - // the default value is 5 seconds - shutdownTimeout atomic.Duration - // specifies the actor mailbox mailbox Mailbox @@ -202,7 +198,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... } pid.initMaxRetries.Store(DefaultInitMaxRetries) - pid.shutdownTimeout.Store(DefaultShutdownTimeout) pid.latestReceiveDuration.Store(0) pid.started.Store(false) pid.stopping.Store(false) @@ -446,7 +441,6 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts . withActorSystem(pid.system), withEventsStream(pid.eventsStream), withInitTimeout(pid.initTimeout.Load()), - withShutdownTimeout(pid.shutdownTimeout.Load()), withRemoting(pid.remoting), withParent(pid), } @@ -1209,7 +1203,6 @@ func (pid *PID) init(ctx context.Context) error { func (pid *PID) reset() { pid.latestReceiveTime.Store(time.Time{}) pid.passivateAfter.Store(DefaultPassivationTimeout) - pid.shutdownTimeout.Store(DefaultShutdownTimeout) pid.initMaxRetries.Store(DefaultInitMaxRetries) pid.latestReceiveDuration.Store(0) pid.initTimeout.Store(DefaultInitTimeout) @@ -1223,7 +1216,8 @@ func (pid *PID) reset() { // freeWatchers releases all the actors watching this actor func (pid *PID) freeWatchers(ctx context.Context) error { - pid.logger.Debugf("%s freeing all watcher actors...", pid.ID()) + logger := pid.logger + logger.Debugf("%s freeing all watcher actors...", pid.ID()) watchers := pid.watchers() if watchers.Len() > 0 { for _, watcher := range watchers.Items() { @@ -1232,26 +1226,30 @@ func (pid *PID) freeWatchers(ctx context.Context) error { } if watcher.IsRunning() { - pid.logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID()) + logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID()) if err := pid.Tell(ctx, watcher, terminated); err != nil { return err } watcher.UnWatch(pid) - pid.logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID()) + logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID()) } } + logger.Debugf("%s successfully frees all watcher actors...", pid.ID()) + return nil } - pid.logger.Debugf("%s does not have any watcher actors", pid.ID()) + logger.Debugf("%s does not have any watcher actors", pid.ID()) return nil } // freeWatchees releases all actors that have been watched by this actor func (pid *PID) freeWatchees(ctx context.Context) error { - pid.logger.Debugf("%s freeing all watched actors...", pid.ID()) - if pid.watcheesMap.Size() > 0 { + logger := pid.logger + logger.Debugf("%s freeing all watched actors...", pid.ID()) + size := pid.watcheesMap.Size() + if size > 0 { for _, watched := range pid.watcheesMap.List() { - pid.logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID()) + logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID()) pid.UnWatch(watched) if err := watched.Shutdown(ctx); err != nil { errwrap := fmt.Errorf( @@ -1260,29 +1258,38 @@ func (pid *PID) freeWatchees(ctx context.Context) error { ) return errwrap } - pid.logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID()) + logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID()) } + logger.Debugf("%s successfully unwatch all watched actors...", pid.ID()) + return nil } - pid.logger.Debugf("%s does not have any watched actors", pid.ID()) + logger.Debugf("%s does not have any watched actors", pid.ID()) return nil } // freeChildren releases all child actors func (pid *PID) freeChildren(ctx context.Context) error { - pid.logger.Debug("%s freeing all child actors...", pid.ID()) - for _, child := range pid.Children() { - pid.logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID()) - pid.UnWatch(child) - pid.childrenMap.Remove(child.Address()) - if err := child.Shutdown(ctx); err != nil { - errwrap := fmt.Errorf( - "parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(), - err, - ) - return errwrap + logger := pid.logger + logger.Debugf("%s freeing all child actors...", pid.ID()) + size := pid.childrenMap.Size() + if size > 0 { + for _, child := range pid.Children() { + logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID()) + pid.UnWatch(child) + pid.childrenMap.Remove(child.Address()) + if err := child.Shutdown(ctx); err != nil { + errwrap := fmt.Errorf( + "parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(), + err, + ) + return errwrap + } + logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID()) } - pid.logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID()) + logger.Debugf("%s successfully free all child actors...", pid.ID()) + return nil } + pid.logger.Debugf("%s does not have any children", pid.ID()) return nil } @@ -1382,11 +1389,10 @@ func (pid *PID) doStop(ctx context.Context) error { // wait for all messages in the mailbox to be processed // init a ticker that run every 10 ms to make sure we process all messages in the - // mailbox. + // mailbox within a second + // TODO: revisit this timeout or discard all remaining messages in the mailbox ticker := time.NewTicker(10 * time.Millisecond) - timer := time.After(pid.shutdownTimeout.Load()) tickerStopSig := make(chan types.Unit) - // start ticking go func() { for { select { @@ -1395,7 +1401,7 @@ func (pid *PID) doStop(ctx context.Context) error { close(tickerStopSig) return } - case <-timer: + case <-time.After(2 * time.Second): close(tickerStopSig) return } diff --git a/actors/pid_option.go b/actors/pid_option.go index e793720a..792c1e96 100644 --- a/actors/pid_option.go +++ b/actors/pid_option.go @@ -70,13 +70,6 @@ func withSupervisorDirective(directive SupervisorDirective) pidOption { } } -// withShutdownTimeout sets the shutdown timeout -func withShutdownTimeout(duration time.Duration) pidOption { - return func(pid *PID) { - pid.shutdownTimeout.Store(duration) - } -} - // withNoPassivation disable passivation func withPassivationDisabled() pidOption { return func(pid *PID) { diff --git a/actors/pid_option_test.go b/actors/pid_option_test.go index 02f6d2b4..ab951ba6 100644 --- a/actors/pid_option_test.go +++ b/actors/pid_option_test.go @@ -77,11 +77,6 @@ func TestPIDOptions(t *testing.T) { option: withSupervisorDirective(resumeDirective), expected: &PID{supervisorDirective: resumeDirective}, }, - { - name: "WithShutdownTimeout", - option: withShutdownTimeout(time.Second), - expected: &PID{shutdownTimeout: atomicDuration}, - }, { name: "WithPassivationDisabled", option: withPassivationDisabled(), diff --git a/actors/pid_test.go b/actors/pid_test.go index 3478b582..721b451c 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -413,9 +413,10 @@ func TestSupervisorStrategy(t *testing.T) { assert.NotNil(t, parent) // create the child actor - child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised()) + child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised(), WithSupervisor(NewStopDirective())) assert.NoError(t, err) assert.NotNil(t, child) + assert.Equal(t, parent.ID(), child.Parent().ID()) assert.Len(t, parent.Children(), 1) // let us send 10 public to the actors @@ -787,6 +788,36 @@ func TestMessaging(t *testing.T) { lib.Pause(time.Second) assert.False(t, pid2.IsRunning()) }) + t.Run("With Ask invalid timeout", func(t *testing.T) { + ctx := context.TODO() + // create a Ping actor + opts := []pidOption{ + withInitMaxRetries(1), + withCustomLogger(log.DefaultLogger), + } + ports := dynaport.Get(1) + // create the actor path + actor1 := &exchanger{} + actorPath1 := address.New("Exchange1", "sys", "host", ports[0]) + pid1, err := newPID(ctx, actorPath1, actor1, opts...) + + require.NoError(t, err) + require.NotNil(t, pid1) + + actor2 := &exchanger{} + actorPath2 := address.New("Exchange2", "sys", "host", ports[0]) + pid2, err := newPID(ctx, actorPath2, actor2, opts...) + require.NoError(t, err) + require.NotNil(t, pid2) + + // send an ask + _, err = pid1.Ask(ctx, pid2, new(testpb.TestReply), 0) + require.Error(t, err) + require.EqualError(t, err, ErrInvalidTimeout.Error()) + + require.NoError(t, pid1.Shutdown(ctx)) + require.NoError(t, pid2.Shutdown(ctx)) + }) t.Run("With Ask when not ready", func(t *testing.T) { ctx := context.TODO() // create a Ping actor @@ -2344,25 +2375,23 @@ func TestPipeTo(t *testing.T) { t.Run("With is a dead actor: case 2", func(t *testing.T) { askTimeout := time.Minute ctx := context.TODO() - - opts := []pidOption{ - withInitMaxRetries(1), - withPassivationDisabled(), - withCustomLogger(log.DiscardLogger), - } ports := dynaport.Get(1) + actorSystem, err := NewActorSystem("sys", + WithActorInitMaxRetries(1), + WithLogger(log.DiscardLogger), + WithRemoting("127.0.0.1", int32(ports[0])), + WithPassivationDisabled()) + + require.NoError(t, actorSystem.Start(ctx)) + lib.Pause(time.Second) // create actor1 - actor1 := &exchanger{} - actorPath1 := address.New("Exchange1", "sys", "host", ports[0]) - pid1, err := newPID(ctx, actorPath1, actor1, opts...) + pid1, err := actorSystem.Spawn(ctx, "Exchange1", &exchanger{}) require.NoError(t, err) require.NotNil(t, pid1) // create actor2 - actor2 := &exchanger{} - actorPath2 := address.New("Exchange2", "sys", "host", ports[0]) - pid2, err := newPID(ctx, actorPath2, actor2, opts...) + pid2, err := actorSystem.Spawn(ctx, "Exchange2", &exchanger{}) require.NoError(t, err) require.NotNil(t, pid2) @@ -2379,28 +2408,27 @@ func TestPipeTo(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - // Wait for some time and during that period send some messages to the actor - // send three messages while waiting for the future to completed _, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout) _, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout) _, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout) + + task <- new(testspb.TaskComplete) + lib.Pause(time.Second) + + close(task) wg.Done() }() - - // now we complete the Task - task <- new(testspb.TaskComplete) - _ = Tell(ctx, pid2, new(testpb.TestBye)) - wg.Wait() - lib.Pause(2 * time.Second) - require.EqualValues(t, 3, pid1.ProcessedCount()-1) + require.NotZero(t, pid2.ProcessedCount()) + + _ = Tell(ctx, pid2, new(testpb.TestBye)) + lib.Pause(2 * time.Second) require.Zero(t, pid2.ProcessedCount()) - lib.Pause(time.Second) - assert.NoError(t, pid1.Shutdown(ctx)) + assert.NoError(t, actorSystem.Stop(ctx)) }) t.Run("With undefined task", func(t *testing.T) { ctx := context.TODO() diff --git a/actors/spawn_option.go b/actors/spawn_option.go index a7fb6272..7ec5b994 100644 --- a/actors/spawn_option.go +++ b/actors/spawn_option.go @@ -50,10 +50,10 @@ type SpawnOption interface { var _ SpawnOption = spawnOption(nil) -// funcOption implements the FuncOption interface. +// spawnOption implements the SpawnOption interface. type spawnOption func(config *spawnConfig) -// Apply implementation +// Apply sets the Option value of a config. func (f spawnOption) Apply(c *spawnConfig) { f(c) } diff --git a/actors/types.go b/actors/types.go index b95054a5..144ef3c0 100644 --- a/actors/types.go +++ b/actors/types.go @@ -34,7 +34,7 @@ const ( // DefaultInitMaxRetries defines the default value for retrying actor initialization DefaultInitMaxRetries = 5 // DefaultShutdownTimeout defines the default shutdown timeout - DefaultShutdownTimeout = 2 * time.Second + DefaultShutdownTimeout = time.Minute // DefaultInitTimeout defines the default init timeout DefaultInitTimeout = time.Second // DefaultPeerStateLoopInterval defines the default peer state loop interval