Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.

Commit 3234099

Browse files
authored
Merge pull request #552 from 0xProject/fix/always-unsubscribe-order-feed
Always unsubscribe from order events in SetupOrderStream
2 parents 2a74919 + 7c7f433 commit 3234099

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions.
44

5+
## v6.1.1-beta
6+
7+
### Bug fixes 🐞
8+
9+
- Fixed a bug where the internal order event feed could become blocked, rendering Mesh unable to receive any new orders or update existing ones ([#552](https://github.com/0xProject/0x-mesh/pull/552)).
10+
11+
512
## v6.1.0-beta
613

714
### Features ✅

cmd/mesh/rpc_handler.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ import (
2222
log "github.com/sirupsen/logrus"
2323
)
2424

25+
// orderEventsBufferSize is the buffer size for the orderEvents channel. If
26+
// the buffer is full, any additional events won't be processed.
27+
const orderEventsBufferSize = 8000
28+
2529
type rpcHandler struct {
2630
app *core.App
2731
}
@@ -209,8 +213,9 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
209213
rpcSub := notifier.CreateSubscription()
210214

211215
go func() {
212-
orderEventsChan := make(chan []*zeroex.OrderEvent)
216+
orderEventsChan := make(chan []*zeroex.OrderEvent, orderEventsBufferSize)
213217
orderWatcherSub := app.SubscribeToOrderEvents(orderEventsChan)
218+
defer orderWatcherSub.Unsubscribe()
214219

215220
for {
216221
select {
@@ -237,7 +242,6 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
237242
// error.
238243
if _, ok := err.(*net.OpError); ok {
239244
logEntry.Trace(message)
240-
orderWatcherSub.Unsubscribe()
241245
return
242246
}
243247
if strings.Contains(err.Error(), "write: broken pipe") {
@@ -249,13 +253,11 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
249253
case err := <-rpcSub.Err():
250254
if err != nil {
251255
log.WithField("err", err).Error("rpcSub returned an error")
252-
orderWatcherSub.Unsubscribe()
253256
} else {
254257
log.Debug("rpcSub was closed without error")
255258
}
256259
return
257260
case <-notifier.Closed():
258-
orderWatcherSub.Unsubscribe()
259261
return
260262
}
261263
}

rpc/service.go

-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) {
114114

115115
// Wait MinCleanupInterval before emitting the next heartbeat.
116116
time.Sleep(minHeartbeatInterval - time.Since(start))
117-
118117
}
119118
}()
120119

0 commit comments

Comments
 (0)