-
Notifications
You must be signed in to change notification settings - Fork 0
/
natsby.go
115 lines (94 loc) · 2.47 KB
/
natsby.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package natsby
import (
"io"
"os"
"github.com/nats-io/nats.go"
)
// HandlerFunc is the signature shared by subscription handlers and middleware.
// Each function receives the *Context built for the message being processed.
type HandlerFunc func(*Context)
// HandlersChain is a slice of HandlerFunc values. The engine builds one per
// subscription, with global middleware placed ahead of the subscription's own
// handlers.
type HandlersChain []HandlerFunc
// Subscriber represents a subscription that is registered with NATS when
// Run() is called.
type Subscriber struct {
// Subject is the NATS subject passed to Subscribe/QueueSubscribe.
Subject string
// Handlers is the full chain (global middleware followed by the
// subscription's handlers) given to the Context of each received message.
Handlers HandlersChain
}
// Engine is the framework instance. It collects middleware and subscribers
// and, in Run, registers them on the underlying NATS connection.
type Engine struct {
// Conn is the embedded NATS connection; New sets it from its nc argument.
*nats.Conn
// EncodedConn is handed to every Context as-is. Nothing in this file sets
// it — presumably an option passed to New does; verify against callers.
*nats.EncodedConn
// subscribers holds the subscriptions added via Subscribe until Run
// registers them.
subscribers []*Subscriber
// middleware holds the global handlers added via Use.
middleware HandlersChain
// done carries the shutdown signal from Shutdown to Run.
done chan bool
// QueueGroup, when non-empty, makes Run register every subscriber with
// QueueSubscribe under this group name instead of a plain Subscribe.
QueueGroup string
// OutWriter is passed to every Context; New defaults it to os.Stdout.
OutWriter io.ReadWriter
// ErrWriter is passed to every Context; New defaults it to os.Stderr.
ErrWriter io.ReadWriter
}
// New creates an Engine bound to the NATS connection nc.
//
// The engine starts with OutWriter set to os.Stdout and ErrWriter set to
// os.Stderr. The options are then applied in the order given and may override
// any field of the engine. Nil options are skipped.
//
// Configuration stops at the first option that returns an error. That error
// is returned together with a nil *Engine, so a failure can never be masked by
// a later option that happens to succeed.
func New(nc *nats.Conn, options ...func(*Engine) error) (*Engine, error) {
	e := &Engine{
		Conn:      nc,
		done:      make(chan bool),
		OutWriter: os.Stdout,
		ErrWriter: os.Stderr,
	}

	for _, option := range options {
		if option == nil {
			continue
		}
		if err := option(e); err != nil {
			return nil, err
		}
	}
	return e, nil
}
// Use registers global middleware. The given handlers are appended to the
// engine's middleware list, which is prepended to the handler chain of every
// subscription added afterwards through Subscribe.
func (e *Engine) Use(middleware ...HandlerFunc) {
	extended := append(e.middleware, middleware...)
	e.middleware = extended
}
// Subscribe queues a subscription for subject. The handlers are combined with
// the middleware registered so far and stored on the engine; the subscription
// itself is only created on the NATS connection when Run is called.
func (e *Engine) Subscribe(subject string, handlers ...HandlerFunc) {
	chain := e.combineHandlers(handlers)
	e.subscribers = append(e.subscribers, &Subscriber{
		Subject:  subject,
		Handlers: chain,
	})
}
// combineHandlers returns a freshly allocated chain holding the engine's
// global middleware followed by handlers. Neither input slice is modified.
func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain {
	total := len(e.middleware) + len(handlers)
	chain := make(HandlersChain, 0, total)
	chain = append(chain, e.middleware...)
	chain = append(chain, handlers...)
	return chain
}
// Run registers every queued subscriber on the NATS connection, invokes the
// given callbacks in order, and then blocks until Shutdown is called. After
// the shutdown signal arrives it drains the connection.
//
// Subscribers are registered with QueueSubscribe when QueueGroup is set and
// with a plain Subscribe otherwise. For each incoming message a new Context is
// built from the message and the subscriber's handler chain, and the chain is
// started with Next.
//
// Run returns the first error reported while subscribing (in which case the
// callbacks are not invoked and Run does not block; subscriptions that were
// already registered are left on the connection), or the error returned by
// Drain, or nil.
func (e *Engine) Run(callbacks ...func()) error {
	for _, subscriber := range e.subscribers {
		// Per-iteration copy so the handler closure below never observes a
		// later subscriber on toolchains that share the loop variable.
		subscriber := subscriber

		handler := func(m *nats.Msg) {
			c := &Context{
				Msg:         m,
				handlers:    subscriber.Handlers,
				Conn:        e.Conn,
				EncodedConn: e.EncodedConn,
				Keys:        make(map[string]interface{}),
				outWriter:   e.OutWriter,
				errWriter:   e.ErrWriter,
			}
			c.reset()
			c.Next()
		}

		var err error
		if e.QueueGroup == "" {
			_, err = e.Conn.Subscribe(subscriber.Subject, handler)
		} else {
			_, err = e.Conn.QueueSubscribe(subscriber.Subject, e.QueueGroup, handler)
		}
		if err != nil {
			return err
		}
	}

	for _, cb := range callbacks {
		cb()
	}

	// Block until Shutdown signals, then report how draining went instead of
	// discarding the result.
	<-e.done
	return e.Conn.Drain()
}
// Shutdown signals Run to stop by sending on the engine's done channel. Run,
// once it receives the signal, drains the NATS connection and returns.
//
// For an Engine built by New the channel is unbuffered, so this call blocks
// until a running Run receives the value.
func (e *Engine) Shutdown() {
e.done <- true
}