Skip to content

Commit 49c67d1

Browse files
committed
fix some bug
1 parent f336e5f commit 49c67d1

File tree

4 files changed

+14
-36
lines changed

4 files changed

+14
-36
lines changed

v2/websocket/api.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,29 +28,6 @@ func (c *Client) Send(ctx context.Context, msg interface{}) error {
2828
return socket.Asynchronous.Send(ctx, msg)
2929
}
3030

31-
// Submit a request to enable the given flag
32-
func (c *Client) EnableFlag(ctx context.Context, flag int) (string, error) {
33-
req := &FlagRequest{
34-
Event: "conf",
35-
Flags: flag,
36-
}
37-
// TODO enable flag on reconnect?
38-
// create sublist to stop concurrent map read
39-
socks := make([]*Socket, len(c.sockets))
40-
c.mtx.RLock()
41-
for i, socket := range c.sockets {
42-
socks[i] = socket
43-
}
44-
c.mtx.RUnlock()
45-
for _, socket := range socks {
46-
err := socket.Asynchronous.Send(ctx, req)
47-
if err != nil {
48-
return "", err
49-
}
50-
}
51-
return "", nil
52-
}
53-
5431
// Gen the count of currently active websocket connections
5532
func (c *Client) ConnectionCount() int {
5633
c.mtx.RLock()

v2/websocket/client.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -493,24 +493,29 @@ func (c *Client) checkResubscription(socketId SocketId) {
493493
if c.parameters.ManageOrderbook {
494494
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
495495
defer cancel()
496-
_, err_flag := c.EnableFlag(ctx, common.Checksum)
497-
if err_flag != nil {
498-
c.log.Errorf("could not enable checksum flag %s ", err_flag)
496+
req := &FlagRequest{
497+
Event: "conf",
498+
Flags: common.Checksum,
499+
}
500+
if err := socket.Asynchronous.Send(ctx, req); err != nil {
501+
c.log.Errorf("socket(%d) could not enable checksum flag %s ", socket.Id, err)
499502
}
500503
}
504+
501505
if c.parameters.ResubscribeOnReconnect && socket.ResetSubscriptions != nil {
506+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
507+
defer cancel()
502508
for _, sub := range socket.ResetSubscriptions {
503509
if sub.Request.Event == "auth" {
504510
continue
505511
}
506-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
507-
defer cancel()
508512
sub.Request.SubID = c.nonce.GetNonce() // new nonce
509513
c.log.Infof("socket (id=%d) resubscribing to %s with nonce %s", socket.Id, sub.Request.String(), sub.Request.SubID)
510514
_, err := c.subscribeBySocket(ctx, socket, sub.Request)
511515
if err != nil {
512516
c.log.Errorf("could not resubscribe: %s", err.Error())
513517
}
518+
time.Sleep(50 * time.Millisecond)
514519
}
515520
socket.ResetSubscriptions = nil
516521
}

v2/websocket/subscriptions.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,14 @@ func (s *subscriptions) sweep(exp time.Time) {
193193
s.lock.RUnlock()
194194
return
195195
}
196+
196197
disconnects := make([]HeartbeatDisconnect, 0)
197198
// use subsBySubID instead of subsByChanID to avoid ineffective heartbeat when re sub err on reconnect
198199
// since subsByChanID is empty when subscription err
199200
for _, sub := range s.subsBySubID {
200201
if exp.After(sub.hbDeadline) {
201-
s.hbActive = false
202+
// 22-01-13, do not change hbActive to false on heartbeat timeout, so we always heartbeat after first successful conn
203+
// s.hbActive = false
202204
hbErr := HeartbeatDisconnect{
203205
Subscription: sub,
204206
Error: fmt.Errorf("sub %v heartbeat disconnect on channel %d expired at %s (%s timeout)", sub.SubID(), sub.ChanID, sub.hbDeadline, s.hbTimeout),

v2/websocket/transport.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"crypto/tls"
66
"encoding/json"
77
"fmt"
8-
"net"
98
"net/http"
109
"sync"
1110
"sync/atomic"
@@ -182,16 +181,11 @@ func (w *ws) listenWs() {
182181
_, msg, err := w.ws.ReadMessage()
183182
if err != nil {
184183
w.log.Errorf("%s ws read err: %s", w.connStr, err.Error())
185-
// a read during normal shutdown results in an OpError: op on closed connection
186-
if _, ok := err.(*net.OpError); ok {
187-
// general read error on a closed network connection, OK
188-
return
189-
}
190-
191184
w.stop(err)
192185
return
193186
}
194187
w.log.Debugf("%s srv->ws: %s", w.connStr, string(msg))
188+
195189
w.lock.RLock()
196190
if w.downstream == nil {
197191
w.lock.RUnlock()

0 commit comments

Comments
 (0)