forked from shmel1k/gop
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
87 lines (73 loc) · 1.37 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package gop
import (
"time"
)
type worker struct {
quit <-chan struct{}
tasks <-chan TaskFn
conf *workerConfig
}
type workerConfig struct {
ttl time.Duration
onTaskTaken func()
onTaskFinished func()
onExtraWorkerSpawned func()
onExtraWorkerFinished func()
}
func newWorker(tasks <-chan TaskFn, quit <-chan struct{}, params *workerConfig) *worker {
return &worker{
quit: quit,
tasks: tasks,
conf: params,
}
}
func (w *worker) run() {
for {
select {
case t := <-w.tasks:
w.runTask(t)
case <-w.quit:
return
}
}
}
func (w *worker) runTask(t TaskFn) {
if t != nil {
w.conf.onTaskTaken()
t()
w.conf.onTaskFinished()
}
}
type additionalWorker worker
func newAdditionalWorker(tasks <-chan TaskFn, quit <-chan struct{}, params *workerConfig) *additionalWorker {
return &additionalWorker{
quit: quit,
tasks: tasks,
conf: params,
}
}
func (w *additionalWorker) run(t TaskFn) {
// Execute the task anyway.
// But next we can easily stop the worker.
ticker := time.NewTicker(w.conf.ttl)
defer ticker.Stop()
defer w.conf.onExtraWorkerFinished()
w.runTask(t)
for {
select {
case <-ticker.C:
return
case <-w.quit:
return
case t := <-w.tasks:
w.runTask(t)
}
}
}
func (w *additionalWorker) runTask(t TaskFn) {
if t != nil {
w.conf.onTaskTaken()
t()
w.conf.onTaskFinished()
}
}