-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
99 lines (85 loc) · 1.54 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package splanner
import "log"
// jobQueue is the package-level queue of pending units. It is nil until
// InitQueue allocates it; AddUnit sends into it and the dispatcher loops
// receive from it.
var jobQueue chan Unit
// Unit is a piece of work that can be queued with AddUnit and is executed by
// one of the workers.
type Unit interface {
	// Job performs the work. A non-nil error is logged by the worker that ran it.
	Job() error
}
// InitQueue allocates the package-level job queue as a buffered channel with
// room for maxQueue pending units, replacing any queue assigned earlier.
func InitQueue(maxQueue int) {
	queue := make(chan Unit, maxQueue)
	jobQueue = queue
}
// dispatcher hands units taken from the job queue to workers that have
// registered themselves in pool.
type dispatcher struct {
	// pool carries the job channels of workers that are waiting for work.
	pool chan chan Unit

	// workers is the number of workers Run starts.
	workers int
}
// worker runs units that are delivered to it through its own job channel.
type worker struct {
	// pool is the shared channel the worker sends jobCh into to register itself.
	pool chan chan Unit

	// jobCh is the channel on which this worker receives the unit to run.
	jobCh chan Unit
}
// newWorker returns a worker attached to the given pool, with a fresh
// unbuffered channel on which it will receive its units.
func newWorker(pool chan chan Unit) *worker {
	w := new(worker)
	w.pool = pool
	w.jobCh = make(chan Unit)
	return w
}
// start launches the worker's goroutine and returns immediately.
//
// The goroutine loops forever: it registers the worker by sending its job
// channel into the pool, waits for a unit to arrive on that channel, runs it,
// and logs any error the unit returns. There is no stop signal, so the
// goroutine lives for the rest of the process.
func (w *worker) start() {
	go func() {
		for {
			// Register as idle; this blocks while the pool buffer is full.
			w.pool <- w.jobCh

			// A plain receive replaces the former single-case select,
			// which behaved identically.
			job := <-w.jobCh

			// Run the unit; a failure is logged and the worker keeps going.
			if err := job.Job(); err != nil {
				log.Println(err.Error())
			}
		}
	}()
}
// NewDispatcher returns a dispatcher configured for maxWorkers workers. Its
// pool channel is buffered with capacity maxWorkers.
func NewDispatcher(maxWorkers int) *dispatcher {
	d := new(dispatcher)
	d.pool = make(chan chan Unit, maxWorkers)
	d.workers = maxWorkers
	return d
}
// Run is the client's entry point. It starts d.workers workers on the shared
// pool and then launches one dispatcher loop in its own goroutine:
// dispatchAsync when async is true, dispatch otherwise. Run itself returns
// without waiting for any work to be processed.
func (d *dispatcher) Run(async bool) {
	for remaining := d.workers; remaining > 0; remaining-- {
		newWorker(d.pool).start()
	}

	if !async {
		go d.dispatch()
		return
	}
	go d.dispatchAsync()
}
// dispatchAsync drains the job queue until it is closed. Every unit gets its
// own goroutine, which waits for a registered worker channel from the pool and
// sends the unit to it, so the loop never waits on workers and delivery order
// is not guaranteed.
func (d *dispatcher) dispatchAsync() {
	// Capture the queue once, as a range loop over the channel would.
	queue := jobQueue
	for {
		unit, open := <-queue
		if !open {
			return
		}
		go func(u Unit) {
			idle := <-d.pool
			idle <- u
		}(unit)
	}
}
// dispatch is the non-async dispatcher. It starts a goroutine that takes units
// from the job queue one at a time and, for each, waits for a registered
// worker channel from the pool and sends the unit to it, so units are handed
// over in queue order. dispatch itself returns immediately.
//
// The goroutine ends when the job queue is closed.
func (d *dispatcher) dispatch() {
	go func() {
		// Ranging over the channel stops the loop on close. The previous
		// `for { select { case job, ok := <-jobQueue: ... } }` form kept
		// receiving from the closed channel and spun forever.
		for job := range jobQueue {
			// Block until some worker has registered, then hand over the unit.
			jobChannel := <-d.pool
			jobChannel <- job
		}
	}()
}
// AddUnit enqueues u on the package-level job queue. The send blocks while the
// queue's buffer is full.
func AddUnit(u Unit) {
	queue := jobQueue
	queue <- u
}