test: Add coverage for runner health monitor (#39)
ivov authored Dec 2, 2024
1 parent bdc4015 commit 4ce8065
Showing 3 changed files with 273 additions and 12 deletions.
4 changes: 2 additions & 2 deletions internal/commands/launch.go
@@ -103,11 +103,11 @@ func (l *LaunchCommand) Execute(cfg *config.Config) error {
return fmt.Errorf("failed to start runner process: %w", err)
}

- go http.MonitorRunnerHealth(ctx, cmd, env.RunnerServerURI, &wg)
+ go http.ManageRunnerHealth(ctx, cmd, env.RunnerServerURI, &wg)

err = cmd.Wait()
if err != nil && err.Error() == "signal: killed" {
logs.Warn("Unhealthy runner process was terminated")
logs.Warn("Unresponsive runner process was terminated")
} else if err != nil {
logs.Errorf("Runner process exited with error: %v", err)
} else {
66 changes: 56 additions & 10 deletions internal/http/manage_runner_health.go
Expand Up @@ -10,7 +10,7 @@ import (
"time"
)

- const (
+ var (
// healthCheckTimeout is the timeout (in seconds) for the launcher's health
// check request to the runner.
healthCheckTimeout = 5 * time.Second
@@ -28,6 +28,23 @@ const (
initialDelay = 3 * time.Second
)

+ // HealthStatus represents the possible states of runner health monitoring
+ type HealthStatus int

+ const (
+ // StatusHealthy indicates the runner is responding to health checks
+ StatusHealthy HealthStatus = iota
+ // StatusUnhealthy indicates the runner has failed too many health checks
+ StatusUnhealthy
+ // StatusMonitoringCancelled indicates monitoring was cancelled via context
+ StatusMonitoringCancelled
+ )

+ // healthCheckResult contains the result of health monitoring
+ type healthCheckResult struct {
+ Status HealthStatus
+ }

// sendRunnerHealthCheckRequest sends a request to the runner's health check endpoint.
// Returns `nil` if the health check succeeds, or an error if it fails.
func sendRunnerHealthCheckRequest(runnerServerURI string) error {
@@ -50,12 +67,18 @@ func sendRunnerHealthCheckRequest(runnerServerURI string) error {
return nil
}

- // MonitorRunnerHealth regularly checks the runner's health status. If the
- // health check fails more times than allowed, we terminate the runner process.
- func MonitorRunnerHealth(ctx context.Context, cmd *exec.Cmd, runnerServerURI string, wg *sync.WaitGroup) {
+ func monitorRunnerHealth(
+ ctx context.Context,
+ runnerServerURI string,
+ wg *sync.WaitGroup,
+ ) chan healthCheckResult {
logs.Debug("Started monitoring runner health")
+ resultChan := make(chan healthCheckResult, 1)

wg.Add(1)
go func() {
defer wg.Done()
+ defer close(resultChan)

time.Sleep(initialDelay)

@@ -67,24 +90,47 @@
select {
case <-ctx.Done():
logs.Debug("Stopped monitoring runner health")
+ resultChan <- healthCheckResult{Status: StatusMonitoringCancelled}
return

case <-ticker.C:
if err := sendRunnerHealthCheckRequest(runnerServerURI); err != nil {
failureCount++
logs.Warnf("Found runner unresponsive (%d/%d)", failureCount, healthCheckMaxFailures)
if failureCount >= healthCheckMaxFailures {
logs.Warn("Reached max failures on runner health check, terminating runner...")
if err := cmd.Process.Kill(); err != nil {
panic(fmt.Errorf("failed to terminate runner process: %v", err))
}
logs.Debug("Stopped monitoring runner health")
resultChan <- healthCheckResult{Status: StatusUnhealthy}
return
}
} else {
- failureCount = 0
logs.Debug("Found runner healthy")
+ failureCount = 0
}
}
}
}()

+ return resultChan
}

+ // ManageRunnerHealth monitors runner health and terminates it if unhealthy.
+ func ManageRunnerHealth(
+ ctx context.Context,
+ cmd *exec.Cmd,
+ runnerServerURI string,
+ wg *sync.WaitGroup,
+ ) {
+ resultChan := monitorRunnerHealth(ctx, runnerServerURI, wg)

+ go func() {
+ result := <-resultChan
+ switch result.Status {
+ case StatusUnhealthy:
+ logs.Warn("Found runner unresponsive too many times, terminating runner...")
+ if err := cmd.Process.Kill(); err != nil {
+ panic(fmt.Errorf("failed to terminate unhealthy runner process: %v", err))
+ }
+ case StatusMonitoringCancelled:
+ // On cancellation via context, CommandContext will terminate the process, so no action.
+ }
+ }()
+ }
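The StatusMonitoringCancelled branch can safely take no action because, per the comment above, the runner is started with exec.CommandContext, so cancelling the context already terminates the process. Below is a minimal standalone sketch of that behaviour, not part of this commit, using "sleep" as a stand-in for the runner binary:

package main

import (
    "context"
    "fmt"
    "os/exec"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // exec.CommandContext kills the child process once ctx is cancelled,
    // which is why ManageRunnerHealth does nothing on StatusMonitoringCancelled.
    cmd := exec.CommandContext(ctx, "sleep", "60")
    if err := cmd.Start(); err != nil {
        panic(err)
    }

    time.Sleep(100 * time.Millisecond)
    cancel()

    // Wait reports how the process ended; after cancellation this is
    // typically "signal: killed", matching the check in launch.go.
    fmt.Println(cmd.Wait())
}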
215 changes: 215 additions & 0 deletions internal/http/manage_runner_health_test.go
@@ -0,0 +1,215 @@
package http

import (
"context"
"net/http"
"net/http/httptest"
"os/exec"
"sync"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func init() {
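// These timing values are package-level vars (changed from const in the
// health-monitoring file of this commit) precisely so this init() can
// shorten them and keep the tests fast.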
healthCheckTimeout = 20 * time.Millisecond
healthCheckInterval = 10 * time.Millisecond
initialDelay = 5 * time.Millisecond
healthCheckMaxFailures = 2
}

func TestSendRunnerHealthCheckRequest(t *testing.T) {
tests := []struct {
name string
serverResponse int
serverDelay time.Duration
expectError bool
}{
{
name: "successful health check",
serverResponse: http.StatusOK,
expectError: false,
},
{
name: "unhealthy response",
serverResponse: http.StatusServiceUnavailable,
expectError: true,
},
{
name: "timeout failure",
serverResponse: http.StatusOK,
serverDelay: healthCheckTimeout * 2,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
if tt.serverDelay > 0 {
time.Sleep(tt.serverDelay)
}
w.WriteHeader(tt.serverResponse)
}))
defer srv.Close()

err := sendRunnerHealthCheckRequest(srv.URL)

if tt.expectError {
assert.Error(t, err, "expected error but got nil")
} else {
assert.NoError(t, err, "unexpected error")
}
})
}
}

func TestMonitorRunnerHealth(t *testing.T) {
tests := []struct {
name string
serverFn http.HandlerFunc
expectedStatus HealthStatus
timeout time.Duration
}{
{
name: "healthy runner",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
expectedStatus: StatusMonitoringCancelled,
timeout: 200 * time.Millisecond,
},
{
name: "unhealthy runner",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
},
expectedStatus: StatusUnhealthy,
timeout: 500 * time.Millisecond,
},
{
name: "alternating health status",
serverFn: func() http.HandlerFunc {
isHealthy := true
return func(w http.ResponseWriter, _ *http.Request) {
if isHealthy {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
isHealthy = !isHealthy
}
}(),
expectedStatus: StatusMonitoringCancelled,
timeout: 200 * time.Millisecond,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(tt.serverFn)
defer srv.Close()

ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
defer cancel()

var wg sync.WaitGroup
resultChan := monitorRunnerHealth(ctx, srv.URL, &wg)

result := <-resultChan
assert.Equal(t, tt.expectedStatus, result.Status, "unexpected health status")

wg.Wait()
})
}
}

func TestManageRunnerHealth(t *testing.T) {
tests := []struct {
name string
serverFn http.HandlerFunc
expectKill bool
}{
{
name: "healthy runner not killed",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
expectKill: false,
},
{
name: "unhealthy runner killed",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
},
expectKill: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(tt.serverFn)
defer srv.Close()

cmd := exec.Command("sleep", "60")
require.NoError(t, cmd.Start(), "Failed to start long-running dummy process")

done := make(chan error) // to help monitor process state
go func() {
done <- cmd.Wait()
}()

var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

ManageRunnerHealth(ctx, cmd, srv.URL, &wg)

// For a healthy runner, we wait long enough for 3 health checks to pass.
// For an unhealthy runner, we wait long enough for 2 health checks to
// fail and then trigger kill. This sleep ensures we do not check runner
// health too early, i.e. before monitoring can detect unhealthy status.
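// With the overrides in init() (5ms initial delay, 10ms interval, max 2
// failures), this works out to a 30ms sleep: enough to cover the initial
// delay plus two failing ticks at roughly 15ms and 25ms.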
time.Sleep(healthCheckInterval * time.Duration(healthCheckMaxFailures+1))

// check if monitored process was killed or kept as expected
select {
case <-done:
assert.True(t, tt.expectKill, "Process was killed but should have been left running")

case <-time.After(100 * time.Millisecond):
if tt.expectKill {
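// Signal 0 delivers no signal; it only returns an error if the process
// no longer exists, so it doubles as a liveness probe here.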
err := cmd.Process.Signal(syscall.Signal(0))
assert.Error(t, err, "Expected process to be killed but it was still running")
if err == nil {
assert.NoError(t, cmd.Process.Kill(), "Failed to kill process during cleanup")
}
}
}

wg.Wait()
})
}
}

func TestContextCancellation(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

resultChan := monitorRunnerHealth(ctx, srv.URL, &wg)

time.Sleep(20 * time.Millisecond) // short-lived until context is cancelled
cancel()

result := <-resultChan
assert.Equal(t, StatusMonitoringCancelled, result.Status, "unexpected status after context cancellation")

wg.Wait()
}
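To run just this package's tests from the repository root, something like the following should work (the package path is taken from the file paths above):

go test -v ./internal/http/...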
