Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.

Commit fd6c7e6

Browse files
authored
Revert "Always unsubscribe from order events in SetupOrderStream"
1 parent 3234099 commit fd6c7e6

File tree

3 files changed

+5
-13
lines changed

3 files changed

+5
-13
lines changed

CHANGELOG.md

-7
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,6 @@
22

33
This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions.
44

5-
## v6.1.1-beta
6-
7-
### Bug fixes 🐞
8-
9-
- Fixed a bug where the internal order event feed could become blocked, rendering Mesh unable to receive any new orders or update existing ones ([#552](https://github.com/0xProject/0x-mesh/pull/552)).
10-
11-
125
## v6.1.0-beta
136

147
### Features ✅

cmd/mesh/rpc_handler.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ import (
2222
log "github.com/sirupsen/logrus"
2323
)
2424

25-
// orderEventsBufferSize is the buffer size for the orderEvents channel. If
26-
// the buffer is full, any additional events won't be processed.
27-
const orderEventsBufferSize = 8000
28-
2925
type rpcHandler struct {
3026
app *core.App
3127
}
@@ -213,9 +209,8 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
213209
rpcSub := notifier.CreateSubscription()
214210

215211
go func() {
216-
orderEventsChan := make(chan []*zeroex.OrderEvent, orderEventsBufferSize)
212+
orderEventsChan := make(chan []*zeroex.OrderEvent)
217213
orderWatcherSub := app.SubscribeToOrderEvents(orderEventsChan)
218-
defer orderWatcherSub.Unsubscribe()
219214

220215
for {
221216
select {
@@ -242,6 +237,7 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
242237
// error.
243238
if _, ok := err.(*net.OpError); ok {
244239
logEntry.Trace(message)
240+
orderWatcherSub.Unsubscribe()
245241
return
246242
}
247243
if strings.Contains(err.Error(), "write: broken pipe") {
@@ -253,11 +249,13 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
253249
case err := <-rpcSub.Err():
254250
if err != nil {
255251
log.WithField("err", err).Error("rpcSub returned an error")
252+
orderWatcherSub.Unsubscribe()
256253
} else {
257254
log.Debug("rpcSub was closed without error")
258255
}
259256
return
260257
case <-notifier.Closed():
258+
orderWatcherSub.Unsubscribe()
261259
return
262260
}
263261
}

rpc/service.go

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) {
114114

115115
// Wait MinCleanupInterval before emitting the next heartbeat.
116116
time.Sleep(minHeartbeatInterval - time.Since(start))
117+
117118
}
118119
}()
119120

0 commit comments

Comments
 (0)