forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwork_unit_wrapper.go
42 lines (33 loc) · 1.19 KB
/
work_unit_wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package river
import (
"context"
"encoding/json"
"time"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/rivertype"
)
// workUnitFactoryWrapper adapts a typed Worker so that it can serve as a
// workUnitFactory, producing one work unit per job row via MakeUnit.
type workUnitFactoryWrapper[T JobArgs] struct {
	// worker is the Worker passed along to every work unit that MakeUnit
	// creates.
	worker Worker[T]
}
// MakeUnit returns a new work unit bound to jobRow and to the factory's
// worker. The unit's typed job is left unset here; it is populated later by
// the unit's UnmarshalJob method.
func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
	unit := &wrapperWorkUnit[T]{
		jobRow: jobRow,
		worker: w.worker,
	}
	return unit
}
// wrapperWorkUnit is the workUnit implementation that pairs a single job with
// the Worker responsible for it. Its methods forward to the worker, handing it
// the typed job.
type wrapperWorkUnit[T JobArgs] struct {
	// job is the typed job given to the worker. It is nil until UnmarshalJob
	// has been invoked.
	job *Job[T]

	// jobRow is the underlying row; UnmarshalJob decodes its EncodedArgs into
	// job.Args.
	jobRow *rivertype.JobRow

	// worker receives every call this work unit delegates.
	worker Worker[T]
}
// NextRetry forwards to the wrapped worker's NextRetry, passing the unit's
// job, and returns whatever time the worker reports.
func (w *wrapperWorkUnit[T]) NextRetry() time.Time {
	return w.worker.NextRetry(w.job)
}
// Timeout forwards to the wrapped worker's Timeout, passing the unit's job,
// and returns whatever duration the worker reports.
func (w *wrapperWorkUnit[T]) Timeout() time.Duration {
	return w.worker.Timeout(w.job)
}
// Work forwards to the wrapped worker's Work with ctx and the unit's job,
// returning the worker's error unchanged.
func (w *wrapperWorkUnit[T]) Work(ctx context.Context) error {
	return w.worker.Work(ctx, w.job)
}
// Middleware forwards to the wrapped worker's Middleware, passing the unit's
// job, and returns the slice the worker provides.
func (w *wrapperWorkUnit[T]) Middleware() []rivertype.WorkerMiddleware {
	middleware := w.worker.Middleware(w.job)
	return middleware
}
// UnmarshalJob creates the typed job for this unit and decodes the job row's
// JSON-encoded args into it.
//
// The job is assigned to the unit before decoding is attempted, so it stays
// set (wrapping the job row) even when decoding fails. The error from
// json.Unmarshal, if any, is returned as is.
func (w *wrapperWorkUnit[T]) UnmarshalJob() error {
	job := &Job[T]{JobRow: w.jobRow}
	w.job = job

	return json.Unmarshal(w.jobRow.EncodedArgs, &job.Args)
}