Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 39 additions & 35 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s
logger, err := zap.NewDevelopment()
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
s, err := supervisor.NewSupervisor(t.Context(), logger, cfg)
require.NoError(t, err)

return s, &cfg
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestSupervisorStartsCollectorWithLocalConfigOnly(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}
t.Cleanup(s.Shutdown)
require.NoError(t, s.Start())
require.NoError(t, s.Start(t.Context()))

waitForSupervisorConnection(server.supervisorConnected, true)
require.True(t, connected.Load(), "Supervisor failed to connect")
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestSupervisorStartsCollectorWithNoPipelineConfig(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}
t.Cleanup(s.Shutdown)
require.NoError(t, s.Start())
require.NoError(t, s.Start(t.Context()))

waitForSupervisorConnection(server.supervisorConnected, true)
require.True(t, connected.Load(), "Supervisor failed to connect")
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestSupervisorStartsCollectorWithNoOpAMPServerWithNoLastRemoteConfig(t *tes
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}
t.Cleanup(s.Shutdown)
require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))

// Verify the collector runs eventually by pinging the healthcheck extension
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -554,7 +554,7 @@ func TestSupervisorStartsCollectorWithNoOpAMPServerUsingLastRemoteConfig(t *test
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

// Verify the collector runs eventually by pinging the healthcheck extension
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestSupervisorStartsCollectorWithRemoteConfigAndExecParams(t *testing.T) {
"healthcheckPort": strconv.Itoa(secondHealthcheckPort),
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -702,7 +702,7 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
"healthcheck_port": "12345",
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
Expand Down Expand Up @@ -794,7 +794,7 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -874,7 +874,7 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {

s, _ := newSupervisor(t, "nocap", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -965,7 +965,7 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {

s, _ := newSupervisor(t, "nocap", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1051,7 +1051,7 @@ func TestSupervisorBootstrapsCollectorAvailableComponents(t *testing.T) {

s, _ := newSupervisor(t, "reports_available_components", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1114,7 +1114,7 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) {

s, _ := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1225,7 +1225,7 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) {

s, _ := newSupervisor(t, "agent_description", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1421,7 +1421,7 @@ func TestSupervisorRestartCommand(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1498,7 +1498,7 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {

s, _ := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)
Expand Down Expand Up @@ -1550,7 +1550,7 @@ func TestSupervisorOpAMPWithHTTPEndpoint(t *testing.T) {

s, _ := newSupervisor(t, "http", map[string]string{"url": initialServer.addr})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)
Expand Down Expand Up @@ -1595,7 +1595,7 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))

waitForSupervisorConnection(initialServer.supervisorConnected, true)

Expand Down Expand Up @@ -1644,7 +1644,7 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}

require.Nil(t, s1.Start())
require.Nil(t, s1.Start(t.Context()))
defer s1.Shutdown()

waitForSupervisorConnection(newServer.supervisorConnected, true)
Expand Down Expand Up @@ -1691,7 +1691,7 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))

waitForSupervisorConnection(server.supervisorConnected, true)

Expand Down Expand Up @@ -1723,7 +1723,7 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1776,7 +1776,7 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))

waitForSupervisorConnection(server.supervisorConnected, true)

Expand Down Expand Up @@ -1806,7 +1806,7 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1838,7 +1838,7 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))

waitForSupervisorConnection(server.supervisorConnected, true)

Expand Down Expand Up @@ -1885,7 +1885,7 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
"url": server.addr,
})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -2012,9 +2012,9 @@ func TestSupervisorLogging(t *testing.T) {
logger, err := telemetry.NewLogger(cfg.Telemetry.Logs)
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
s, err := supervisor.NewSupervisor(t.Context(), logger, cfg)
require.NoError(t, err)
require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))

// Start the server and wait for the supervisor to connect
server.start()
Expand Down Expand Up @@ -2098,7 +2098,7 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
if mode.UseHUPConfigReload {
require.True(t, supervisorCfg.Agent.UseHUPConfigReload)
}
require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -2249,7 +2249,7 @@ func TestSupervisorOpAmpServerPort(t *testing.T) {

s, _ := newSupervisor(t, "server_port", map[string]string{"url": server.addr, "supervisor_opamp_server_port": fmt.Sprintf("%d", supervisorOpAmpServerPort)})

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -2311,9 +2311,9 @@ func TestSupervisorHealthCheckServer(t *testing.T) {
logger, err := telemetry.NewLogger(cfg.Telemetry.Logs)
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
s, err := supervisor.NewSupervisor(t.Context(), logger, cfg)
require.NoError(t, err)
require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()
waitForSupervisorConnection(server.supervisorConnected, true)

Expand All @@ -2337,8 +2337,12 @@ func TestSupervisorHealthCheckServerBackendConnError(t *testing.T) {
healthcheckPort, err := findRandomPort()
require.NoError(t, err)

// Find an open port on the host that has no server listening on it.
badOpAMPServerPort, err := findRandomPort()
require.NoError(t, err)

cfgFile := getSupervisorConfig(t, "healthcheck", map[string]string{
"url": "badserver:8080",
"url": fmt.Sprintf("localhost:%d", badOpAMPServerPort),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one key fix: the DNS lookup for an unknown hostname apparently cannot be cancelled, so it was leaking a goroutine. I couldn't figure out how to fix the underlying issue (and it likely isn't a problem when the Supervisor runs as its own process), so I opted to avoid it instead: the test now points at a localhost port that has no server listening on it, which still produces a connection error without requiring a DNS lookup.

"endpoint": fmt.Sprintf("localhost:%d", healthcheckPort),
})

Expand All @@ -2347,9 +2351,9 @@ func TestSupervisorHealthCheckServerBackendConnError(t *testing.T) {
logger, err := telemetry.NewLogger(cfg.Telemetry.Logs)
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
s, err := supervisor.NewSupervisor(t.Context(), logger, cfg)
require.NoError(t, err)
require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

// Wait for the health check server to start
Expand Down Expand Up @@ -2442,7 +2446,7 @@ func TestSupervisorEmitBootstrapTelemetry(t *testing.T) {
},
)

require.Nil(t, s.Start())
require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down
8 changes: 6 additions & 2 deletions cmd/opampsupervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"context"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -35,12 +36,15 @@ func runInteractive() error {
return fmt.Errorf("failed to create logger: %w", err)
}

supervisor, err := supervisor.NewSupervisor(logger.Named("supervisor"), cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

supervisor, err := supervisor.NewSupervisor(ctx, logger.Named("supervisor"), cfg)
if err != nil {
return fmt.Errorf("failed to create supervisor: %w", err)
}

err = supervisor.Start()
err = supervisor.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start supervisor: %w", err)
}
Expand Down
Loading
Loading