Skip to content

Commit 67d6243

Browse files
committed
internal/relui: email on schedule failure
The user who scheduled a workflow may not be closely watching at the time of failure. This adds functionality to send an email if a scheduled workflow cannot progress. The current implementation logs on stall, but it's trivial to configure to email as some tasks do. For golang/go#54476 Change-Id: Id0deefd3c1b07f569585600a583ba4e04f8f7be1 Reviewed-on: https://go-review.googlesource.com/c/build/+/444695 Run-TryBot: Jenny Rakoczy <[email protected]> TryBot-Result: Gopher Robot <[email protected]> Reviewed-by: Heschi Kreinick <[email protected]>
1 parent 3dfc64c commit 67d6243

File tree

16 files changed

+276
-81
lines changed

16 files changed

+276
-81
lines changed

cmd/relui/main.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ func main() {
7373
addressVarFlag(&annMail.From, "announce-mail-from", "The From address to use for the (pre-)announcement mail.")
7474
addressVarFlag(&annMail.To, "announce-mail-to", "The To address to use for the (pre-)announcement mail.")
7575
addressListVarFlag(&annMail.BCC, "announce-mail-bcc", "The BCC address list to use for the (pre-)announcement mail.")
76+
var schedMail task.MailHeader
77+
addressVarFlag(&schedMail.From, "schedule-mail-from", "The From address to use for the scheduled workflow failure mail.")
78+
addressVarFlag(&schedMail.To, "schedule-mail-to", "The To address to use for the the scheduled workflow failure mail.")
79+
addressListVarFlag(&schedMail.BCC, "schedule-mail-bcc", "The BCC address list to use for the scheduled workflow failure mail.")
7680
var twitterAPI secret.TwitterCredentials
7781
secret.JSONVarFlag(&twitterAPI, "twitter-api-secret", "Twitter API secret to use for workflows involving tweeting.")
7882
masterKey := secret.Flag("builder-master-key", "Builder master key")
@@ -212,18 +216,24 @@ func main() {
212216
}
213217
dh.RegisterDefinition("Tag x/ repos", tagTasks.NewDefinition())
214218

215-
w := relui.NewWorker(dh, dbPool, relui.NewPGListener(dbPool))
216-
go w.Run(ctx)
217-
if err := w.ResumeAll(ctx); err != nil {
218-
log.Printf("w.ResumeAll() = %v", err)
219-
}
220219
var base *url.URL
221220
if *baseURL != "" {
222221
base, err = url.Parse(*baseURL)
223222
if err != nil {
224223
log.Fatalf("url.Parse(%q) = %v, %v", *baseURL, base, err)
225224
}
226225
}
226+
l := &relui.PGListener{
227+
DB: dbPool,
228+
BaseURL: base,
229+
ScheduleFailureMailHeader: schedMail,
230+
SendMail: relui.LogOnlyMailer,
231+
}
232+
w := relui.NewWorker(dh, dbPool, l)
233+
go w.Run(ctx)
234+
if err := w.ResumeAll(ctx); err != nil {
235+
log.Printf("w.ResumeAll() = %v", err)
236+
}
227237
s := relui.NewServer(dbPool, w, base, siteHeader, ms)
228238
log.Fatalln(https.ListenAndServe(ctx, &ochttp.Handler{Handler: GRPCHandler(grpcServer, s)}))
229239
}

internal/relui/buildrelease_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,11 @@ type verboseListener struct {
544544
outputListener func(string, interface{})
545545
}
546546

547+
func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error {
548+
l.t.Logf("workflow %q: stalled", workflowID.String())
549+
return nil
550+
}
551+
547552
func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *workflow.TaskState) error {
548553
switch {
549554
case !st.Finished:

internal/relui/content.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package relui
66

7-
import "embed"
7+
import (
8+
"embed"
9+
)
810

911
// static is our static web server content.
1012
//

internal/relui/listener.go

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,51 @@
55
package relui
66

77
import (
8+
"bytes"
89
"context"
910
"database/sql"
1011
"encoding/json"
1112
"fmt"
13+
"html/template"
1214
"log"
15+
"net/url"
1316
"time"
1417

1518
"github.com/google/uuid"
1619
"github.com/jackc/pgx/v4"
1720
"golang.org/x/build/internal/relui/db"
21+
"golang.org/x/build/internal/task"
1822
"golang.org/x/build/internal/workflow"
1923
)
2024

21-
func NewPGListener(db db.PGDBTX) *PGListener {
22-
return &PGListener{db}
23-
}
24-
2525
// PGListener implements workflow.Listener for recording workflow state.
2626
type PGListener struct {
27-
db db.PGDBTX
27+
DB db.PGDBTX
28+
29+
BaseURL *url.URL
30+
31+
ScheduleFailureMailHeader task.MailHeader
32+
SendMail func(task.MailHeader, task.MailContent) error
33+
34+
templ *template.Template
35+
}
36+
37+
// WorkflowStalled is called when no tasks are runnable.
38+
func (l *PGListener) WorkflowStalled(workflowID uuid.UUID) error {
39+
wf, err := db.New(l.DB).Workflow(context.Background(), workflowID)
40+
if err != nil || wf.ScheduleID.Int32 == 0 {
41+
return err
42+
}
43+
var buf bytes.Buffer
44+
body := scheduledFailureEmailBody{Workflow: wf}
45+
if err := l.template("scheduled_workflow_failure_email.txt").Execute(&buf, body); err != nil {
46+
log.Printf("WorkflowFinished: Execute(_, %v) = %q", body, err)
47+
return err
48+
}
49+
return l.SendMail(l.ScheduleFailureMailHeader, task.MailContent{
50+
Subject: fmt.Sprintf("[relui] Scheduled workflow %q failed", wf.Name.String),
51+
BodyText: buf.String(),
52+
})
2853
}
2954

3055
// TaskStateChanged is called whenever a task is updated by the
@@ -38,7 +63,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta
3863
if err != nil {
3964
return err
4065
}
41-
err = l.db.BeginFunc(ctx, func(tx pgx.Tx) error {
66+
err = l.DB.BeginFunc(ctx, func(tx pgx.Tx) error {
4267
q := db.New(tx)
4368
updated := time.Now()
4469
_, err := q.UpsertTask(ctx, db.UpsertTaskParams{
@@ -62,7 +87,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta
6287

6388
// WorkflowStarted persists a new workflow execution in the database.
6489
func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID, name string, params map[string]interface{}, scheduleID int) error {
65-
q := db.New(l.db)
90+
q := db.New(l.DB)
6691
m, err := json.Marshal(params)
6792
if err != nil {
6893
return err
@@ -80,11 +105,16 @@ func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID,
80105
return err
81106
}
82107

108+
type scheduledFailureEmailBody struct {
109+
Workflow db.Workflow
110+
Err error
111+
}
112+
83113
// WorkflowFinished saves the final state of a workflow after its run
84114
// has completed.
85115
func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID, outputs map[string]interface{}, workflowErr error) error {
86-
log.Printf("WorkflowCompleted(%q, %v, %q)", workflowID, outputs, workflowErr)
87-
q := db.New(l.db)
116+
log.Printf("WorkflowFinished(%q, %v, %q)", workflowID, outputs, workflowErr)
117+
q := db.New(l.DB)
88118
m, err := json.Marshal(outputs)
89119
if err != nil {
90120
return err
@@ -102,9 +132,21 @@ func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID,
102132
return err
103133
}
104134

135+
func (l *PGListener) template(name string) *template.Template {
136+
if l.templ == nil {
137+
helpers := map[string]any{"baseLink": l.baseLink}
138+
l.templ = template.Must(template.New("").Funcs(helpers).ParseFS(templates, "templates/*.txt"))
139+
}
140+
return l.templ.Lookup(name)
141+
}
142+
143+
func (l *PGListener) baseLink(target string, extras ...string) string {
144+
return BaseLink(l.BaseURL)(target, extras...)
145+
}
146+
105147
func (l *PGListener) Logger(workflowID uuid.UUID, taskName string) workflow.Logger {
106148
return &postgresLogger{
107-
db: l.db,
149+
db: l.DB,
108150
workflowID: workflowID,
109151
taskName: taskName,
110152
}
@@ -136,3 +178,8 @@ func (l *postgresLogger) Printf(format string, v ...interface{}) {
136178
log.Printf("l.Printf(%q, %v) = %v", format, v, err)
137179
}
138180
}
181+
182+
func LogOnlyMailer(header task.MailHeader, content task.MailContent) error {
183+
log.Println("Logging but not sending mail:", header, content)
184+
return nil
185+
}

internal/relui/listener_test.go

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ package relui
77
import (
88
"context"
99
"database/sql"
10+
"errors"
11+
"fmt"
12+
"net/mail"
1013
"testing"
1114
"time"
1215

1316
"github.com/google/go-cmp/cmp"
1417
"github.com/google/go-cmp/cmp/cmpopts"
1518
"github.com/google/uuid"
1619
"golang.org/x/build/internal/relui/db"
20+
"golang.org/x/build/internal/task"
1721
"golang.org/x/build/internal/workflow"
1822
)
1923

@@ -76,7 +80,7 @@ func TestListenerTaskStateChanged(t *testing.T) {
7680
t.Fatalf("q.CreateWorkflow(%v, %v) = %v, wanted no error", ctx, wfp, err)
7781
}
7882

79-
l := &PGListener{db: dbp}
83+
l := &PGListener{DB: dbp}
8084
err = l.TaskStateChanged(wf.ID, "TestTask", c.state)
8185
if err != nil {
8286
t.Fatalf("l.TaskStateChanged(%v, %q, %v) = %v, wanted no error", wf.ID, "TestTask", c.state, err)
@@ -110,7 +114,7 @@ func TestListenerLogger(t *testing.T) {
110114
t.Fatalf("q.UpsertTask(%v, %v) = %v, wanted no error", ctx, params, err)
111115
}
112116

113-
l := &PGListener{db: dbp}
117+
l := &PGListener{DB: dbp}
114118
l.Logger(wf.ID, "TestTask").Printf("A fancy log line says %q", "hello")
115119

116120
logs, err := q.TaskLogs(ctx)
@@ -128,3 +132,99 @@ func TestListenerLogger(t *testing.T) {
128132
t.Errorf("q.TaskLogs(_, %q) mismatch (-want +got):\n%s", wf.ID, diff)
129133
}
130134
}
135+
136+
func TestPGListenerWorkflowStalledNotification(t *testing.T) {
137+
cases := []struct {
138+
desc string
139+
schedule bool
140+
taskErr bool
141+
}{
142+
{
143+
desc: "scheduled workflow failure sends notification",
144+
schedule: true,
145+
taskErr: true,
146+
},
147+
{
148+
desc: "scheduled workflow success sends nothing",
149+
schedule: true,
150+
},
151+
{
152+
desc: "unscheduled workflow success sends nothing",
153+
},
154+
{
155+
desc: "unscheduled workflow failure sends nothing",
156+
taskErr: true,
157+
},
158+
}
159+
for _, c := range cases {
160+
t.Run(c.desc, func(t *testing.T) {
161+
ctx, cancel := context.WithCancel(context.Background())
162+
defer cancel()
163+
p := testDB(ctx, t)
164+
var schedID int
165+
if c.schedule {
166+
sched, err := db.New(p).CreateSchedule(ctx, db.CreateScheduleParams{WorkflowName: c.desc})
167+
if err != nil {
168+
t.Fatalf("CreateSchedule() = %v, wanted no error", err)
169+
}
170+
schedID = int(sched.ID)
171+
}
172+
wd := workflow.New()
173+
complete := workflow.Task0(wd, "complete", func(ctx context.Context) (string, error) {
174+
if c.taskErr {
175+
return "", fmt.Errorf("c.taskErr: %t", c.taskErr)
176+
}
177+
return "done", nil
178+
})
179+
workflow.Output(wd, "finished", complete)
180+
dh := NewDefinitionHolder()
181+
dh.RegisterDefinition(c.desc, wd)
182+
183+
header := task.MailHeader{
184+
From: mail.Address{Address: "[email protected]"},
185+
To: mail.Address{Address: "[email protected]"},
186+
}
187+
var gotHeader task.MailHeader
188+
var gotContent task.MailContent
189+
pgl := &PGListener{
190+
DB: p,
191+
SendMail: func(h task.MailHeader, c task.MailContent) error {
192+
gotHeader, gotContent = h, c
193+
return nil
194+
},
195+
ScheduleFailureMailHeader: header,
196+
}
197+
listener := &testWorkflowListener{
198+
Listener: pgl,
199+
onFinished: cancel,
200+
}
201+
w := NewWorker(dh, p, listener)
202+
203+
id, err := w.StartWorkflow(ctx, c.desc, nil, schedID)
204+
if err != nil {
205+
t.Fatalf("w.StartWorkflow(_, %q, %v, %d) = %v, %v, wanted no error", c.desc, nil, schedID, id, err)
206+
}
207+
listener.onStalled = func() {
208+
w.cancelWorkflow(id)
209+
}
210+
if err := w.Run(ctx); !errors.Is(err, context.Canceled) {
211+
t.Errorf("w.Run() = %v, wanted %v", err, context.Canceled)
212+
}
213+
214+
wantSend := c.taskErr && c.schedule
215+
if (gotContent.Subject == "") == wantSend {
216+
t.Errorf("gotContent.Subject = %q, wanted empty: %t", gotContent.Subject, !c.taskErr)
217+
}
218+
if (gotContent.BodyText == "") == wantSend {
219+
t.Errorf("gotContent.BodyText = %q, wanted empty: %t", gotContent.BodyText, !c.taskErr)
220+
}
221+
var wantHeader task.MailHeader
222+
if wantSend {
223+
wantHeader = header
224+
}
225+
if diff := cmp.Diff(wantHeader, gotHeader); diff != "" {
226+
t.Errorf("WorkflowFinished(_, %q) mismatch (-want +got):\n%s", id, diff)
227+
}
228+
})
229+
}
230+
}

internal/relui/schedule_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func TestSchedulerCreate(t *testing.T) {
120120
ctx, cancel := context.WithCancel(context.Background())
121121
defer cancel()
122122
p := testDB(ctx, t)
123-
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
123+
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
124124
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
125125
if (err != nil) != c.wantErr {
126126
t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wantErr: %t", c.sched, c.workflowName, c.params, row, err, c.wantErr)
@@ -243,7 +243,7 @@ func TestSchedulerResume(t *testing.T) {
243243
defer cancel()
244244
p := testDB(ctx, t)
245245
q := db.New(p)
246-
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
246+
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
247247

248248
for _, csp := range c.scheds {
249249
if _, err := q.CreateSchedule(ctx, csp); err != nil {
@@ -294,7 +294,7 @@ func TestScheduleDelete(t *testing.T) {
294294
defer cancel()
295295
p := testDB(ctx, t)
296296
q := db.New(p)
297-
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
297+
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
298298
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
299299
if err != nil {
300300
t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wanted no error", c.sched, c.workflowName, c.params, row, err)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{{- /*gotype: golang.org/x/build/internal/relui.scheduledFailureEmailBody*/ -}}
2+
The scheduled workflow "{{.Workflow.Name.String}}" ({{.Workflow.ID.String}}) failed at {{.Workflow.UpdatedAt.UTC.Format "2006/01/02 15:04 MST"}}.
3+
4+
See the following page for details:
5+
6+
{{baseLink "/workflows/" .Workflow.ID.String}}

0 commit comments

Comments
 (0)