From 2e6ffda81358ea32e865a618a472d46cf685fb8b Mon Sep 17 00:00:00 2001 From: Tochemey Date: Tue, 26 Nov 2024 19:01:58 +0000 Subject: [PATCH] perf: speed up actor shutdown --- actors/pid.go | 94 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 33 deletions(-) diff --git a/actors/pid.go b/actors/pid.go index fbc12821..57f15ddb 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -36,6 +36,7 @@ import ( "connectrpc.com/connect" "github.com/flowchartsman/retry" "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" @@ -79,7 +80,7 @@ type PID struct { // specifies the actor address address *address.Address - // helps determine whether the actor should handle messags or not. + // helps determine whether the actor should handle messages or not. started atomic.Bool stopping atomic.Bool @@ -1220,20 +1221,29 @@ func (pid *PID) freeWatchers(ctx context.Context) error { logger.Debugf("%s freeing all watcher actors...", pid.ID()) watchers := pid.watchers() if watchers.Len() > 0 { + eg, ctx := errgroup.WithContext(ctx) for _, watcher := range watchers.Items() { - terminated := &goaktpb.Terminated{ - ActorId: pid.ID(), - } - - if watcher.IsRunning() { - logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID()) - if err := pid.Tell(ctx, watcher, terminated); err != nil { - return err + watcher := watcher + eg.Go(func() error { + terminated := &goaktpb.Terminated{ + ActorId: pid.ID(), } - watcher.UnWatch(pid) - logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID()) - } + if watcher.IsRunning() { + logger.Debugf("watcher=(%s) releasing watched=(%s)", watcher.ID(), pid.ID()) + if err := pid.Tell(ctx, watcher, terminated); err != nil { + return err + } + + watcher.UnWatch(pid) + logger.Debugf("watcher=(%s) released watched=(%s)", watcher.ID(), pid.ID()) + } + return nil + }) + } + if err := eg.Wait(); err != nil { + logger.Errorf("watcher=(%s) failed to free all watcher error: %v", pid.ID(), err) + return err } logger.Debugf("%s successfully frees all watcher actors...", pid.ID()) return nil @@ -1248,17 +1258,26 @@ func (pid *PID) freeWatchees(ctx context.Context) error { logger.Debugf("%s freeing all watched actors...", pid.ID()) size := pid.watcheesMap.Size() if size > 0 { + eg, ctx := errgroup.WithContext(ctx) for _, watched := range pid.watcheesMap.List() { - logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID()) - pid.UnWatch(watched) - if err := watched.Shutdown(ctx); err != nil { - errwrap := fmt.Errorf( - "watcher=(%s) failed to unwatch actor=(%s): %w", - pid.ID(), watched.ID(), err, - ) - return errwrap - } - logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID()) + watched := watched + eg.Go(func() error { + logger.Debugf("watcher=(%s) unwatching actor=(%s)", pid.ID(), watched.ID()) + pid.UnWatch(watched) + if err := watched.Shutdown(ctx); err != nil { + errwrap := fmt.Errorf( + "watcher=(%s) failed to unwatch actor=(%s): %w", + pid.ID(), watched.ID(), err, + ) + return errwrap + } + logger.Debugf("watcher=(%s) successfully unwatch actor=(%s)", pid.ID(), watched.ID()) + return nil + }) + } + if err := eg.Wait(); err != nil { + logger.Errorf("watcher=(%s) failed to unwatch actors: %v", pid.ID(), err) + return err } logger.Debugf("%s successfully unwatch all watched actors...", pid.ID()) return nil @@ -1273,18 +1292,27 @@ func (pid *PID) freeChildren(ctx context.Context) error { logger.Debugf("%s freeing all child actors...", pid.ID()) size := pid.childrenMap.Size() if size > 0 { + eg, ctx := errgroup.WithContext(ctx) for _, child := range pid.Children() { - logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID()) - pid.UnWatch(child) - pid.childrenMap.Remove(child.Address()) - if err := child.Shutdown(ctx); err != nil { - errwrap := fmt.Errorf( - "parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(), - err, - ) - return errwrap - } - logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID()) + child := child + eg.Go(func() error { + logger.Debugf("parent=(%s) disowning child=(%s)", pid.ID(), child.ID()) + pid.UnWatch(child) + pid.childrenMap.Remove(child.Address()) + if err := child.Shutdown(ctx); err != nil { + errwrap := fmt.Errorf( + "parent=(%s) failed to disown child=(%s): %w", pid.ID(), child.ID(), + err, + ) + return errwrap + } + logger.Debugf("parent=(%s) successfully disown child=(%s)", pid.ID(), child.ID()) + return nil + }) + } + if err := eg.Wait(); err != nil { + logger.Errorf("parent=(%s) failed to free all child actors: %v", pid.ID(), err) + return err } logger.Debugf("%s successfully free all child actors...", pid.ID()) return nil