Skip to content

Commit

Permalink
improve signal handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fridrik01 committed Jan 26, 2025
1 parent 6d740dc commit 46e8387
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 64 deletions.
81 changes: 25 additions & 56 deletions node-core/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/berachain/beacon-kit/log"
service "github.com/berachain/beacon-kit/node-core/services/registry"
"github.com/berachain/beacon-kit/node-core/types"
"golang.org/x/sync/errgroup"
)

// Compile-time assertion that node implements the NodeI interface.
Expand Down Expand Up @@ -61,62 +60,32 @@ func (n *node) Start(
// Make the context cancellable.
cctx, cancelFn := context.WithCancel(ctx)

// Create an errgroup to manage the lifecycle of all the services.
g, gctx := errgroup.WithContext(cctx)

// listen for quit signals so the calling parent process can gracefully exit
n.listenForQuitSignals(g, true, cancelFn)

// Start all the registered services.
if err := n.registry.StartAll(gctx); err != nil {
// Make sure the services that were successfully started are stopped
// before exiting. We assume that it is safe to call Stop on a
// service that was never started so we can call StopAll here
n.registry.StopAll()
return err
}

// Wait for those aforementioned exit signals.
err := g.Wait()
if err != nil {
return err
}

// Stopp each service allowing them the exit gracefully.
n.registry.StopAll()

return nil
}

// listenForQuitSignals listens for SIGINT and SIGTERM. When a signal is
// received,
// the cleanup function is called, indicating the caller can gracefully exit or
// return.
//
// Note, the blocking behavior of this depends on the block argument.
// The caller must ensure the corresponding context derived from the cancelFn is
// used correctly.
func (n *node) listenForQuitSignals(
g *errgroup.Group,
block bool,
cancelFn context.CancelFunc,
) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

f := func() {
sig := <-sigCh
cancelFn()
stop := make(chan struct{})

go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigc)
sig := <-sigc
n.logger.Info("caught exit signal", "signal", sig.String())
}
cancelFn()
go func() {
n.registry.StopAll()
close(stop)
}()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
n.logger.Info("Already shutting down, interrupt more to panic")
}
}
panic("Panic closing the beacon node")
}()

n.registry.StartAll(cctx)

// Wait for stop channel to be closed.
<-stop

if block {
g.Go(func() error {
f()
return nil
})
} else {
go f()
}
return nil
}
30 changes: 24 additions & 6 deletions node-core/services/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package service
import (
"context"
"reflect"
"sync"

"github.com/berachain/beacon-kit/log"
)
Expand Down Expand Up @@ -50,15 +51,20 @@ type Registry struct {
logger log.Logger
// services is a map of service type -> service instance.
services map[string]Basic
// servicesStarted is a map of services we have called Start() on.
servicesStarted map[string]struct{}
// serviceTypes is an ordered slice of registered service types.
serviceTypes []string
// mutex makes calls to StartAll and StopAll thread-safe.
mutex sync.Mutex
}

// NewRegistry starts a registry instance for convenience.
func NewRegistry(logger log.Logger, opts ...RegistryOption) *Registry {
r := &Registry{
logger: logger,
services: make(map[string]Basic),
logger: logger,
services: make(map[string]Basic),
servicesStarted: make(map[string]struct{}),
}

for _, opt := range opts {
Expand All @@ -70,7 +76,10 @@ func NewRegistry(logger log.Logger, opts ...RegistryOption) *Registry {
}

// StartAll initialized each service in order of registration.
func (s *Registry) StartAll(ctx context.Context) error {
func (s *Registry) StartAll(ctx context.Context) {
s.mutex.Lock()
defer s.mutex.Unlock()

// start all services
s.logger.Info("Starting services", "num", len(s.serviceTypes))
for _, typeName := range s.serviceTypes {
Expand All @@ -81,19 +90,28 @@ func (s *Registry) StartAll(ctx context.Context) error {
continue
}

s.servicesStarted[typeName] = struct{}{}
if err := svc.Start(ctx); err != nil {
return err
s.logger.Error("error when starting service", "type", typeName, "err", err)
return
}
}
return nil
}

func (s *Registry) StopAll() {
s.logger.Info("Stopping services", "num", len(s.serviceTypes))
s.mutex.Lock()
defer s.mutex.Unlock()

// stop all services in reverse order they were started
s.logger.Info("Stopping services", "num", len(s.serviceTypes))
for i := len(s.serviceTypes) - 1; i >= 0; i-- {
typeName := s.serviceTypes[i]

if _, started := s.servicesStarted[typeName]; !started {
s.logger.Info("Service not started", "type", typeName)
continue
}

s.logger.Info("Stopping service", "type", typeName)
svc := s.services[typeName]
if svc == nil {
Expand Down
3 changes: 1 addition & 2 deletions node-core/services/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
service "github.com/berachain/beacon-kit/node-core/services/registry"
"github.com/berachain/beacon-kit/node-core/services/registry/mocks"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestRegistry_StartAll(t *testing.T) {
Expand All @@ -55,7 +54,7 @@ func TestRegistry_StartAll(t *testing.T) {
t.Fatalf("Failed to register Service2: %v", err)
}

require.NoError(t, registry.StartAll(context.Background()))
registry.StartAll(context.Background())
time.Sleep(25 * time.Millisecond)

service1.AssertCalled(t, "Start", mock.Anything)
Expand Down

0 comments on commit 46e8387

Please sign in to comment.