-
Notifications
You must be signed in to change notification settings - Fork 1
/
nsq.go
172 lines (143 loc) · 3.22 KB
/
nsq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package nsq
import (
"context"
"encoding/json"
"sync"
"sync/atomic" //nolint:typecheck,nolintlint
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
nsq "github.com/nsqio/go-nsq"
)
var _ core.Worker = (*Worker)(nil)
// Worker for NSQ
type Worker struct {
q *nsq.Consumer
p *nsq.Producer
cfg *nsq.Config
stopOnce sync.Once
startOnce sync.Once
stop chan struct{}
stopFlag int32
opts Options
tasks chan *nsq.Message
}
// NewWorker for struc
func NewWorker(opts ...Option) *Worker {
w := &Worker{
opts: newOptions(opts...),
stop: make(chan struct{}),
tasks: make(chan *nsq.Message),
}
w.cfg = nsq.NewConfig()
w.cfg.MaxInFlight = w.opts.maxInFlight
if err := w.startProducer(); err != nil {
panic(err)
}
return w
}
func (w *Worker) startProducer() error {
var err error
w.p, err = nsq.NewProducer(w.opts.addr, w.cfg)
return err
}
func (w *Worker) startConsumer() (err error) {
w.startOnce.Do(func() {
w.q, err = nsq.NewConsumer(w.opts.topic, w.opts.channel, w.cfg)
if err != nil {
return
}
w.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
if len(msg.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
loop:
for {
select {
case w.tasks <- msg:
break loop
case <-w.stop:
if msg != nil {
// re-queue the job if worker has been shutdown.
msg.Requeue(-1)
}
break loop
case <-time.After(2 * time.Second):
msg.Touch()
}
}
return nil
}))
err = w.q.ConnectToNSQD(w.opts.addr)
if err != nil {
return
}
})
return err
}
// Run start the worker
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
return w.opts.runFunc(ctx, task)
}
// Shutdown worker
func (w *Worker) Shutdown() error {
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
return queue.ErrQueueShutdown
}
w.stopOnce.Do(func() {
// notify shtdown event to worker and consumer
close(w.stop)
// stop producer and consumer
if w.q != nil {
w.q.ChangeMaxInFlight(0)
w.q.Stop()
<-w.q.StopChan
}
w.p.Stop()
// close task channel
close(w.tasks)
})
return nil
}
// Queue send notification to queue
func (w *Worker) Queue(job core.QueuedMessage) error {
if atomic.LoadInt32(&w.stopFlag) == 1 {
return queue.ErrQueueShutdown
}
return w.p.Publish(w.opts.topic, job.Bytes())
}
// Request fetch new task from queue
func (w *Worker) Request() (core.QueuedMessage, error) {
if err := w.startConsumer(); err != nil {
return nil, err
}
clock := 0
loop:
for {
select {
case task, ok := <-w.tasks:
if !ok {
return nil, queue.ErrQueueHasBeenClosed
}
var data job.Message
_ = json.Unmarshal(task.Body, &data)
return &data, nil
case <-time.After(1 * time.Second):
if clock == 5 {
break loop
}
clock += 1
}
}
return nil, queue.ErrNoTaskInQueue
}
// Stats retrieves the current connection and message statistics for a Consumer
func (w *Worker) Stats() *nsq.ConsumerStats {
if w.q == nil {
return nil
}
return w.q.Stats()
}