-
Notifications
You must be signed in to change notification settings - Fork 86
/
file_job_queue.go
163 lines (135 loc) · 4.3 KB
/
file_job_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package worker
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
gocontext "context"
"github.com/bitly/go-simplejson"
"github.com/sirupsen/logrus"
"github.com/travis-ci/worker/backend"
"github.com/travis-ci/worker/context"
)
// FileJobQueue is a JobQueue that uses directories for input, state, and output
type FileJobQueue struct {
queue string
pollingInterval time.Duration
buildJobChan chan Job
baseDir string
createdDir string
receivedDir string
startedDir string
finishedDir string
logDir string
DefaultLanguage, DefaultDist, DefaultArch, DefaultGroup, DefaultOS string
}
// NewFileJobQueue creates a *FileJobQueue from a base directory and queue name
func NewFileJobQueue(baseDir, queue string, pollingInterval time.Duration) (*FileJobQueue, error) {
_, err := os.Stat(baseDir)
if err != nil {
return nil, err
}
fd, err := os.Create(filepath.Join(baseDir, ".write-test"))
if err != nil {
return nil, err
}
defer fd.Close()
createdDir := filepath.Join(baseDir, queue, "10-created.d")
receivedDir := filepath.Join(baseDir, queue, "30-received.d")
startedDir := filepath.Join(baseDir, queue, "50-started.d")
finishedDir := filepath.Join(baseDir, queue, "70-finished.d")
logDir := filepath.Join(baseDir, queue, "log")
for _, dirname := range []string{createdDir, receivedDir, startedDir, finishedDir, logDir} {
err := os.MkdirAll(dirname, os.FileMode(0755))
if err != nil {
return nil, err
}
}
return &FileJobQueue{
queue: queue,
pollingInterval: pollingInterval,
baseDir: baseDir,
createdDir: createdDir,
receivedDir: receivedDir,
startedDir: startedDir,
finishedDir: finishedDir,
logDir: logDir,
}, nil
}
// Jobs returns a channel of jobs from the created directory
func (f *FileJobQueue) Jobs(ctx gocontext.Context) (<-chan Job, error) {
if f.buildJobChan == nil {
f.buildJobChan = make(chan Job)
go f.pollInDirForJobs(ctx)
}
return f.buildJobChan, nil
}
func (f *FileJobQueue) pollInDirForJobs(ctx gocontext.Context) {
for {
f.pollInDirTick(ctx)
time.Sleep(f.pollingInterval)
}
}
func (f *FileJobQueue) pollInDirTick(ctx gocontext.Context) {
logger := context.LoggerFromContext(ctx).WithField("self", "file_job_queue")
entries, err := os.ReadDir(f.createdDir)
if err != nil {
logger.WithField("err", err).Error("input directory read error")
return
}
logger.WithFields(logrus.Fields{
"entries": entries,
"file_job_queue": fmt.Sprintf("%p", f),
}).Debug("entries")
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
continue
}
buildJob := &fileJob{
createdFile: filepath.Join(f.createdDir, entry.Name()),
payload: &JobPayload{},
startAttributes: &backend.StartAttributes{},
}
startAttrs := &jobPayloadStartAttrs{Config: &backend.StartAttributes{}}
fb, err := os.ReadFile(buildJob.createdFile)
if err != nil {
logger.WithField("err", err).Error("input file read error")
continue
}
err = json.Unmarshal(fb, buildJob.payload)
if err != nil {
logger.WithField("err", err).Error("payload JSON parse error, skipping")
continue
}
err = json.Unmarshal(fb, &startAttrs)
if err != nil {
logger.WithField("err", err).Error("start attributes JSON parse error, skipping")
continue
}
buildJob.rawPayload, err = simplejson.NewJson(fb)
if err != nil {
logger.WithField("err", err).Error("raw payload JSON parse error, skipping")
continue
}
buildJob.startAttributes = startAttrs.Config
buildJob.startAttributes.VMConfig = buildJob.payload.VMConfig
buildJob.startAttributes.VMType = buildJob.payload.VMType
buildJob.startAttributes.SetDefaults(f.DefaultLanguage, f.DefaultDist, f.DefaultArch, f.DefaultGroup, f.DefaultOS, VMTypeDefault, VMConfigDefault)
buildJob.receivedFile = filepath.Join(f.receivedDir, entry.Name())
buildJob.startedFile = filepath.Join(f.startedDir, entry.Name())
buildJob.finishedFile = filepath.Join(f.finishedDir, entry.Name())
buildJob.logFile = filepath.Join(f.logDir, strings.Replace(entry.Name(), ".json", ".log", -1))
buildJob.bytes = fb
f.buildJobChan <- buildJob
}
}
// Name returns the name of this queue type, wow!
func (q *FileJobQueue) Name() string {
return "file"
}
// Cleanup is a no-op
func (f *FileJobQueue) Cleanup() error {
return nil
}