Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enhance shutdown process #530

Merged
merged 5 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func WithCluster(config *ClusterConfig) Option {
}

// WithShutdownTimeout sets the shutdown timeout
// The timeout needs to be considerable reasonable based upon the total number of actors
// the system will probably needs. The graceful timeout is shared amongst all actors and children
// actors created in the system to graceful shutdown via a cancellation context.
func WithShutdownTimeout(timeout time.Duration) Option {
return OptionFunc(
func(a *actorSystem) {
Expand Down
70 changes: 38 additions & 32 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ type PID struct {
// the default initialization timeout is 1s
initTimeout atomic.Duration

// shutdownTimeout specifies the graceful shutdown timeout
// the default value is 5 seconds
shutdownTimeout atomic.Duration

// specifies the actor mailbox
mailbox Mailbox

Expand Down Expand Up @@ -202,7 +198,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
}

pid.initMaxRetries.Store(DefaultInitMaxRetries)
pid.shutdownTimeout.Store(DefaultShutdownTimeout)
pid.latestReceiveDuration.Store(0)
pid.started.Store(false)
pid.stopping.Store(false)
Expand Down Expand Up @@ -446,7 +441,6 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
withActorSystem(pid.system),
withEventsStream(pid.eventsStream),
withInitTimeout(pid.initTimeout.Load()),
withShutdownTimeout(pid.shutdownTimeout.Load()),
withRemoting(pid.remoting),
withParent(pid),
}
Expand Down Expand Up @@ -1209,7 +1203,6 @@ func (pid *PID) init(ctx context.Context) error {
func (pid *PID) reset() {
pid.latestReceiveTime.Store(time.Time{})
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.shutdownTimeout.Store(DefaultShutdownTimeout)
pid.initMaxRetries.Store(DefaultInitMaxRetries)
pid.latestReceiveDuration.Store(0)
pid.initTimeout.Store(DefaultInitTimeout)
Expand All @@ -1223,7 +1216,8 @@ func (pid *PID) reset() {

// freeWatchers releases all the actors watching this actor
func (pid *PID) freeWatchers(ctx context.Context) error {
pid.logger.Debugf("%s freeing all watcher actors...", pid.ID())
logger := pid.logger
logger.Debugf("%s freeing all watcher actors...", pid.ID())
watchers := pid.watchers()
if watchers.Len() > 0 {
for _, watcher := range watchers.Items() {
Expand All @@ -1232,26 +1226,30 @@ func (pid *PID) freeWatchers(ctx context.Context) error {
}

if watcher.IsRunning() {
pid.logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID())
logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID())
if err := pid.Tell(ctx, watcher, terminated); err != nil {
return err
}

watcher.UnWatch(pid)
pid.logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID())
logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID())
}
}
logger.Debugf("%s successfully frees all watcher actors...", pid.ID())
return nil
}
pid.logger.Debugf("%s does not have any watcher actors", pid.ID())
logger.Debugf("%s does not have any watcher actors", pid.ID())
return nil
}

// freeWatchees releases all actors that have been watched by this actor
func (pid *PID) freeWatchees(ctx context.Context) error {
pid.logger.Debugf("%s freeing all watched actors...", pid.ID())
if pid.watcheesMap.Size() > 0 {
logger := pid.logger
logger.Debugf("%s freeing all watched actors...", pid.ID())
size := pid.watcheesMap.Size()
if size > 0 {
for _, watched := range pid.watcheesMap.List() {
pid.logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID())
logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID())
pid.UnWatch(watched)
if err := watched.Shutdown(ctx); err != nil {
errwrap := fmt.Errorf(
Expand All @@ -1260,29 +1258,38 @@ func (pid *PID) freeWatchees(ctx context.Context) error {
)
return errwrap
}
pid.logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID())
logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID())
}
logger.Debugf("%s successfully unwatch all watched actors...", pid.ID())
return nil
}
pid.logger.Debugf("%s does not have any watched actors", pid.ID())
logger.Debugf("%s does not have any watched actors", pid.ID())
return nil
}

// freeChildren releases all child actors
func (pid *PID) freeChildren(ctx context.Context) error {
pid.logger.Debug("%s freeing all child actors...", pid.ID())
for _, child := range pid.Children() {
pid.logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID())
pid.UnWatch(child)
pid.childrenMap.Remove(child.Address())
if err := child.Shutdown(ctx); err != nil {
errwrap := fmt.Errorf(
"parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(),
err,
)
return errwrap
logger := pid.logger
logger.Debugf("%s freeing all child actors...", pid.ID())
size := pid.childrenMap.Size()
if size > 0 {
for _, child := range pid.Children() {
logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID())
pid.UnWatch(child)
pid.childrenMap.Remove(child.Address())
if err := child.Shutdown(ctx); err != nil {
errwrap := fmt.Errorf(
"parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(),
err,
)
return errwrap
}
logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID())
}
pid.logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID())
logger.Debugf("%s successfully free all child actors...", pid.ID())
return nil
}
pid.logger.Debugf("%s does not have any children", pid.ID())
return nil
}

Expand Down Expand Up @@ -1382,11 +1389,10 @@ func (pid *PID) doStop(ctx context.Context) error {

// wait for all messages in the mailbox to be processed
// init a ticker that run every 10 ms to make sure we process all messages in the
// mailbox.
// mailbox within a second
// TODO: revisit this timeout or discard all remaining messages in the mailbox
ticker := time.NewTicker(10 * time.Millisecond)
timer := time.After(pid.shutdownTimeout.Load())
tickerStopSig := make(chan types.Unit)
// start ticking
go func() {
for {
select {
Expand All @@ -1395,7 +1401,7 @@ func (pid *PID) doStop(ctx context.Context) error {
close(tickerStopSig)
return
}
case <-timer:
case <-time.After(2 * time.Second):
close(tickerStopSig)
return
}
Expand Down
7 changes: 0 additions & 7 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ func withSupervisorDirective(directive SupervisorDirective) pidOption {
}
}

// withShutdownTimeout sets the shutdown timeout
func withShutdownTimeout(duration time.Duration) pidOption {
return func(pid *PID) {
pid.shutdownTimeout.Store(duration)
}
}

// withNoPassivation disable passivation
func withPassivationDisabled() pidOption {
return func(pid *PID) {
Expand Down
5 changes: 0 additions & 5 deletions actors/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ func TestPIDOptions(t *testing.T) {
option: withSupervisorDirective(resumeDirective),
expected: &PID{supervisorDirective: resumeDirective},
},
{
name: "WithShutdownTimeout",
option: withShutdownTimeout(time.Second),
expected: &PID{shutdownTimeout: atomicDuration},
},
{
name: "WithPassivationDisabled",
option: withPassivationDisabled(),
Expand Down
76 changes: 52 additions & 24 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,10 @@ func TestSupervisorStrategy(t *testing.T) {
assert.NotNil(t, parent)

// create the child actor
child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised())
child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised(), WithSupervisor(NewStopDirective()))
assert.NoError(t, err)
assert.NotNil(t, child)
assert.Equal(t, parent.ID(), child.Parent().ID())

assert.Len(t, parent.Children(), 1)
// let us send 10 public to the actors
Expand Down Expand Up @@ -787,6 +788,36 @@ func TestMessaging(t *testing.T) {
lib.Pause(time.Second)
assert.False(t, pid2.IsRunning())
})
t.Run("With Ask invalid timeout", func(t *testing.T) {
ctx := context.TODO()
// create a Ping actor
opts := []pidOption{
withInitMaxRetries(1),
withCustomLogger(log.DefaultLogger),
}
ports := dynaport.Get(1)
// create the actor path
actor1 := &exchanger{}
actorPath1 := address.New("Exchange1", "sys", "host", ports[0])
pid1, err := newPID(ctx, actorPath1, actor1, opts...)

require.NoError(t, err)
require.NotNil(t, pid1)

actor2 := &exchanger{}
actorPath2 := address.New("Exchange2", "sys", "host", ports[0])
pid2, err := newPID(ctx, actorPath2, actor2, opts...)
require.NoError(t, err)
require.NotNil(t, pid2)

// send an ask
_, err = pid1.Ask(ctx, pid2, new(testpb.TestReply), 0)
require.Error(t, err)
require.EqualError(t, err, ErrInvalidTimeout.Error())

require.NoError(t, pid1.Shutdown(ctx))
require.NoError(t, pid2.Shutdown(ctx))
})
t.Run("With Ask when not ready", func(t *testing.T) {
ctx := context.TODO()
// create a Ping actor
Expand Down Expand Up @@ -2344,25 +2375,23 @@ func TestPipeTo(t *testing.T) {
t.Run("With is a dead actor: case 2", func(t *testing.T) {
askTimeout := time.Minute
ctx := context.TODO()

opts := []pidOption{
withInitMaxRetries(1),
withPassivationDisabled(),
withCustomLogger(log.DiscardLogger),
}
ports := dynaport.Get(1)
actorSystem, err := NewActorSystem("sys",
WithActorInitMaxRetries(1),
WithLogger(log.DiscardLogger),
WithRemoting("127.0.0.1", int32(ports[0])),
WithPassivationDisabled())

require.NoError(t, actorSystem.Start(ctx))
lib.Pause(time.Second)

// create actor1
actor1 := &exchanger{}
actorPath1 := address.New("Exchange1", "sys", "host", ports[0])
pid1, err := newPID(ctx, actorPath1, actor1, opts...)
pid1, err := actorSystem.Spawn(ctx, "Exchange1", &exchanger{})
require.NoError(t, err)
require.NotNil(t, pid1)

// create actor2
actor2 := &exchanger{}
actorPath2 := address.New("Exchange2", "sys", "host", ports[0])
pid2, err := newPID(ctx, actorPath2, actor2, opts...)
pid2, err := actorSystem.Spawn(ctx, "Exchange2", &exchanger{})
require.NoError(t, err)
require.NotNil(t, pid2)

Expand All @@ -2379,28 +2408,27 @@ func TestPipeTo(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Wait for some time and during that period send some messages to the actor
// send three messages while waiting for the future to completed
_, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout)
_, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout)
_, _ = Ask(ctx, pid1, new(testpb.TestReply), askTimeout)

task <- new(testspb.TaskComplete)

lib.Pause(time.Second)

close(task)
wg.Done()
}()

// now we complete the Task
task <- new(testspb.TaskComplete)
_ = Tell(ctx, pid2, new(testpb.TestBye))

wg.Wait()

lib.Pause(2 * time.Second)

require.EqualValues(t, 3, pid1.ProcessedCount()-1)
require.NotZero(t, pid2.ProcessedCount())

_ = Tell(ctx, pid2, new(testpb.TestBye))
lib.Pause(2 * time.Second)
require.Zero(t, pid2.ProcessedCount())

lib.Pause(time.Second)
assert.NoError(t, pid1.Shutdown(ctx))
assert.NoError(t, actorSystem.Stop(ctx))
})
t.Run("With undefined task", func(t *testing.T) {
ctx := context.TODO()
Expand Down
4 changes: 2 additions & 2 deletions actors/spawn_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type SpawnOption interface {

var _ SpawnOption = spawnOption(nil)

// funcOption implements the FuncOption interface.
// spawnOption implements the SpawnOption interface.
type spawnOption func(config *spawnConfig)

// Apply implementation
// Apply sets the Option value of a config.
func (f spawnOption) Apply(c *spawnConfig) {
f(c)
}
Expand Down
2 changes: 1 addition & 1 deletion actors/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
// DefaultInitMaxRetries defines the default value for retrying actor initialization
DefaultInitMaxRetries = 5
// DefaultShutdownTimeout defines the default shutdown timeout
DefaultShutdownTimeout = 2 * time.Second
DefaultShutdownTimeout = time.Minute
// DefaultInitTimeout defines the default init timeout
DefaultInitTimeout = time.Second
// DefaultPeerStateLoopInterval defines the default peer state loop interval
Expand Down