-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
143 lines (122 loc) · 2.89 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"math/rand"
"os"
"strconv"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
flg := parseFlags()
natsConn, err := nats.Connect(flg.url, nats.Timeout(20*time.Second))
if err != nil {
fmt.Printf("Can't connect Nats (%s).\n", err)
return
}
stanConn, err := stan.Connect(flg.clusterID, strconv.Itoa(rand.Int()), stan.NatsConn(natsConn))
if err != nil {
fmt.Printf("Can't connect STAN (%s).\n", err)
return
}
fmt.Printf("Connected to STAN %q cluster via %q URL.\n", flg.clusterID, flg.url)
switch flg.action {
case pubAction:
if err := pub(flg.subject, stanConn); err != nil {
fmt.Printf("Can't publish message (%s).\n", err)
return
}
case subAction:
if err := sub(flg.subject, stanConn); err != nil {
fmt.Printf("Can't subscribe subject (%s).\n", err)
return
}
}
}
func pub(subject string, stanConn stan.Conn) error {
fmt.Println("Type or paste message here and then hit Return/Enter and Ctrl-D")
payload, err := io.ReadAll(os.Stdin)
if err != nil {
fmt.Printf("Can't read from std input (%s).\n", err)
return err
}
fmt.Println("Publishing...")
if err := stanConn.Publish(subject, payload); err != nil {
fmt.Printf("Can't publish message (%s).\n", err)
return err
}
fmt.Println("Published")
return nil
}
func sub(subject string, stanConn stan.Conn) error {
const group = "local"
if _, err := stanConn.QueueSubscribe(subject, group, handle, stan.SetManualAckMode()); err != nil {
fmt.Printf("Can't subscribe stan (%s).\n", err)
return err
}
<-make(chan struct{})
return nil
}
func handle(msg *stan.Msg) {
defer func() {
_ = msg.Ack()
}()
var buf bytes.Buffer
if err := json.Indent(&buf, msg.Data, "", " "); err != nil {
fmt.Printf("%s.\n", msg.Data)
return
}
fmt.Println(buf.String())
}
const (
pubAction = "pub"
subAction = "sub"
)
type flags struct {
url string
clusterID string
action string
subject string
}
func parseFlags() flags {
var (
url = flag.String("url", "nats://0.0.0.0:4222", "Nats URL")
clusterID = flag.String("cluster-id", "test-cluster", "Nats cluster ID")
pub = flag.Bool("pub", false, "Publish action")
sub = flag.Bool("sub", false, "Subscribe action")
subject = flag.String("subject", "", "Nats subject")
)
flag.Parse()
var action string
if *pub {
action = pubAction
} else if *sub {
action = subAction
} else {
fmt.Println(`Specify "pub" or "sub" action using
--pub or --sub flag respectively.
For expample:
./stancli --pub --subject=abc`)
os.Exit(1)
}
if *subject == "" {
fmt.Println(`Specify subject (or topic) using --subject flag.
For expample:
./stancli --pub --subject=abc`)
os.Exit(1)
}
return flags{
url: *url,
clusterID: *clusterID,
action: action,
subject: *subject,
}
}