-
Notifications
You must be signed in to change notification settings - Fork 4
/
broker.go
170 lines (155 loc) · 4.53 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package nq
import (
"fmt"
"log"
"github.com/nats-io/nats.go"
)
// NatsBroker bundles a NATS connection with the JetStream context that the
// methods in this file use for publishing and stream management.
type NatsBroker struct {
// ns is the underlying NATS connection.
ns *nats.Conn
// js is the JetStream context used to publish and to manage streams.
js nats.JetStreamContext
}
// Ping reports whether the broker's NATS connection is currently usable.
//
// It returns nil while the client reports itself connected. Otherwise it
// returns one of the nats sentinel errors:
//   - nats.ErrInvalidConnection when the broker holds no connection,
//   - nats.ErrConnectionClosed when the connection has been closed,
//   - nats.ErrDisconnected when the connection is not currently connected.
func (n *NatsBroker) Ping() error {
	switch {
	case n == nil || n.ns == nil:
		return nats.ErrInvalidConnection
	case n.ns.IsClosed():
		return nats.ErrConnectionClosed
	case !n.ns.IsConnected():
		return nats.ErrDisconnected
	}
	return nil
}
// Close closes the underlying NATS connection. The returned error is always
// nil.
func (n *NatsBroker) Close() error {
	conn := n.ns
	conn.Close()
	return nil
}
// Submit is currently a stub: it ignores subject and payload and returns a
// nil message together with a nil error.
func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error) {
	var msg *TaskMessage
	return msg, nil
}
// Cancel is currently a stub: it ignores subject and id and returns a nil
// message together with a nil error.
func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error) {
	var msg *TaskMessage
	return msg, nil
}
// Publish sends payload to subject through the JetStream context and returns
// the publish acknowledgement and error exactly as JetStream reports them.
func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error) {
	ack, err := n.js.Publish(subject, payload)
	return ack, err
}
// isStreamExists reports whether stream info can be fetched for the named
// stream. Any error from the lookup is treated as "does not exist".
func (n *NatsBroker) isStreamExists(stream string) bool {
	if _, err := n.js.StreamInfo(stream); err != nil {
		return false
	}
	return true
}
// PublishWithMeta encodes msg with EncodeTMToJSON, publishes the result on
// the subject named by msg.Queue and stores the sequence number from the
// publish acknowledgement in msg.Sequence.
//
// On success it returns the same msg pointer it was given. A nil msg, an
// encoding failure or a publish failure yields a nil message and a non-nil
// error.
func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error) {
	if msg == nil {
		// Reading msg.Queue below would panic on a nil pointer.
		return nil, fmt.Errorf("nq: cannot publish a nil task message")
	}

	encoded, err := EncodeTMToJSON(msg)
	if err != nil {
		return nil, err
	}

	ack, err := n.Publish(msg.Queue, encoded)
	if err != nil {
		return nil, err
	}

	// The sequence is recorded because it is required for cancelling a task.
	msg.Sequence = ack.Sequence
	return msg, nil
}
// AddStream creates a JetStream stream from conf. The stream info returned by
// JetStream is discarded; only the error is reported.
func (n *NatsBroker) AddStream(conf nats.StreamConfig) error {
	_, err := n.js.AddStream(&conf)
	return err
}
// DeleteStream deletes the JetStream stream with the given name and returns
// whatever error JetStream reports.
func (n *NatsBroker) DeleteStream(name string) error {
	return n.js.DeleteStream(name)
}
// natsDisconnectHandler builds the nats option that installs a disconnect
// callback.
//
// When disconnect is true the callback sends a value on natsConnectionClosed;
// that is a plain channel send, so it blocks until the channel can accept the
// value. When disconnect is false the callback only prints a message.
func natsDisconnectHandler(disconnect bool, natsConnectionClosed chan struct{}) nats.Option {
	// Default behaviour: just report the disconnect.
	// TODO: use a logger instead of fmt.
	onDisconnect := func(_ *nats.Conn, _ error) {
		fmt.Println("disconnected from nats")
	}
	if disconnect {
		// Shutdown-on-disconnect: send the closing signal instead.
		onDisconnect = func(_ *nats.Conn, _ error) {
			natsConnectionClosed <- struct{}{}
		}
	}
	return nats.DisconnectErrHandler(onDisconnect)
}
// natsClosedHandler builds the nats option that installs a connection-closed
// callback, or returns nil when disconnect is false.
//
// The callback prints the connection's last error and then sends a value on
// natsConnectionClosed; that is a plain channel send, so it blocks until the
// channel can accept the value.
func natsClosedHandler(disconnect bool, natsConnectionClosed chan struct{}) nats.Option {
	if !disconnect {
		return nil
	}
	onClosed := func(conn *nats.Conn) {
		// TODO: use a logger instead
		println("Nats Client Connection closed!", "Reason", conn.LastError())
		// Send closing signal
		natsConnectionClosed <- struct{}{}
	}
	return nats.ClosedHandler(onClosed)
}
// createStream adds a JetStream stream called streamName that captures the
// single given subject and uses the given retention policy.
func (n *NatsBroker) createStream(streamName, subject string, policy nats.RetentionPolicy) error {
	cfg := nats.StreamConfig{
		Name:      streamName,
		Subjects:  []string{subject},
		Retention: policy,
	}
	return n.AddStream(cfg)
}
// Stats looks up the stream info for q's task stream and prints its name,
// message count and size in bytes to standard output. It returns the lookup
// error, if any.
//
// Temporary function that fulfils statistics demands from nq-cli.
func (n *NatsBroker) Stats(q *Queue) error {
	info, err := n.js.StreamInfo(q.stream)
	if err != nil {
		return err
	}
	state := info.State
	fmt.Printf("queue: %s | MessagesPending: %d | Size: %d Bytes \n", q.stream, state.Msgs, state.Bytes)
	return nil
}
// ConnectoQueue makes sure the JetStream streams backing q exist, creating
// whichever of them is missing:
//   - the task stream q.stream on q.subject, with work-queue retention;
//   - the cancel stream q.cancelStream on q.cancelSubject, with interest
//     retention.
//
// It returns a wrapped error, rather than panicking, when a stream cannot be
// created, so callers decide how to react to the failure.
func (n *NatsBroker) ConnectoQueue(q *Queue) error {
	// Task stream.
	if !n.isStreamExists(q.stream) {
		if err := n.createStream(q.stream, q.subject, nats.WorkQueuePolicy); err != nil {
			return fmt.Errorf("creating task stream %q: %w", q.stream, err)
		}
		log.Printf("Created queue=%s", q.stream)
	}

	// Cancel stream that accompanies the task stream.
	if !n.isStreamExists(q.cancelStream) {
		if err := n.createStream(q.cancelStream, q.cancelSubject, nats.InterestPolicy); err != nil {
			return fmt.Errorf("creating cancel stream %q: %w", q.cancelStream, err)
		}
		// Log the cancel stream's own name, not the task stream's.
		log.Printf("Created cancel queue=%s", q.cancelStream)
	}
	return nil
}
// NewNatsBroker connects to the NATS server at conf.Addr and returns a
// NatsBroker holding that connection and its JetStream context.
//
// On top of the caller-supplied opt.NatsOption it applies the reconnect wait
// and maximum reconnect count from conf and installs three handlers:
//   - disconnect: see natsDisconnectHandler;
//   - reconnect: when opt.ShutdownOnNatsDisconnect is false, logs the
//     reconnected URL and sends a value on forceReRegister;
//   - closed: see natsClosedHandler.
//
// It returns a nil broker and the error when connecting or obtaining the
// JetStream context fails.
//
// TODO: Allow users to specify `forceReRegister` as a boolean
func NewNatsBroker(conf NatsClientOpt, opt ClientOption, natsConnectionClosed chan struct{}, forceReRegister chan struct{}) (*NatsBroker, error) {
	shutdownOnDisconnect := opt.ShutdownOnNatsDisconnect

	// Build the option list in a fresh slice so that appending never writes
	// into the backing array of the caller's opt.NatsOption.
	options := make([]nats.Option, 0, len(opt.NatsOption)+5)
	options = append(options, opt.NatsOption...)
	options = append(options,
		nats.ReconnectWait(conf.ReconnectWait),
		nats.MaxReconnects(conf.MaxReconnects),
		natsDisconnectHandler(shutdownOnDisconnect, natsConnectionClosed),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			// TODO: Use a logger here
			if !shutdownOnDisconnect {
				log.Println("reconnection found", nc.ConnectedUrl())
				forceReRegister <- struct{}{}
			}
		}),
		natsClosedHandler(shutdownOnDisconnect, natsConnectionClosed),
	)

	nc, err := nats.Connect(conf.Addr, options...)
	if err != nil {
		return nil, err
	}

	js, err := nc.JetStream()
	if err != nil {
		// Do not leak the established connection when JetStream setup fails.
		// NOTE(review): closing presumably triggers the closed handler
		// installed above — confirm callers tolerate that signal here.
		nc.Close()
		return nil, err
	}
	return &NatsBroker{ns: nc, js: js}, nil
}