Skip to content

Commit 0054d3d

Browse files
committed
nsqd: buffer and spread UDP writes
1 parent bd08d63 commit 0054d3d

File tree

7 files changed

+102
-42
lines changed

7 files changed

+102
-42
lines changed

apps/nsqd/nsqd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
123123
flagSet.Duration("statsd-interval", opts.StatsdInterval, "duration between pushing to statsd")
124124
flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd")
125125
flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)")
126+
flagSet.Int("statsd-udp-packet-size", opts.StatsdUDPPacketSize, "the size in bytes of statsd UDP packets")
126127

127128
// End to end percentile flags
128129
e2eProcessingLatencyPercentiles := app.FloatArray{}

contrib/nsqd.cfg.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ statsd_interval = "60s"
8484
## toggle sending memory and GC stats to statsd
8585
statsd_mem_stats = true
8686

87+
## the size in bytes of statsd UDP packets
88+
# statsd_udp_packet_size = 508
89+
8790

8891
## message processing time percentiles to keep track of (float)
8992
e2e_processing_latency_percentiles = [

internal/statsd/client.go

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,22 @@
11
package statsd
22

33
import (
4-
"errors"
54
"fmt"
6-
"net"
7-
"time"
5+
"io"
86
)
97

108
type Client struct {
11-
conn net.Conn
12-
addr string
9+
w io.Writer
1310
prefix string
1411
}
1512

16-
func NewClient(addr string, prefix string) *Client {
13+
func NewClient(w io.Writer, prefix string) *Client {
1714
return &Client{
18-
addr: addr,
15+
w: w,
1916
prefix: prefix,
2017
}
2118
}
2219

23-
func (c *Client) String() string {
24-
return c.addr
25-
}
26-
27-
func (c *Client) CreateSocket() error {
28-
conn, err := net.DialTimeout("udp", c.addr, time.Second)
29-
if err != nil {
30-
return err
31-
}
32-
c.conn = conn
33-
return nil
34-
}
35-
36-
func (c *Client) Close() error {
37-
return c.conn.Close()
38-
}
39-
4020
func (c *Client) Incr(stat string, count int64) error {
4121
return c.send(stat, "%d|c", count)
4222
}
@@ -54,10 +34,7 @@ func (c *Client) Gauge(stat string, value int64) error {
5434
}
5535

5636
func (c *Client) send(stat string, format string, value int64) error {
57-
if c.conn == nil {
58-
return errors.New("not connected")
59-
}
60-
format = fmt.Sprintf("%s%s:%s", c.prefix, stat, format)
61-
_, err := fmt.Fprintf(c.conn, format, value)
37+
format = fmt.Sprintf("%s%s:%s\n", c.prefix, stat, format)
38+
_, err := fmt.Fprintf(c.w, format, value)
6239
return err
6340
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package writers
2+
3+
import (
4+
"bufio"
5+
"io"
6+
)
7+
8+
type BoundaryBufferedWriter struct {
9+
bw *bufio.Writer
10+
}
11+
12+
func NewBoundaryBufferedWriter(w io.Writer, size int) *BoundaryBufferedWriter {
13+
return &BoundaryBufferedWriter{
14+
bw: bufio.NewWriterSize(w, size),
15+
}
16+
}
17+
18+
func (b *BoundaryBufferedWriter) Write(p []byte) (int, error) {
19+
if len(p) > b.bw.Available() {
20+
err := b.bw.Flush()
21+
if err != nil {
22+
return 0, err
23+
}
24+
}
25+
return b.bw.Write(p)
26+
}
27+
28+
func (b *BoundaryBufferedWriter) Flush() error {
29+
return b.bw.Flush()
30+
}

internal/writers/spread_writer.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package writers
2+
3+
import (
4+
"io"
5+
"time"
6+
)
7+
8+
type SpreadWriter struct {
9+
w io.Writer
10+
interval time.Duration
11+
buf [][]byte
12+
}
13+
14+
func NewSpreadWriter(w io.Writer, interval time.Duration) *SpreadWriter {
15+
return &SpreadWriter{
16+
w: w,
17+
interval: interval,
18+
buf: make([][]byte, 0),
19+
}
20+
}
21+
22+
func (s *SpreadWriter) Write(p []byte) (int, error) {
23+
b := make([]byte, len(p))
24+
copy(b, p)
25+
s.buf = append(s.buf, b)
26+
return len(p), nil
27+
}
28+
29+
func (s *SpreadWriter) Flush() {
30+
sleep := s.interval / time.Duration(len(s.buf))
31+
for _, b := range s.buf {
32+
start := time.Now()
33+
s.w.Write(b)
34+
latency := time.Now().Sub(start)
35+
time.Sleep(sleep - latency)
36+
}
37+
s.buf = s.buf[:0]
38+
}

nsqd/options.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ type Options struct {
5858
MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
5959

6060
// statsd integration
61-
StatsdAddress string `flag:"statsd-address"`
62-
StatsdPrefix string `flag:"statsd-prefix"`
63-
StatsdInterval time.Duration `flag:"statsd-interval"`
64-
StatsdMemStats bool `flag:"statsd-mem-stats"`
61+
StatsdAddress string `flag:"statsd-address"`
62+
StatsdPrefix string `flag:"statsd-prefix"`
63+
StatsdInterval time.Duration `flag:"statsd-interval"`
64+
StatsdMemStats bool `flag:"statsd-mem-stats"`
65+
StatsdUDPPacketSize int `flag:"statsd-udp-packet-size"`
6566

6667
// e2e message latency
6768
E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"`
@@ -130,9 +131,10 @@ func NewOptions() *Options {
130131
MaxOutputBufferSize: 64 * 1024,
131132
MaxOutputBufferTimeout: 1 * time.Second,
132133

133-
StatsdPrefix: "nsq.%s",
134-
StatsdInterval: 60 * time.Second,
135-
StatsdMemStats: true,
134+
StatsdPrefix: "nsq.%s",
135+
StatsdInterval: 60 * time.Second,
136+
StatsdMemStats: true,
137+
StatsdUDPPacketSize: 508,
136138

137139
E2EProcessingLatencyWindowTime: time.Duration(10 * time.Minute),
138140

nsqd/statsd.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package nsqd
33
import (
44
"fmt"
55
"math"
6+
"net"
67
"time"
78

89
"github.com/nsqio/nsq/internal/statsd"
10+
"github.com/nsqio/nsq/internal/writers"
911
)
1012

1113
type Uint64Slice []uint64
@@ -25,20 +27,25 @@ func (s Uint64Slice) Less(i, j int) bool {
2527
func (n *NSQD) statsdLoop() {
2628
var lastMemStats memStats
2729
var lastStats []TopicStats
28-
ticker := time.NewTicker(n.getOpts().StatsdInterval)
30+
interval := n.getOpts().StatsdInterval
31+
ticker := time.NewTicker(interval)
2932
for {
3033
select {
3134
case <-n.exitChan:
3235
goto exit
3336
case <-ticker.C:
34-
client := statsd.NewClient(n.getOpts().StatsdAddress, n.getOpts().StatsdPrefix)
35-
err := client.CreateSocket()
37+
addr := n.getOpts().StatsdAddress
38+
prefix := n.getOpts().StatsdPrefix
39+
conn, err := net.DialTimeout("udp", addr, time.Second)
3640
if err != nil {
37-
n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", client)
41+
n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", addr)
3842
continue
3943
}
44+
sw := writers.NewSpreadWriter(conn, interval-time.Second)
45+
bw := writers.NewBoundaryBufferedWriter(sw, n.getOpts().StatsdUDPPacketSize)
46+
client := statsd.NewClient(bw, prefix)
4047

41-
n.logf(LOG_INFO, "STATSD: pushing stats to %s", client)
48+
n.logf(LOG_INFO, "STATSD: pushing stats to %s", addr)
4249

4350
stats := n.GetStats("", "")
4451
for _, topic := range stats {
@@ -128,7 +135,9 @@ func (n *NSQD) statsdLoop() {
128135
lastMemStats = ms
129136
}
130137

131-
client.Close()
138+
bw.Flush()
139+
sw.Flush()
140+
conn.Close()
132141
}
133142
}
134143

0 commit comments

Comments
 (0)