Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Ensure tombstones created before kubexit started are read
Browse files Browse the repository at this point in the history
This commit changes the `Watch` function inside the `tombstone` package to
also emit an initial event besides the `fsnotify` events. This initial event
is called immediatly when `Watch` is called and the watcher has been setup.

This change allows kubexit to detect tombstones written before kubexit was
started. This prevents possible race conditions as described by #8.

In order for this change to work, the `tombstone.EventHandler` type was
changed. It now requires a function with 3 arguments: The graveyard, the
tombstone and the operation instead of an `fsnotify.Event`. Reason being
that the initial event is not an `fsnotify.Event`. The functions implementing
an `tombstone.EventHandler` are changed accordingly.

This change on its own introduces a new bug where the tombstone is written
as part of an initial event, but the child process will still start because
`child.Start()` is being called after the watcher has been setup. To overcome
this issue, the shutdown state of the child is tracked in a new flag, which is
set if `ShutdownNow()` or `ShutdownWithTimeout()` is executed.
  • Loading branch information
RemcodM authored and stephpalis committed Jul 12, 2021
1 parent 179bf8b commit 7868360
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 24 deletions.
8 changes: 3 additions & 5 deletions cmd/kubexit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,11 @@ func onDeathOfAny(deathDeps []string, callback func()) tombstone.EventHandler {
deathDepSet[depName] = struct{}{}
}

return func(event fsnotify.Event) {
if event.Op&fsnotify.Create != fsnotify.Create && event.Op&fsnotify.Write != fsnotify.Write {
// ignore other events
return func(graveyard string, name string, op fsnotify.Op) {
if op != 0 && op&fsnotify.Create != fsnotify.Create && op&fsnotify.Write != fsnotify.Write {
// ignore events other than initial, create and write
return
}
graveyard := filepath.Dir(event.Name)
name := filepath.Base(event.Name)

log.Info("Tombstone modified:", "name", name)
if _, ok := deathDepSet[name]; !ok {
Expand Down
10 changes: 10 additions & 0 deletions pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Supervisor struct {
cmd *exec.Cmd
sigCh chan os.Signal
startStopLock sync.Mutex
shutdown bool
shutdownTimer *time.Timer
}

Expand All @@ -32,13 +33,18 @@ func New(name string, args ...string) *Supervisor {
cmd.Env = os.Environ()
return &Supervisor{
cmd: cmd,
shutdown: false,
}
}

func (s *Supervisor) Start() error {
s.startStopLock.Lock()
defer s.startStopLock.Unlock()

if s.shutdown {
return errors.New("not starting child process: shutdown already started")
}

log.Printf("Starting: %s\n", s)
if err := s.cmd.Start(); err != nil {
return fmt.Errorf("failed to start child process: %v", err)
Expand Down Expand Up @@ -90,6 +96,8 @@ func (s *Supervisor) ShutdownNow() error {
s.startStopLock.Lock()
defer s.startStopLock.Unlock()

s.shutdown = true

if !s.isRunning() {
log.Println("Skipping ShutdownNow: child process not running")
return nil
Expand All @@ -109,6 +117,8 @@ func (s *Supervisor) ShutdownWithTimeout(timeout time.Duration) error {
s.startStopLock.Lock()
defer s.startStopLock.Unlock()

s.shutdown = true

if !s.isRunning() {
log.Println("Skipping ShutdownWithTimeout: child process not running")
return nil
Expand Down
58 changes: 39 additions & 19 deletions pkg/tombstone/tombstone.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"
"time"

"github.com/karlkfi/kubexit/pkg/log"
kubelog "github.com/karlkfi/kubexit/pkg/log"

"github.com/fsnotify/fsnotify"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (t *Tombstone) RecordBirth() error {
born := time.Now()
t.Born = &born

log.Info("Creating tombstone:", "path", t.Path())
kubelog.Info("Creating tombstone:", "path", t.Path())
err := t.Write()
if err != nil {
return fmt.Errorf("failed to create tombstone: %v", err)
Expand All @@ -76,7 +77,7 @@ func (t *Tombstone) RecordDeath(exitCode int) error {
t.Died = &died
t.ExitCode = &code

log.Info("Updating tombstone:", "path", t.Path())
kubelog.Info("Updating tombstone:", "path", t.Path())
err := t.Write()
if err != nil {
return fmt.Errorf("failed to update tombstone: %v", err)
Expand All @@ -87,7 +88,7 @@ func (t *Tombstone) RecordDeath(exitCode int) error {
func (t *Tombstone) String() string {
inline, err := json.Marshal(t)
if err != nil {
log.Error(err, "Error: failed to marshal tombstone as json")
kubelog.Error(err, "Error: failed to marshal tombstone as json")
return "{}"
}
return string(inline)
Expand All @@ -113,24 +114,24 @@ func Read(graveyard, name string) (*Tombstone, error) {
return &t, nil
}

type EventHandler func(fsnotify.Event)
type EventHandler func(string, string, fsnotify.Op)

// LoggingEventHandler is an example EventHandler that logs fsnotify events
func LoggingEventHandler(event fsnotify.Event) {
if event.Op&fsnotify.Create == fsnotify.Create {
log.Info("Tombstone Watch: file created:", "name", event.Name)
func LoggingEventHandler(graveyard string, tombstone string, op fsnotify.Op) {
if op&fsnotify.Create == fsnotify.Create {
log.Printf("Tombstone Watch: file created: %s/%s\n", graveyard, tombstone)
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
log.Info("Tombstone Watch: file removed:", "name", event.Name)
if op&fsnotify.Remove == fsnotify.Remove {
log.Printf("Tombstone Watch: file removed: %s/%s\n", graveyard, tombstone)
}
if event.Op&fsnotify.Write == fsnotify.Write {
log.Info("Tombstone Watch: file modified:", "name", event.Name)
if op&fsnotify.Write == fsnotify.Write {
log.Printf("Tombstone Watch: file modified: %s/%s\n", graveyard, tombstone)
}
if event.Op&fsnotify.Rename == fsnotify.Rename {
log.Info("Tombstone Watch: file renamed:", "name", event.Name)
if op&fsnotify.Rename == fsnotify.Rename {
log.Printf("Tombstone Watch: file renamed: %s/%s\n", graveyard, tombstone)
}
if event.Op&fsnotify.Chmod == fsnotify.Chmod {
log.Info("Tombstone Watch: file chmoded:", "name", event.Name)
if op&fsnotify.Chmod == fsnotify.Chmod {
log.Printf("Tombstone Watch: file chmoded: %s/%s\n", graveyard, tombstone)
}
}

Expand All @@ -147,18 +148,20 @@ func Watch(ctx context.Context, graveyard string, eventHandler EventHandler) err
for {
select {
case <-ctx.Done():
log.Info("Tombstone Watch: done", "graveyard", graveyard)
kubelog.Info("Tombstone Watch: done", "graveyard", graveyard)
return
case event, ok := <-watcher.Events:
if !ok {
return
}
eventHandler(event)
graveyard := filepath.Dir(event.Name)
tombstone := filepath.Base(event.Name)
eventHandler(graveyard, tombstone, event.Op)
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Error(err, "Tombstone Watch: error", "graveyard", graveyard)
kubelog.Error(err, "Tombstone Watch: error", "graveyard", graveyard)
// TODO: wrap ctx with WithCancel and cancel on terminal errors, if any
}
}
Expand All @@ -168,5 +171,22 @@ func Watch(ctx context.Context, graveyard string, eventHandler EventHandler) err
if err != nil {
return fmt.Errorf("failed to add watcher: %v", err)
}

// fire initial events after we started watching, this way no events are ever missed
f, err := os.Open(graveyard)
if err != nil {
return fmt.Errorf("failed to watch graveyard: %v", err)
}

files, err := f.Readdir(-1)
f.Close()
if err != nil {
return fmt.Errorf("failed to watch for initial tombstones: %v", err)
}

for _, file := range files {
eventHandler(graveyard, file.Name(), 0)
}

return nil
}

0 comments on commit 7868360

Please sign in to comment.