Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #553 from 0xProject/revert-552-fix/always-unsubscr…
Browse files Browse the repository at this point in the history
…ibe-order-feed

Revert "Always unsubscribe from order events in SetupOrderStream"
  • Loading branch information
albrow authored Nov 21, 2019
2 parents 3234099 + fd6c7e6 commit 9ea8cf7
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 13 deletions.
7 changes: 0 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@

This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions.

## v6.1.1-beta

### Bug fixes 🐞

- Fixed a bug where the internal order event feed could become blocked, rendering Mesh unable to receive any new orders or update existing ones ([#552](https://github.com/0xProject/0x-mesh/pull/552)).


## v6.1.0-beta

### Features ✅
Expand Down
10 changes: 4 additions & 6 deletions cmd/mesh/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
log "github.com/sirupsen/logrus"
)

// orderEventsBufferSize is the buffer size for the orderEvents channel. If
// the buffer is full, any additional events won't be processed.
const orderEventsBufferSize = 8000

type rpcHandler struct {
app *core.App
}
Expand Down Expand Up @@ -213,9 +209,8 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
rpcSub := notifier.CreateSubscription()

go func() {
orderEventsChan := make(chan []*zeroex.OrderEvent, orderEventsBufferSize)
orderEventsChan := make(chan []*zeroex.OrderEvent)
orderWatcherSub := app.SubscribeToOrderEvents(orderEventsChan)
defer orderWatcherSub.Unsubscribe()

for {
select {
Expand All @@ -242,6 +237,7 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
// error.
if _, ok := err.(*net.OpError); ok {
logEntry.Trace(message)
orderWatcherSub.Unsubscribe()
return
}
if strings.Contains(err.Error(), "write: broken pipe") {
Expand All @@ -253,11 +249,13 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription,
case err := <-rpcSub.Err():
if err != nil {
log.WithField("err", err).Error("rpcSub returned an error")
orderWatcherSub.Unsubscribe()
} else {
log.Debug("rpcSub was closed without error")
}
return
case <-notifier.Closed():
orderWatcherSub.Unsubscribe()
return
}
}
Expand Down
1 change: 1 addition & 0 deletions rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) {

// Wait MinCleanupInterval before emitting the next heartbeat.
time.Sleep(minHeartbeatInterval - time.Since(start))

}
}()

Expand Down

0 comments on commit 9ea8cf7

Please sign in to comment.