Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

binance: update book stream url #3015

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type WsConn interface {
RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error
Connect(ctx context.Context) (*sync.WaitGroup, error)
MessageSource() <-chan *msgjson.Message
UpdateURL(string)
}

// When the DEX sends a request to the client, a responseHandler is created
Expand Down Expand Up @@ -161,6 +162,7 @@ type wsConn struct {
cfg *WsCfg
tlsCfg *tls.Config
readCh chan *msgjson.Message
urlV atomic.Value // string

wsMtx sync.Mutex
ws *websocket.Conn
Expand Down Expand Up @@ -203,14 +205,25 @@ func NewWsConn(cfg *WsCfg) (WsConn, error) {
ServerName: uri.Hostname(),
}

return &wsConn{
conn := &wsConn{
cfg: cfg,
log: cfg.Logger,
tlsCfg: tlsConfig,
readCh: make(chan *msgjson.Message, readBuffSize),
respHandlers: make(map[uint64]*responseHandler),
reconnectCh: make(chan struct{}, 1),
}, nil
}
conn.urlV.Store(cfg.URL)

return conn, nil
}

func (conn *wsConn) UpdateURL(uri string) {
conn.urlV.Store(uri)
}

func (conn *wsConn) url() string {
return conn.urlV.Load().(string)
}

// IsDown indicates if the connection is known to be down.
Expand Down Expand Up @@ -240,7 +253,7 @@ func (conn *wsConn) connect(ctx context.Context) error {
dialer.Proxy = http.ProxyFromEnvironment
}

ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, conn.cfg.ConnectHeaders)
ws, _, err := dialer.DialContext(ctx, conn.url(), conn.cfg.ConnectHeaders)
if err != nil {
if isErrorInvalidCert(err) {
conn.setConnectionStatus(InvalidCert)
Expand Down Expand Up @@ -331,7 +344,7 @@ func (conn *wsConn) handleReadError(err error) {

var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
conn.log.Errorf("Read timeout on connection to %s.", conn.url())
reconnect()
return
}
Expand Down Expand Up @@ -457,11 +470,11 @@ func (conn *wsConn) keepAlive(ctx context.Context) {
return
}

conn.log.Infof("Attempting to reconnect to %s...", conn.cfg.URL)
conn.log.Infof("Attempting to reconnect to %s...", conn.url())
err := conn.connect(ctx)
if err != nil {
conn.log.Errorf("Reconnect failed. Scheduling reconnect to %s in %.1f seconds.",
conn.cfg.URL, rcInt.Seconds())
conn.url(), rcInt.Seconds())
time.AfterFunc(rcInt, func() {
conn.reconnectCh <- struct{}{}
})
Expand Down
2 changes: 2 additions & 0 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func (conn *TWebsocket) Connect(context.Context) (*sync.WaitGroup, error) {
return &sync.WaitGroup{}, conn.connectErr
}

func (conn *TWebsocket) UpdateURL(string) {}

type TDB struct {
updateWalletErr error
acct *db.AccountInfo
Expand Down
32 changes: 18 additions & 14 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,7 @@ func (bnc *binance) getOrderbookSnapshot(ctx context.Context, mktSymbol string)
// subscribeToAdditionalMarketDataStream is called when a new market is
// subscribed to after the market data stream connection has already been
// established.
func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, baseID, quoteID uint32) error {
func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, baseID, quoteID uint32) (err error) {
baseCfg, quoteCfg, err := bncAssetCfgs(baseID, quoteID)
if err != nil {
return fmt.Errorf("error getting asset cfg for %d: %w", baseID, err)
Expand All @@ -1627,6 +1627,10 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b
mktID := binanceMktID(baseCfg, quoteCfg)
streamID := marketDataStreamID(mktID)

defer func() {
bnc.marketStream.UpdateURL(bnc.streamURL())
}()

bnc.booksMtx.Lock()
defer bnc.booksMtx.Unlock()

Expand Down Expand Up @@ -1662,6 +1666,10 @@ func (bnc *binance) streams() []string {
return streamNames
}

func (bnc *binance) streamURL() string {
return fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
}

// checkSubs will query binance for current market subscriptions and compare
// that to what subscriptions we should have. If there is a discrepancy a
// warning is logged and the market subbed or unsubbed.
Expand Down Expand Up @@ -1756,8 +1764,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
reconnectC := make(chan struct{})
checkSubsC := make(chan struct{})

newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
newConnection := func() (*dex.ConnectionMaster, error) {
// Need to send key but not signature
connectEventFunc := func(cs comms.ConnectionStatus) {
if cs != comms.Disconnected && cs != comms.Connected {
Expand All @@ -1776,7 +1783,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
}
}
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
URL: bnc.streamURL(),
// Binance Docs: The websocket server will send a ping frame every 3
// minutes. If the websocket server does not receive a pong frame
// back from the connection within a 10 minute period, the connection
Expand All @@ -1795,16 +1802,16 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
RawHandler: bnc.handleMarketDataNote,
})
if err != nil {
return nil, nil, err
return nil, err
}

bnc.marketStream = conn
cm := dex.NewConnectionMaster(conn)
if err = cm.ConnectOnce(ctx); err != nil {
return nil, nil, fmt.Errorf("websocketHandler remote connect: %v", err)
return nil, fmt.Errorf("websocketHandler remote connect: %v", err)
}

return conn, cm, nil
return cm, nil
}

// Add the initial book to the books map
Expand All @@ -1822,32 +1829,27 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
bnc.booksMtx.Unlock()

// Create initial connection to the market data stream
conn, cm, err := newConnection()
cm, err := newConnection()
if err != nil {
return fmt.Errorf("error connecting to market data stream : %v", err)
}

bnc.marketStream = conn

book.sync(ctx)

// Start a goroutine to reconnect every 12 hours
go func() {
reconnect := func() error {
bnc.marketStreamMtx.Lock()
defer bnc.marketStreamMtx.Unlock()

oldCm := cm
conn, cm, err = newConnection()
cm, err = newConnection()
if err != nil {
return err
}

if oldCm != nil {
oldCm.Disconnect()
}

bnc.marketStream = conn
return nil
}

Expand Down Expand Up @@ -1922,6 +1924,8 @@ func (bnc *binance) UnsubscribeMarket(baseID, quoteID uint32) (err error) {
defer func() {
bnc.booksMtx.Unlock()

conn.UpdateURL(bnc.streamURL())

if closer != nil {
closer.Disconnect()
}
Expand Down
Loading