Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions internal/extension/oop.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ import (
const defaultUnixSocket = ".grpc.sock"

type OOPExtension struct {
name string
executable string
socket string
cmd *exec.Cmd
server *grpc.Server
service extpb.ExtensionServiceServer
logger logr.Logger
sync io.Writer
serverMu sync.Mutex
monitorWg sync.WaitGroup
name string
executable string
socket string
cmd *exec.Cmd
server *grpc.Server
service extpb.ExtensionServiceServer
logger logr.Logger
sync io.Writer
serverMu sync.Mutex
monitorWg sync.WaitGroup
completionWg sync.WaitGroup
}

func NewOOPExtension(name string, location string, service extpb.ExtensionServiceServer, logger logr.Logger, sync io.Writer) (OOPExtension, error) {
Expand Down Expand Up @@ -89,9 +90,10 @@ func (p *OOPExtension) Start() error {
return err
}

p.monitorWg.Add(1)
monitorReady := make(chan struct{})
go p.monitorStderr(stderr, monitorReady)
p.monitorWg.Go(func() {
p.monitorStderr(stderr, monitorReady)
})
<-monitorReady

if err = cmd.Start(); err != nil {
Expand All @@ -102,13 +104,14 @@ func (p *OOPExtension) Start() error {
}
p.logger.Info("started")

go func() {
p.completionWg.Go(func() {
// We must wait for stderr to be fully read before calling cmd.Wait()
p.monitorWg.Wait()

if e := cmd.Wait(); e != nil {
p.logger.Error(e, fmt.Sprintf("Extension %q finished with an error", p.name))
}
// wait for stderr
p.monitorWg.Wait()
}()
})

// only set this, if we successfully started it all
p.cmd = cmd
Expand All @@ -119,6 +122,10 @@ func (p *OOPExtension) IsAlive() bool {
return p.cmd != nil && p.cmd.Process.Signal(syscall.Signal(0)) == nil
}

func (p *OOPExtension) WaitForCompletion() {
p.completionWg.Wait()
}

func (p *OOPExtension) Stop() error {
p.logger.Info("stopping...")
var err error
Expand Down Expand Up @@ -218,7 +225,6 @@ func (p *OOPExtension) cleanupSocket() error {
}

func (p *OOPExtension) monitorStderr(stderr io.ReadCloser, monitorReady chan struct{}) {
defer p.monitorWg.Done()
defer stderr.Close()

scanner := bufio.NewScanner(stderr)
Expand Down
4 changes: 2 additions & 2 deletions internal/extension/oop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func TestOOPExtensionForwardsLog(t *testing.T) {
for oopErrorLog.cmd.ProcessState == nil {
time.Sleep(5 * time.Millisecond) // wait for the command to return
}
// try wait for ps to finish writing its output
time.Sleep(50 * time.Millisecond)

oopErrorLog.WaitForCompletion()

_ = oopErrorLog.Stop()

Expand Down
Loading