Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lxd watchdog #677

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .snapcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ parts:
go-packages:
- github.com/travis-ci/worker/cmd/travis-worker
go-importpath: github.com/travis-ci/worker
go-channel: 1.21/stable
go-channel: 1.23/stable
prime:
- bin/travis-worker
override-build: |-
Expand Down
4 changes: 3 additions & 1 deletion backend/lxd.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ func (p *lxdProvider) allocateAddress(containerName string) (string, error) {
var ips []string
ip := net.ParseIP(p.networkGateway)
for ip := ip.Mask(p.networkSubnet.Mask); p.networkSubnet.Contains(ip); inc(ip) {
ips = append(ips, ip.String())
if ip[3] < 230 { // reserving some IPs for watchdog
ips = append(ips, ip.String())
}
}

usedIPs := []string{}
Expand Down
30 changes: 29 additions & 1 deletion cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type CLI struct {

heartbeatErrSleep time.Duration
heartbeatSleep time.Duration

lastPoolSize int
}

// NewCLI creates a new *CLI from a *cli.Context
Expand Down Expand Up @@ -145,6 +147,17 @@ func (i *CLI) Setup() (bool, error) {

i.setupSentry()
i.setupMetrics()
if i.Config.ProviderName == "lxd" { // run watchdog once to check if containers start and get network connection - exits if not
RunLXDWatchdog(false)

if i.c.Bool("watchdog") {
os.Exit(0) // don't proceed if running with '-watchdog' param
}
}

if i.Config.ProviderName == "lxd" {
RunLXDWatchdog(true) // start the ldx watchdog loop
}

err := i.setupOpenCensus(ctx)
if err != nil {
Expand Down Expand Up @@ -550,7 +563,7 @@ func (i *CLI) signalHandler() {
signal.Notify(signalChan,
syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1,
syscall.SIGTTIN, syscall.SIGTTOU,
syscall.SIGUSR2)
syscall.SIGUSR2, syscall.Signal(0x22), syscall.Signal(0x23))

for {
select {
Expand All @@ -569,8 +582,23 @@ func (i *CLI) signalHandler() {
i.logger.Info("SIGTTOU received, removing processor from pool")
i.ProcessorPool.Decr()
case syscall.SIGUSR2:
i.lastPoolSize = i.ProcessorPool.Size()
i.logger.Warn("SIGUSR2 received, toggling graceful shutdown and pause")
i.ProcessorPool.GracefulShutdown(true)
case syscall.Signal(0x22): //SIGRTMIN
i.lastPoolSize = i.ProcessorPool.Size()
i.logger.Warn("SIGRTMIN received, pause processing")
i.ProcessorPool.SetSize(0)
case syscall.Signal(0x23): //SIGRTMIN + 1
i.logProcessorInfo("received SIGRTMIN+1, resuming processor pool")
if i.lastPoolSize == 0 {
if i.ProcessorPool.Size() > 0 {
i.lastPoolSize = i.ProcessorPool.Size()
} else {
i.lastPoolSize = 1
}
}
i.ProcessorPool.SetSize(i.lastPoolSize)
case syscall.SIGUSR1:
i.logProcessorInfo("received SIGUSR1")
default:
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,13 @@ var (
Usage: "sample rate for trace as an inverse fraction - for sample rate n, every nth event will be sampled",
Value: 1,
}),
NewConfigDef("watchdog", &cli.BoolFlag{
Usage: "execute LXD watchdog and exit",
}),
NewConfigDef("MaxRequeues", &cli.IntFlag{
Usage: "Max requeue count after Worker pauses",
Value: 0,
}),
}

// Flags is the list of all CLI flags accepted by travis-worker
Expand Down Expand Up @@ -443,6 +450,7 @@ type Config struct {
StackdriverProjectID string `config:"stackdriver-project-id"`
OpencensusTracingEnabled bool `config:"opencensus-tracing-enabled"`
OpencensusSamplingRate int `config:"opencensus-sampling-rate"`
MaxRequeues int `config:"max-requeues"`

ProviderConfig *ProviderConfig
}
Expand Down
7 changes: 7 additions & 0 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
}
if buildJob.Requeued() {
fields["requeued"] = 1

requeueCount, _ := state.Get("requeueCount").(int)

if p.config.MaxRequeues > 0 && requeueCount > p.config.MaxRequeues {
logger.WithFields(fields).Info("too many requeues, shutting down")
p.GracefulShutdown()
}
}
logger.WithFields(fields).Info("finished job")

Expand Down
39 changes: 35 additions & 4 deletions remote_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (

// RemoteController provides an HTTP API for controlling worker.
type RemoteController struct {
pool *ProcessorPool
auth string
workerInfo func() workerInfo
cancel func()
pool *ProcessorPool
auth string
workerInfo func() workerInfo
cancel func()
lastPoolSize int
}

// Setup installs the HTTP routes that will handle requests to the HTTP API.
Expand All @@ -37,6 +38,9 @@ func (api *RemoteController) Setup() {
r.HandleFunc("/pool/increment", api.IncrementPool).Methods("POST")
r.HandleFunc("/pool/decrement", api.DecrementPool).Methods("POST")

r.HandleFunc("/pause", api.Pause).Methods("POST")
r.HandleFunc("/resume", api.Resume).Methods("POST")

r.Use(api.SetContext)
r.Use(api.CheckAuth)
http.Handle("/", r)
Expand Down Expand Up @@ -179,6 +183,33 @@ func (api *RemoteController) ShutdownWorker(w http.ResponseWriter, req *http.Req
w.WriteHeader(http.StatusNoContent)
}

// IncrementPool tells the worker to spin up another processor.
func (api *RemoteController) Pause(w http.ResponseWriter, req *http.Request) {
log := context.LoggerFromContext(req.Context()).WithField("method", "Pause")

api.lastPoolSize = api.pool.Size()
api.pool.SetSize(0)
log.Info("pool size set to 0")

w.WriteHeader(http.StatusNoContent)
}

// IncrementPool tells the worker to spin up another processor.
func (api *RemoteController) Resume(w http.ResponseWriter, req *http.Request) {
log := context.LoggerFromContext(req.Context()).WithField("method", "Resume")
if api.lastPoolSize == 0 {
if api.pool.Size() > 0 {
api.lastPoolSize = api.pool.Size()
} else {
api.lastPoolSize = 1
}
}
api.pool.SetSize(api.lastPoolSize)
log.Info("pool size set to " + fmt.Sprintf("%d", api.lastPoolSize))

w.WriteHeader(http.StatusNoContent)
}

// IncrementPool tells the worker to spin up another processor.
func (api *RemoteController) IncrementPool(w http.ResponseWriter, req *http.Request) {
log := context.LoggerFromContext(req.Context()).WithField("method", "IncrementPool")
Expand Down
3 changes: 3 additions & 0 deletions step_start_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func (s *stepStartInstance) Run(state multistep.StateBag) multistep.StepAction {
}).Error("couldn't start instance, attempting requeue")
context.CaptureError(ctx, err)

requeueCount, _ := state.Get("requeueCount").(int)
state.Put("requeueCount", requeueCount+1)

err := buildJob.Requeue(preTimeoutCtx)
if err != nil {
logger.WithField("err", err).Error("couldn't requeue job")
Expand Down
Loading