diff --git a/internal/commands/launch.go b/internal/commands/launch.go
index 20e084b..03e94d8 100644
--- a/internal/commands/launch.go
+++ b/internal/commands/launch.go
@@ -103,11 +103,11 @@ func (l *LaunchCommand) Execute(cfg *config.Config) error {
 		return fmt.Errorf("failed to start runner process: %w", err)
 	}
 
-	go http.MonitorRunnerHealth(ctx, cmd, env.RunnerServerURI, &wg)
+	go http.ManageRunnerHealth(ctx, cmd, env.RunnerServerURI, &wg)
 
 	err = cmd.Wait()
 	if err != nil && err.Error() == "signal: killed" {
-		logs.Warn("Unhealthy runner process was terminated")
+		logs.Warn("Unresponsive runner process was terminated")
 	} else if err != nil {
 		logs.Errorf("Runner process exited with error: %v", err)
 	} else {
diff --git a/internal/http/monitor_runner_health.go b/internal/http/manage_runner_health.go
similarity index 56%
rename from internal/http/monitor_runner_health.go
rename to internal/http/manage_runner_health.go
index 3546b7e..c24d5da 100644
--- a/internal/http/monitor_runner_health.go
+++ b/internal/http/manage_runner_health.go
@@ -10,7 +10,7 @@ import (
 	"time"
 )
 
-const (
+var (
 	// healthCheckTimeout is the timeout (in seconds) for the launcher's health
 	// check request to the runner.
 	healthCheckTimeout = 5 * time.Second
@@ -28,6 +28,23 @@ const (
 	initialDelay = 3 * time.Second
 )
 
+// HealthStatus represents the possible states of runner health monitoring
+type HealthStatus int
+
+const (
+	// StatusHealthy indicates the runner is responding to health checks
+	StatusHealthy HealthStatus = iota
+	// StatusUnhealthy indicates the runner has failed too many health checks
+	StatusUnhealthy
+	// StatusMonitoringCancelled indicates monitoring was cancelled via context
+	StatusMonitoringCancelled
+)
+
+// healthCheckResult contains the result of health monitoring
+type healthCheckResult struct {
+	Status HealthStatus
+}
+
 // sendRunnerHealthCheckRequest sends a request to the runner's health check endpoint.
 // Returns `nil` if the health check succeeds, or an error if it fails.
 func sendRunnerHealthCheckRequest(runnerServerURI string) error {
@@ -50,12 +67,18 @@ func sendRunnerHealthCheckRequest(runnerServerURI string) error {
 	return nil
 }
 
-// MonitorRunnerHealth regularly checks the runner's health status. If the
-// health check fails more times than allowed, we terminate the runner process.
-func MonitorRunnerHealth(ctx context.Context, cmd *exec.Cmd, runnerServerURI string, wg *sync.WaitGroup) {
+func monitorRunnerHealth(
+	ctx context.Context,
+	runnerServerURI string,
+	wg *sync.WaitGroup,
+) chan healthCheckResult {
+	logs.Debug("Started monitoring runner health")
+	resultChan := make(chan healthCheckResult, 1)
+
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
+		defer close(resultChan)
 
 		time.Sleep(initialDelay)
 
@@ -67,24 +90,47 @@ func MonitorRunnerHealth(ctx context.Context, cmd *exec.Cmd, runnerServerURI str
 			select {
 			case <-ctx.Done():
 				logs.Debug("Stopped monitoring runner health")
+				resultChan <- healthCheckResult{Status: StatusMonitoringCancelled}
 				return
+
 			case <-ticker.C:
 				if err := sendRunnerHealthCheckRequest(runnerServerURI); err != nil {
 					failureCount++
 					logs.Warnf("Found runner unresponsive (%d/%d)", failureCount, healthCheckMaxFailures)
 					if failureCount >= healthCheckMaxFailures {
-						logs.Warn("Reached max failures on runner health check, terminating runner...")
-						if err := cmd.Process.Kill(); err != nil {
-							panic(fmt.Errorf("failed to terminate runner process: %v", err))
-						}
-						logs.Debug("Stopped monitoring runner health")
+						resultChan <- healthCheckResult{Status: StatusUnhealthy}
 						return
 					}
 				} else {
-					failureCount = 0
 					logs.Debug("Found runner healthy")
+					failureCount = 0
 				}
 			}
 		}
 	}()
+
+	return resultChan
+}
+
+// ManageRunnerHealth monitors runner health and terminates it if unhealthy.
+func ManageRunnerHealth(
+	ctx context.Context,
+	cmd *exec.Cmd,
+	runnerServerURI string,
+	wg *sync.WaitGroup,
+) {
+	resultChan := monitorRunnerHealth(ctx, runnerServerURI, wg)
+
+	go func() {
+		result := <-resultChan
+		switch result.Status {
+		case StatusUnhealthy:
+			logs.Warn("Found runner unresponsive too many times, terminating runner...")
+			if err := cmd.Process.Kill(); err != nil {
+				panic(fmt.Errorf("failed to terminate unhealthy runner process: %v", err))
+			}
+		case StatusMonitoringCancelled:
+			// On cancellation via context, CommandContext will terminate the process, so no action.
+		}
+	}()
 }
diff --git a/internal/http/manage_runner_health_test.go b/internal/http/manage_runner_health_test.go
new file mode 100644
index 0000000..3e26f2d
--- /dev/null
+++ b/internal/http/manage_runner_health_test.go
@@ -0,0 +1,215 @@
+package http
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"os/exec"
+	"sync"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func init() {
+	healthCheckTimeout = 20 * time.Millisecond
+	healthCheckInterval = 10 * time.Millisecond
+	initialDelay = 5 * time.Millisecond
+	healthCheckMaxFailures = 2
+}
+
+func TestSendRunnerHealthCheckRequest(t *testing.T) {
+	tests := []struct {
+		name           string
+		serverResponse int
+		serverDelay    time.Duration
+		expectError    bool
+	}{
+		{
+			name:           "successful health check",
+			serverResponse: http.StatusOK,
+			expectError:    false,
+		},
+		{
+			name:           "unhealthy response",
+			serverResponse: http.StatusServiceUnavailable,
+			expectError:    true,
+		},
+		{
+			name:           "timeout failure",
+			serverResponse: http.StatusOK,
+			serverDelay:    healthCheckTimeout * 2,
+			expectError:    true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+				if tt.serverDelay > 0 {
+					time.Sleep(tt.serverDelay)
+				}
+				w.WriteHeader(tt.serverResponse)
+			}))
+			defer srv.Close()
+
+			err := sendRunnerHealthCheckRequest(srv.URL)
+
+			if tt.expectError {
+				assert.Error(t, err, "expected error but got nil")
+			} else {
+				assert.NoError(t, err, "unexpected error")
+			}
+		})
+	}
+}
+
+func TestMonitorRunnerHealth(t *testing.T) {
+	tests := []struct {
+		name           string
+		serverFn       http.HandlerFunc
+		expectedStatus HealthStatus
+		timeout        time.Duration
+	}{
+		{
+			name: "healthy runner",
+			serverFn: func(w http.ResponseWriter, _ *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			expectedStatus: StatusMonitoringCancelled,
+			timeout:        200 * time.Millisecond,
+		},
+		{
+			name: "unhealthy runner",
+			serverFn: func(w http.ResponseWriter, _ *http.Request) {
+				w.WriteHeader(http.StatusServiceUnavailable)
+			},
+			expectedStatus: StatusUnhealthy,
+			timeout:        500 * time.Millisecond,
+		},
+		{
+			name: "alternating health status",
+			serverFn: func() http.HandlerFunc {
+				isHealthy := true
+				return func(w http.ResponseWriter, _ *http.Request) {
+					if isHealthy {
+						w.WriteHeader(http.StatusOK)
+					} else {
+						w.WriteHeader(http.StatusServiceUnavailable)
+					}
+					isHealthy = !isHealthy
+				}
+			}(),
+			expectedStatus: StatusMonitoringCancelled,
+			timeout:        200 * time.Millisecond,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			srv := httptest.NewServer(tt.serverFn)
+			defer srv.Close()
+
+			ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
+			defer cancel()
+
+			var wg sync.WaitGroup
+			resultChan := monitorRunnerHealth(ctx, srv.URL, &wg)
+
+			result := <-resultChan
+			assert.Equal(t, tt.expectedStatus, result.Status, "unexpected health status")
+
+			wg.Wait()
+		})
+	}
+}
+
+func TestManageRunnerHealth(t *testing.T) {
+	tests := []struct {
+		name       string
+		serverFn   http.HandlerFunc
+		expectKill bool
+	}{
+		{
+			name: "healthy runner not killed",
+			serverFn: func(w http.ResponseWriter, _ *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			expectKill: false,
+		},
+		{
+			name: "unhealthy runner killed",
+			serverFn: func(w http.ResponseWriter, _ *http.Request) {
+				w.WriteHeader(http.StatusServiceUnavailable)
+			},
+			expectKill: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			srv := httptest.NewServer(tt.serverFn)
+			defer srv.Close()
+
+			cmd := exec.Command("sleep", "60")
+			require.NoError(t, cmd.Start(), "Failed to start long-running dummy process")
+
+			done := make(chan error) // to help monitor process state
+			go func() {
+				done <- cmd.Wait()
+			}()
+
+			var wg sync.WaitGroup
+			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+			defer cancel()
+
+			ManageRunnerHealth(ctx, cmd, srv.URL, &wg)
+
+			// For a healthy runner, we wait long enough for 3 health checks to pass.
+			// For an unhealthy runner, we wait long enough for 2 health checks to
+			// fail and then trigger kill. This sleep ensures we do not check runner
+			// health too early, i.e. before monitoring can detect unhealthy status.
+			time.Sleep(healthCheckInterval * time.Duration(healthCheckMaxFailures+1))
+
+			// check if monitored process was killed or kept as expected
+			select {
+			case <-done:
+				assert.True(t, tt.expectKill, "Process was killed but should have been left running")
+
+			case <-time.After(100 * time.Millisecond):
+				if tt.expectKill {
+					err := cmd.Process.Signal(syscall.Signal(0))
+					assert.Error(t, err, "Expected process to be killed but it was still running")
+					if err == nil {
+						assert.NoError(t, cmd.Process.Kill(), "Failed to kill process during cleanup")
+					}
+				}
+			}
+
+			wg.Wait()
+		})
+	}
+}
+
+func TestContextCancellation(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer srv.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var wg sync.WaitGroup
+
+	resultChan := monitorRunnerHealth(ctx, srv.URL, &wg)
+
+	time.Sleep(20 * time.Millisecond) // short-lived until context is cancelled
+	cancel()
+
+	result := <-resultChan
+	assert.Equal(t, StatusMonitoringCancelled, result.Status, "unexpected status after context cancellation")
+
+	wg.Wait()
+}
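
Note on the StatusMonitoringCancelled branch in ManageRunnerHealth: it is a no-op because the launcher is expected to start the runner via exec.CommandContext, which kills the child process once the context is cancelled. Below is a minimal sketch (not part of the patch) showing only standard-library os/exec behaviour; the startRunner helper and its arguments are hypothetical, not the launcher's actual code.

// Sketch only: illustrates why no explicit kill is needed on context
// cancellation when the process was started with exec.CommandContext.
// startRunner is a hypothetical helper, not code from this repository.
package main

import (
	"context"
	"os/exec"
)

func startRunner(ctx context.Context, binary string, args ...string) (*exec.Cmd, error) {
	// exec.CommandContext ties the child's lifetime to ctx: when ctx is
	// cancelled, the standard library kills the process itself, so the
	// health manager's StatusMonitoringCancelled case needs no cmd.Process.Kill().
	cmd := exec.CommandContext(ctx, binary, args...)
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return cmd, nil
}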