Skip to content

Commit 84ea8f6

Browse files
committed
Add support of unix sockets for tcp, http, https listeners
- Using -1 for tcp/http port for /info endpoint - Disable ability using https for unix sockets
1 parent cf12b96 commit 84ea8f6

File tree

13 files changed

+1886
-31
lines changed

13 files changed

+1886
-31
lines changed

apps/nsqd/options.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,9 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
129129
flagSet.Bool("worker-id", false, "[deprecated] use --node-id")
130130

131131
flagSet.String("https-address", opts.HTTPSAddress, "<addr>:<port> to listen on for HTTPS clients")
132-
flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
133-
flagSet.String("tcp-address", opts.TCPAddress, "<addr>:<port> to listen on for TCP clients")
132+
flagSet.String("http-address", opts.HTTPAddress, "address to listen on for HTTP clients (<addr>:<port> for TCP/IP or <path> for unix socket)")
133+
flagSet.String("tcp-address", opts.TCPAddress, "address to listen on for TCP clients (<addr>:<port> for TCP/IP or <path> for unix socket)")
134+
134135
authHTTPAddresses := app.StringArray{}
135136
flagSet.Var(&authHTTPAddresses, "auth-http-address", "<addr>:<port> or a full url to query auth server (may be given multiple times)")
136137
flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")

internal/util/unix_socket.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package util
2+
3+
import (
4+
"net"
5+
)
6+
7+
func TypeOfAddr(addr string) string {
8+
if _, _, err := net.SplitHostPort(addr); err == nil {
9+
return "tcp"
10+
}
11+
return "unix"
12+
}

internal/util/util_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,18 @@ func TestUniqRands(t *testing.T) {
3434
x = UniqRands(10, 20)
3535
test.Equal(t, 10, len(x))
3636
}
37+
38+
func TestTypeOfAddr(t *testing.T) {
39+
var x string
40+
x = TypeOfAddr("127.0.0.1:80")
41+
test.Equal(t, "tcp", x)
42+
43+
x = TypeOfAddr("test:80")
44+
test.Equal(t, "tcp", x)
45+
46+
x = TypeOfAddr("/var/run/nsqd.sock")
47+
test.Equal(t, "unix", x)
48+
49+
x = TypeOfAddr("[::1%lo0]:80")
50+
test.Equal(t, "tcp", x)
51+
}

nsqadmin/http_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ func TestHTTPNodesGET(t *testing.T) {
239239
testNode := ns.Nodes[0]
240240
test.Equal(t, hostname, testNode.Hostname)
241241
test.Equal(t, "127.0.0.1", testNode.BroadcastAddress)
242-
test.Equal(t, nsqds[0].RealTCPAddr().Port, testNode.TCPPort)
243-
test.Equal(t, nsqds[0].RealHTTPAddr().Port, testNode.HTTPPort)
242+
test.Equal(t, nsqds[0].RealTCPAddr().(*net.TCPAddr).Port, testNode.TCPPort)
243+
test.Equal(t, nsqds[0].RealHTTPAddr().(*net.TCPAddr).Port, testNode.HTTPPort)
244244
test.Equal(t, version.Binary, testNode.Version)
245245
test.Equal(t, 0, len(testNode.Topics))
246246
}

nsqadmin/nsqadmin_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestTLSHTTPClient(t *testing.T) {
7777
test.Equal(t, resp.StatusCode < 500, true)
7878
}
7979

80-
func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD) {
80+
func mustStartNSQD(opts *nsqd.Options) (net.Addr, net.Addr, *nsqd.NSQD) {
8181
opts.TCPAddress = "127.0.0.1:0"
8282
opts.HTTPAddress = "127.0.0.1:0"
8383
opts.HTTPSAddress = "127.0.0.1:0"

nsqd/client_v2.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,9 +637,13 @@ func (c *clientV2) Flush() error {
637637
}
638638

639639
func (c *clientV2) QueryAuthd() error {
640-
remoteIP, _, err := net.SplitHostPort(c.String())
641-
if err != nil {
642-
return err
640+
remoteIP := ""
641+
if c.RemoteAddr().Network() == "tcp" {
642+
ip, _, err := net.SplitHostPort(c.String())
643+
if err != nil {
644+
return err
645+
}
646+
remoteIP = ip
643647
}
644648

645649
tlsEnabled := atomic.LoadInt32(&c.TLS) == 1

nsqd/http.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"fmt"
88
"io"
9+
"net"
910
"net/http"
1011
"net/http/pprof"
1112
"net/url"
@@ -131,6 +132,15 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou
131132
if err != nil {
132133
return nil, http_api.Err{500, err.Error()}
133134
}
135+
tcpPort := -1 // in case of unix socket
136+
if s.nsqd.RealTCPAddr().Network() == "tcp" {
137+
tcpPort = s.nsqd.RealTCPAddr().(*net.TCPAddr).Port
138+
}
139+
httpPort := -1 // in case of unix socket
140+
if s.nsqd.RealHTTPAddr().Network() == "tcp" {
141+
httpPort = s.nsqd.RealHTTPAddr().(*net.TCPAddr).Port
142+
}
143+
134144
return struct {
135145
Version string `json:"version"`
136146
BroadcastAddress string `json:"broadcast_address"`
@@ -146,8 +156,8 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou
146156
Version: version.Binary,
147157
BroadcastAddress: s.nsqd.getOpts().BroadcastAddress,
148158
Hostname: hostname,
149-
TCPPort: s.nsqd.RealTCPAddr().Port,
150-
HTTPPort: s.nsqd.RealHTTPAddr().Port,
159+
TCPPort: tcpPort,
160+
HTTPPort: httpPort,
151161
StartTime: s.nsqd.GetStartTime().Unix(),
152162
MaxHeartBeatInterval: s.nsqd.getOpts().MaxHeartbeatInterval,
153163
MaxOutBufferSize: s.nsqd.getOpts().MaxOutputBufferSize,

nsqd/http_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -527,36 +527,36 @@ func TestHTTPClientStats(t *testing.T) {
527527
Memory *struct{} `json:"memory,omitempty"`
528528
}
529529

530-
endpoint := fmt.Sprintf("http://127.0.0.1:%d/stats?format=json", httpAddr.Port)
530+
endpoint := fmt.Sprintf("http://%s/stats?format=json", httpAddr)
531531
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
532532
test.Nil(t, err)
533533

534534
test.Equal(t, 1, len(d.Topics[0].Channels[0].Clients))
535535
test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
536536
test.NotNil(t, d.Memory)
537537

538-
endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_clients=true", httpAddr.Port)
538+
endpoint = fmt.Sprintf("http://%s/stats?format=json&include_clients=true", httpAddr)
539539
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
540540
test.Nil(t, err)
541541

542542
test.Equal(t, 1, len(d.Topics[0].Channels[0].Clients))
543543
test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
544544

545-
endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_clients=false", httpAddr.Port)
545+
endpoint = fmt.Sprintf("http://%s/stats?format=json&include_clients=false", httpAddr)
546546
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
547547
test.Nil(t, err)
548548

549549
test.Equal(t, 0, len(d.Topics[0].Channels[0].Clients))
550550
test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
551551

552-
endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_mem=true", httpAddr.Port)
552+
endpoint = fmt.Sprintf("http://%s/stats?format=json&include_mem=true", httpAddr)
553553
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
554554
test.Nil(t, err)
555555

556556
test.NotNil(t, d.Memory)
557557

558558
d.Memory = nil
559-
endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_mem=false", httpAddr.Port)
559+
endpoint = fmt.Sprintf("http://%s/stats?format=json&include_mem=false", httpAddr)
560560
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
561561
test.Nil(t, err)
562562

nsqd/nsqd.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ func New(opts *Options) (*NSQD, error) {
138138
n.logf(LOG_INFO, "ID: %d", opts.ID)
139139

140140
n.tcpServer = &tcpServer{nsqd: n}
141-
n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
141+
n.tcpListener, err = net.Listen(util.TypeOfAddr(opts.TCPAddress), opts.TCPAddress)
142142
if err != nil {
143143
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
144144
}
145145
if opts.HTTPAddress != "" {
146-
n.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
146+
n.httpListener, err = net.Listen(util.TypeOfAddr(opts.HTTPAddress), opts.HTTPAddress)
147147
if err != nil {
148148
return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err)
149149
}
@@ -155,11 +155,17 @@ func New(opts *Options) (*NSQD, error) {
155155
}
156156
}
157157
if opts.BroadcastHTTPPort == 0 {
158-
opts.BroadcastHTTPPort = n.RealHTTPAddr().Port
158+
tcpAddr, ok := n.RealHTTPAddr().(*net.TCPAddr)
159+
if ok {
160+
opts.BroadcastHTTPPort = tcpAddr.Port
161+
}
159162
}
160163

161164
if opts.BroadcastTCPPort == 0 {
162-
opts.BroadcastTCPPort = n.RealTCPAddr().Port
165+
tcpAddr, ok := n.RealTCPAddr().(*net.TCPAddr)
166+
if ok {
167+
opts.BroadcastTCPPort = tcpAddr.Port
168+
}
163169
}
164170

165171
if opts.StatsdPrefix != "" {
@@ -190,19 +196,19 @@ func (n *NSQD) triggerOptsNotification() {
190196
}
191197
}
192198

193-
func (n *NSQD) RealTCPAddr() *net.TCPAddr {
199+
func (n *NSQD) RealTCPAddr() net.Addr {
194200
if n.tcpListener == nil {
195201
return &net.TCPAddr{}
196202
}
197-
return n.tcpListener.Addr().(*net.TCPAddr)
203+
return n.tcpListener.Addr()
198204

199205
}
200206

201-
func (n *NSQD) RealHTTPAddr() *net.TCPAddr {
207+
func (n *NSQD) RealHTTPAddr() net.Addr {
202208
if n.httpListener == nil {
203209
return &net.TCPAddr{}
204210
}
205-
return n.httpListener.Addr().(*net.TCPAddr)
211+
return n.httpListener.Addr()
206212
}
207213

208214
func (n *NSQD) RealHTTPSAddr() *net.TCPAddr {

nsqd/nsqd_test.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"io/fs"
78
"net"
89
"os"
910
"strconv"
@@ -239,7 +240,7 @@ func TestPauseMetadata(t *testing.T) {
239240
test.Equal(t, false, isPaused(nsqd, 0, 0))
240241
}
241242

242-
func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) {
243+
func mustStartNSQLookupd(opts *nsqlookupd.Options) (net.Addr, net.Addr, *nsqlookupd.NSQLookupd) {
243244
opts.TCPAddress = "127.0.0.1:0"
244245
opts.HTTPAddress = "127.0.0.1:0"
245246
lookupd, err := nsqlookupd.New(opts)
@@ -361,15 +362,15 @@ func TestCluster(t *testing.T) {
361362

362363
test.Equal(t, hostname, topicData[0].Hostname)
363364
test.Equal(t, "127.0.0.1", topicData[0].BroadcastAddress)
364-
test.Equal(t, nsqd.RealTCPAddr().Port, topicData[0].TCPPort)
365+
test.Equal(t, nsqd.RealTCPAddr().(*net.TCPAddr).Port, topicData[0].TCPPort)
365366
test.Equal(t, false, topicData[0].Tombstoned)
366367

367368
channelData := d["channel:"+topicName+":ch"]
368369
test.Equal(t, 1, len(channelData))
369370

370371
test.Equal(t, hostname, channelData[0].Hostname)
371372
test.Equal(t, "127.0.0.1", channelData[0].BroadcastAddress)
372-
test.Equal(t, nsqd.RealTCPAddr().Port, channelData[0].TCPPort)
373+
test.Equal(t, nsqd.RealTCPAddr().(*net.TCPAddr).Port, channelData[0].TCPPort)
373374
test.Equal(t, false, channelData[0].Tombstoned)
374375

375376
var lr struct {
@@ -388,7 +389,7 @@ func TestCluster(t *testing.T) {
388389
test.Equal(t, 1, len(lr.Producers))
389390
test.Equal(t, hostname, lr.Producers[0].Hostname)
390391
test.Equal(t, "127.0.0.1", lr.Producers[0].BroadcastAddress)
391-
test.Equal(t, nsqd.RealTCPAddr().Port, lr.Producers[0].TCPPort)
392+
test.Equal(t, nsqd.RealTCPAddr().(*net.TCPAddr).Port, lr.Producers[0].TCPPort)
392393
test.Equal(t, 1, len(lr.Channels))
393394
test.Equal(t, "ch", lr.Channels[0])
394395

@@ -438,3 +439,23 @@ func TestSetHealth(t *testing.T) {
438439
test.Equal(t, "OK", nsqd.GetHealth())
439440
test.Equal(t, true, nsqd.IsHealthy())
440441
}
442+
443+
func TestUnixSocketStartup(t *testing.T) {
444+
isSocket := func(path string) bool {
445+
fileInfo, err := os.Stat(path)
446+
if err != nil {
447+
return false
448+
}
449+
return fileInfo.Mode().Type() == fs.ModeSocket
450+
}
451+
452+
opts := NewOptions()
453+
opts.Logger = test.NewTestLogger(t)
454+
455+
_, _, nsqd := mustUnixSocketStartNSQD(opts)
456+
defer os.RemoveAll(opts.DataPath)
457+
defer nsqd.Exit()
458+
459+
test.Equal(t, isSocket(opts.TCPAddress), true)
460+
test.Equal(t, isSocket(opts.HTTPAddress), true)
461+
}

0 commit comments

Comments
 (0)