Skip to content

Commit 40c64cf

Browse files
committed
Run pipeline
1 parent ef77178 commit 40c64cf

File tree

3 files changed

+142
-7
lines changed

3 files changed

+142
-7
lines changed

project.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,15 @@ import "time"
44

55
// A Project represents a software project that has a number of jobs associated with it.
66
type Project struct {
7-
ID uint64
8-
Name string
9-
JobIDs []uint64
7+
ID uint64
8+
Name string
9+
10+
// The pipeline is a list of jobs that will be run sequentially, stopping if any
11+
// of the jobs ends with exit status other than 0.
12+
Pipeline []uint64
13+
14+
// OneOffJobs is for jobs that is not part of the Pipeline, but can be run individually.
15+
OneOffJobs []uint64
1016
}
1117

1218
// A Job is a job that can be executed.

runner.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ type task struct {
3030
// NewRunner returns a new Runner, using the given Storer.
3131
func NewRunner(s Storer) *Runner {
3232
return &Runner{
33-
Storer: s,
33+
Storer: s,
34+
runningJobs: make(map[uint64]bool),
35+
runningProjs: make(map[uint64]bool),
3436
}
3537
}
3638

@@ -49,6 +51,7 @@ func (r *Runner) ScheduleProject(id uint64) error {
4951
}
5052

5153
func (r *Runner) runJob(id uint64, inProjectPipeline bool) (Run, error) {
54+
// TODO why return Run struct?
5255
defer r.doneJob(id, inProjectPipeline)
5356
var run Run
5457

@@ -62,7 +65,7 @@ func (r *Runner) runJob(id uint64, inProjectPipeline bool) (Run, error) {
6265
return run, errors.Wrap(err, "cannot assign run ID")
6366
}
6467

65-
if err := createAndChdir(job.Workspace); err != nil {
68+
if err = createAndChdir(job.Workspace); err != nil {
6669
return run, errors.Wrap(err, "failed to create/change workspace")
6770
}
6871

@@ -99,6 +102,30 @@ func (r *Runner) runJob(id uint64, inProjectPipeline bool) (Run, error) {
99102
return run, nil
100103
}
101104

105+
func (r *Runner) runPipe(id uint64) error {
106+
proj, err := r.GetProject(id)
107+
if err != nil {
108+
return errors.Wrapf(err, "cannot get project %d", id)
109+
}
110+
defer r.donePipeline(id)
111+
112+
for _, jobID := range proj.Pipeline {
113+
r.mu.Lock()
114+
// TODO check if job is running first
115+
r.runningJobs[jobID] = true
116+
r.mu.Unlock()
117+
run, err := r.runJob(jobID, true)
118+
if err != nil {
119+
return err
120+
}
121+
122+
if !run.Success {
123+
break
124+
}
125+
}
126+
return nil
127+
}
128+
102129
func (r *Runner) doneJob(id uint64, inProjectPipeline bool) {
103130
r.mu.Lock()
104131
defer r.mu.Unlock()
@@ -119,12 +146,31 @@ func (r *Runner) doneJob(id uint64, inProjectPipeline bool) {
119146
}
120147
}()
121148
} else {
122-
//go r.runPipe(task.pipe)
149+
go r.runPipe(task.proj)
123150
}
124151
}
125152
}
126153
}
127154

155+
func (r *Runner) donePipeline(id uint64) {
156+
r.mu.Lock()
157+
defer r.mu.Unlock()
158+
159+
// Mark the Project as not running
160+
delete(r.runningProjs, id)
161+
162+
// If there are enqueued jobs/pipelines - run it
163+
if len(r.scheduled) > 0 {
164+
task := r.scheduled[0]
165+
r.scheduled = r.scheduled[1:]
166+
if task.job != 0 {
167+
go r.runJob(task.job, false)
168+
} else {
169+
go r.runPipe(task.proj)
170+
}
171+
}
172+
}
173+
128174
func createAndChdir(path string) error {
129175
if path == "" {
130176
return nil

runner_test.go

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func TestRunOutputsForJob(t *testing.T) {
108108

109109
const numRuns = 10
110110
for i := 0; i <= numRuns; i++ {
111-
r.runJob(job.ID, false)
111+
_, _ = r.runJob(job.ID, false)
112112
}
113113

114114
runs, err := r.GetNRunsForJob(job.ID, numRuns)
@@ -133,5 +133,88 @@ func TestRunOutputsForJob(t *testing.T) {
133133
if !sort.IntsAreSorted(times) {
134134
t.Fatalf("runs not run sequentially")
135135
}
136+
}
137+
138+
// Verify that a Project can be created and it's Pipeline of Jobs can run.
139+
func TestPipeline(t *testing.T) {
140+
db := newMemDB()
141+
r := NewRunner(db)
142+
143+
job1, err := db.NewJob(Job{
144+
Cmd: "echo 'hello' > goodbye.txt",
145+
})
146+
if err != nil {
147+
t.Fatal(err)
148+
}
149+
150+
job2, err := db.NewJob(Job{
151+
Cmd: "cat goodbye.txt",
152+
})
153+
if err != nil {
154+
t.Fatal(err)
155+
}
136156

157+
copyOfjob2, err := db.NewJob(Job{
158+
Cmd: "cat goodbye.txt",
159+
})
160+
if err != nil {
161+
t.Fatal(err)
162+
}
163+
164+
job3, err := db.NewJob(Job{
165+
Cmd: "rm goodbye.txt",
166+
})
167+
if err != nil {
168+
t.Fatal(err)
169+
}
170+
171+
job4, err := db.NewJob(Job{
172+
Cmd: "echo 'I should not run'",
173+
})
174+
if err != nil {
175+
t.Fatal(err)
176+
}
177+
178+
proj, err := db.NewProject(Project{
179+
Pipeline: []uint64{job1.ID, job2.ID, job3.ID, copyOfjob2.ID, job4.ID},
180+
})
181+
if err != nil {
182+
t.Fatal(err)
183+
}
184+
185+
if err := r.runPipe(proj.ID); err != nil {
186+
t.Fatal(err)
187+
}
188+
189+
want := []struct {
190+
success bool
191+
output string
192+
}{
193+
{true, ""},
194+
{true, "hello\n"},
195+
{true, ""},
196+
{false, "cat: goodbye.txt: No such file or directory\nexit status 1"},
197+
}
198+
199+
var runs []Run
200+
for _, id := range proj.Pipeline {
201+
run, err := db.GetNRunsForJob(id, 100)
202+
if err != nil {
203+
t.Fatal(err)
204+
}
205+
runs = append(runs, run...)
206+
}
207+
208+
if len(runs) != len(want) {
209+
t.Fatalf("pipeline got %d runs; want %d", len(runs), len(want))
210+
}
211+
212+
for i, run := range runs {
213+
if run.Success != want[i].success {
214+
t.Errorf("pipelined job success=%v; want %v", run.Success, want[i].success)
215+
}
216+
if run.Output != want[i].output {
217+
t.Errorf("pipelined job output=%v; want %v", run.Output, want[i].output)
218+
}
219+
}
137220
}

0 commit comments

Comments
 (0)