Skip to content

Commit c3c5af9

Browse files
authored
Merge pull request nsqio#1198 from ploxiln/nsqd_exit_tcp_closeall
nsqd: fix stall in Exit() due to tcp producer conns
2 parents ac1627b + 5f2153f commit c3c5af9

File tree

2 files changed

+23
-4
lines changed

2 files changed

+23
-4
lines changed

nsqd/nsqd.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type NSQD struct {
6161

6262
lookupPeers atomic.Value
6363

64+
tcpServer *tcpServer
6465
tcpListener net.Listener
6566
httpListener net.Listener
6667
httpsListener net.Listener
@@ -154,6 +155,7 @@ func New(opts *Options) (*NSQD, error) {
154155
n.logf(LOG_INFO, version.String("nsqd"))
155156
n.logf(LOG_INFO, "ID: %d", opts.ID)
156157

158+
n.tcpServer = &tcpServer{}
157159
n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
158160
if err != nil {
159161
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
@@ -255,14 +257,16 @@ func (n *NSQD) Main() error {
255257
})
256258
}
257259

258-
tcpServer := &tcpServer{ctx: ctx}
260+
n.tcpServer.ctx = ctx
259261
n.waitGroup.Wrap(func() {
260-
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
262+
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
261263
})
264+
262265
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
263266
n.waitGroup.Wrap(func() {
264267
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
265268
})
269+
266270
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
267271
httpsServer := newHTTPServer(ctx, true, true)
268272
n.waitGroup.Wrap(func() {
@@ -423,6 +427,9 @@ func (n *NSQD) Exit() {
423427
if n.tcpListener != nil {
424428
n.tcpListener.Close()
425429
}
430+
if n.tcpServer != nil {
431+
n.tcpServer.CloseAll()
432+
}
426433

427434
if n.httpListener != nil {
428435
n.httpListener.Close()

nsqd/tcp.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package nsqd
33
import (
44
"io"
55
"net"
6+
"sync"
67

78
"github.com/nsqio/nsq/internal/protocol"
89
)
910

1011
type tcpServer struct {
11-
ctx *context
12+
ctx *context
13+
conns sync.Map
1214
}
1315

1416
func (p *tcpServer) Handle(clientConn net.Conn) {
@@ -41,9 +43,19 @@ func (p *tcpServer) Handle(clientConn net.Conn) {
4143
return
4244
}
4345

46+
p.conns.Store(clientConn.RemoteAddr(), clientConn)
47+
4448
err = prot.IOLoop(clientConn)
4549
if err != nil {
4650
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
47-
return
4851
}
52+
53+
p.conns.Delete(clientConn.RemoteAddr())
54+
}
55+
56+
func (p *tcpServer) CloseAll() {
57+
p.conns.Range(func(k, v interface{}) bool {
58+
v.(net.Conn).Close()
59+
return true
60+
})
4961
}

0 commit comments

Comments
 (0)