forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob_executor.go
409 lines (348 loc) · 13.1 KB
/
job_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
package river
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"runtime/debug"
"time"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivertype"
)
// Error used in CancelFunc in cases where the job was not cancelled for
// purposes of resource cleanup. Should never be user visible.
var errExecutorDefaultCancel = errors.New("context cancelled as executor finished")
// UnknownJobKindError is returned when a Client fetches and attempts to
// work a job that has not been registered on the Client's Workers bundle (using
// AddWorker).
type UnknownJobKindError struct {
// Kind is the string that was returned by the JobArgs Kind method.
Kind string
}
// Error returns the error string.
func (e *UnknownJobKindError) Error() string {
return "job kind is not registered in the client's Workers bundle: " + e.Kind
}
// Is implements the interface used by errors.Is to determine if errors are
// equivalent. It returns true for any other UnknownJobKindError without
// regard to the Kind string so it is possible to detect this type of error
// with:
//
// errors.Is(err, &UnknownJobKindError{})
func (e *UnknownJobKindError) Is(target error) bool {
_, ok := target.(*UnknownJobKindError)
return ok
}
// JobCancel wraps err and can be returned from a Worker's Work method to cancel
// the job at the end of execution. Regardless of whether or not the job has any
// remaining attempts, this will ensure the job does not execute again.
func JobCancel(err error) error {
return &JobCancelError{err: err}
}
// JobCancelError is the error type returned by JobCancel. It should not be
// initialized directly, but is returned from the [JobCancel] function and can
// be used for test assertions.
type JobCancelError struct {
err error
}
func (e *JobCancelError) Error() string {
if e.err == nil {
return "JobCancelError: <nil>"
}
// should not ever be called, but add a prefix just in case:
return "JobCancelError: " + e.err.Error()
}
func (e *JobCancelError) Is(target error) bool {
_, ok := target.(*JobCancelError)
return ok
}
func (e *JobCancelError) Unwrap() error { return e.err }
// JobSnooze can be returned from a Worker's Work method to cause the job to be
// tried again after the specified duration. This also has the effect of
// incrementing the job's MaxAttempts by 1, meaning that jobs can be repeatedly
// snoozed without ever being discarded.
//
// Panics if duration is < 0.
func JobSnooze(duration time.Duration) error {
if duration < 0 {
panic("JobSnooze: duration must be >= 0")
}
return &JobSnoozeError{duration: duration}
}
// JobSnoozeError is the error type returned by JobSnooze. It should not be
// initialized directly, but is returned from the [JobSnooze] function and can
// be used for test assertions.
type JobSnoozeError struct {
duration time.Duration
}
func (e *JobSnoozeError) Error() string {
// should not ever be called, but add a prefix just in case:
return fmt.Sprintf("JobSnoozeError: %s", e.duration)
}
func (e *JobSnoozeError) Is(target error) bool {
_, ok := target.(*JobSnoozeError)
return ok
}
var ErrJobCancelledRemotely = JobCancel(errors.New("job cancelled remotely"))
type jobExecutorResult struct {
Err error
NextRetry time.Time
PanicTrace string
PanicVal any
}
// ErrorStr returns an appropriate string to persist to the database based on
// the type of internal failure (i.e. error or panic). Panics if called on a
// non-errored result.
func (r *jobExecutorResult) ErrorStr() string {
switch {
case r.Err != nil:
return r.Err.Error()
case r.PanicVal != nil:
return fmt.Sprintf("%v", r.PanicVal)
}
panic("ErrorStr should not be called on non-errored result")
}
type jobExecutor struct {
baseservice.BaseService
CancelFunc context.CancelCauseFunc
ClientJobTimeout time.Duration
Completer jobcompleter.JobCompleter
ClientRetryPolicy ClientRetryPolicy
ErrorHandler ErrorHandler
InformProducerDoneFunc func(jobRow *rivertype.JobRow)
JobRow *rivertype.JobRow
GlobalMiddleware []rivertype.WorkerMiddleware
SchedulerInterval time.Duration
WorkUnit workunit.WorkUnit
// Meant to be used from within the job executor only.
start time.Time
stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer
}
func (e *jobExecutor) Cancel() {
e.Logger.Warn(e.Name+": job cancelled remotely", slog.Int64("job_id", e.JobRow.ID))
e.CancelFunc(ErrJobCancelledRemotely)
}
func (e *jobExecutor) Execute(ctx context.Context) {
// Ensure that the context is cancelled no matter what, or it will leak:
defer e.CancelFunc(errExecutorDefaultCancel)
e.start = e.Time.NowUTC()
e.stats = &jobstats.JobStatistics{
QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt),
}
res := e.execute(ctx)
if res.Err != nil && errors.Is(context.Cause(ctx), ErrJobCancelledRemotely) {
res.Err = context.Cause(ctx)
}
e.reportResult(ctx, res)
e.InformProducerDoneFunc(e.JobRow)
}
// Executes the job, handling a panic if necessary (and various other error
// conditions). The named return value is so that we can still return a value in
// case of a panic.
//
//nolint:nonamedreturns
func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
defer func() {
if recovery := recover(); recovery != nil {
e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker",
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
slog.String("panic_val", fmt.Sprintf("%v", recovery)),
)
res = &jobExecutorResult{
PanicTrace: string(debug.Stack()),
PanicVal: recovery,
}
}
e.stats.RunDuration = e.Time.NowUTC().Sub(e.start)
}()
if e.WorkUnit == nil {
e.Logger.ErrorContext(ctx, e.Name+": Unhandled job kind",
slog.String("kind", e.JobRow.Kind),
slog.Int64("job_id", e.JobRow.ID),
)
return &jobExecutorResult{Err: &UnknownJobKindError{Kind: e.JobRow.Kind}}
}
if err := e.WorkUnit.UnmarshalJob(); err != nil {
return &jobExecutorResult{Err: err}
}
workerMiddleware := e.WorkUnit.Middleware()
doInner := func(ctx context.Context) error {
jobTimeout := e.WorkUnit.Timeout()
if jobTimeout == 0 {
jobTimeout = e.ClientJobTimeout
}
// No timeout if a -1 was specified.
if jobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
if err := e.WorkUnit.Work(ctx); err != nil {
return err
}
return nil
}
allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(e.GlobalMiddleware)+len(workerMiddleware))
allMiddleware = append(allMiddleware, e.GlobalMiddleware...)
allMiddleware = append(allMiddleware, workerMiddleware...)
if len(allMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(allMiddleware) - 1; i >= 0; i-- {
middlewareItem := allMiddleware[i] // capture the current middleware item
previousDoInner := doInner // Capture the current doInner function
doInner = func(ctx context.Context) error {
return middlewareItem.Work(ctx, e.JobRow, previousDoInner)
}
}
}
return &jobExecutorResult{Err: doInner(ctx)}
}
func (e *jobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorResult) bool {
invokeAndHandlePanic := func(funcName string, errorHandler func() *ErrorHandlerResult) *ErrorHandlerResult {
defer func() {
if panicVal := recover(); panicVal != nil {
e.Logger.ErrorContext(ctx, e.Name+": ErrorHandler invocation panicked",
slog.String("function_name", funcName),
slog.String("panic_val", fmt.Sprintf("%v", panicVal)),
)
}
}()
return errorHandler()
}
var errorHandlerRes *ErrorHandlerResult
switch {
case res.Err != nil:
errorHandlerRes = invokeAndHandlePanic("HandleError", func() *ErrorHandlerResult {
return e.ErrorHandler.HandleError(ctx, e.JobRow, res.Err)
})
case res.PanicVal != nil:
errorHandlerRes = invokeAndHandlePanic("HandlePanic", func() *ErrorHandlerResult {
return e.ErrorHandler.HandlePanic(ctx, e.JobRow, res.PanicVal, res.PanicTrace)
})
}
return errorHandlerRes != nil && errorHandlerRes.SetCancelled
}
func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) {
var snoozeErr *JobSnoozeError
if res.Err != nil && errors.As(res.Err, &snoozeErr) {
e.Logger.DebugContext(ctx, e.Name+": Job snoozed",
slog.Int64("job_id", e.JobRow.ID),
slog.String("job_kind", e.JobRow.Kind),
slog.Duration("duration", snoozeErr.duration),
)
nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration)
// Normally, snoozed jobs are set `scheduled` for the future and it's the
// scheduler's job to set them back to `available` so they can be reworked.
// Just as with retryable jobs, this isn't friendly for short snooze times
// so we instead make the job immediately `available` if the snooze time is
// smaller than the scheduler's run interval.
var params *riverdriver.JobSetStateIfRunningParams
if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
params = riverdriver.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)
} else {
params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)
}
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job",
slog.Int64("job_id", e.JobRow.ID),
)
}
return
}
if res.Err != nil || res.PanicVal != nil {
e.reportError(ctx, res)
return
}
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.Time.NowUTC())); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Error completing job",
slog.String("err", err.Error()),
slog.Int64("job_id", e.JobRow.ID),
)
return
}
}
func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) {
var (
cancelJob bool
cancelErr *JobCancelError
)
logAttrs := []any{
slog.String("error", res.ErrorStr()),
slog.Int64("job_id", e.JobRow.ID),
slog.String("job_kind", e.JobRow.Kind),
}
switch {
case errors.As(res.Err, &cancelErr):
cancelJob = true
e.Logger.DebugContext(ctx, e.Name+": Job cancelled explicitly", logAttrs...)
case res.Err != nil:
e.Logger.ErrorContext(ctx, e.Name+": Job errored", logAttrs...)
case res.PanicVal != nil:
e.Logger.ErrorContext(ctx, e.Name+": Job panicked", logAttrs...)
}
if e.ErrorHandler != nil && !cancelJob {
// Error handlers also have an opportunity to cancel the job.
cancelJob = e.invokeErrorHandler(ctx, res)
}
attemptErr := rivertype.AttemptError{
At: e.start,
Attempt: e.JobRow.Attempt,
Error: res.ErrorStr(),
Trace: res.PanicTrace,
}
errData, err := json.Marshal(attemptErr)
if err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal attempt error", logAttrs...)
return
}
now := time.Now()
if cancelJob {
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(e.JobRow.ID, now, errData)); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to cancel job and report error", logAttrs...)
}
return
}
if e.JobRow.Attempt >= e.JobRow.MaxAttempts {
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateDiscarded(e.JobRow.ID, now, errData)); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to discard job and report error", logAttrs...)
}
return
}
var nextRetryScheduledAt time.Time
if e.WorkUnit != nil {
nextRetryScheduledAt = e.WorkUnit.NextRetry()
}
if nextRetryScheduledAt.IsZero() {
nextRetryScheduledAt = e.ClientRetryPolicy.NextRetry(e.JobRow)
}
if nextRetryScheduledAt.Before(now) {
e.Logger.WarnContext(ctx,
e.Name+": Retry policy returned invalid next retry before current time; using default retry policy instead",
slog.Time("next_retry_scheduled_at", nextRetryScheduledAt),
slog.Time("now", now),
)
nextRetryScheduledAt = (&DefaultClientRetryPolicy{}).NextRetry(e.JobRow)
}
// Normally, errored jobs are set `retryable` for the future and it's the
// scheduler's job to set them back to `available` so they can be reworked.
// This isn't friendly for smaller retry times though because it means that
// effectively no retry time smaller than the scheduler's run interval is
// respected. Here, we offset that with a branch that makes jobs immediately
// `available` if their retry was smaller than the scheduler's run interval.
var params *riverdriver.JobSetStateIfRunningParams
if nextRetryScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
params = riverdriver.JobSetStateErrorAvailable(e.JobRow.ID, nextRetryScheduledAt, errData)
} else {
params = riverdriver.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData)
}
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...)
}
}