Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #574 from 0xProject/release/6.1.2-beta
Browse files Browse the repository at this point in the history
Release version 6.1.2-beta
  • Loading branch information
albrow authored Dec 4, 2019
2 parents 752ed0f + c1e0c22 commit 017d42d
Show file tree
Hide file tree
Showing 17 changed files with 693 additions and 400 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions.

## v6.1.2-beta

### Bug fixes 🐞

- Fixed a bug which could cause Mesh to crash with a nil pointer exception if RPC requests are sent too quickly during/immediately after start up ([#560](https://github.com/0xProject/0x-mesh/pull/560)).


## v6.1.1-beta

### Bug fixes 🐞
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ the dropdown menu in the GitHub UI to select `development`.
## Prerequisites

- [GNU Make](https://www.gnu.org/software/make/) If you are using a Unix-like OS, you probably already have this.
- [Go version >= 1.12](https://golang.org/dl/) (or use [the version manager called "g"](https://github.com/stefanmaric/g))
- [Go version 1.12.x](https://golang.org/dl/) (or use [the version manager called "g"](https://github.com/stefanmaric/g)). Go 1.13 is not supported yet (see https://github.com/0xProject/0x-mesh/issues/480).
- [Dep package manager](https://golang.github.io/dep/docs/installation.html)
- [Node.js version >=11](https://nodejs.org/en/download/) (or use the [nvm version manager](https://github.com/creationix/nvm))
- [Yarn package manager](https://yarnpkg.com/en/)
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ test-go-parallel:

.PHONY: test-go-serial
test-go-serial:
go test ./zeroex/ordervalidator ./zeroex/orderwatch -race -timeout 30s -p=1 --serial
go test ./zeroex/ordervalidator ./zeroex/orderwatch ./core -race -timeout 30s -p=1 --serial


.PHONY: test-integration
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Version](https://img.shields.io/badge/version-6.1.1--beta-orange.svg)](https://github.com/0xProject/0x-mesh/releases)
[![Version](https://img.shields.io/badge/version-6.1.2--beta-orange.svg)](https://github.com/0xProject/0x-mesh/releases)
[![Docs](https://img.shields.io/badge/docs-website-yellow.svg)](https://0x-org.gitbook.io/mesh)
[![Chat with us on Discord](https://img.shields.io/badge/chat-Discord-blueViolet.svg)](https://discord.gg/HF7fHwk)
[![Circle CI](https://img.shields.io/circleci/project/0xProject/0x-mesh/master.svg)](https://circleci.com/gh/0xProject/0x-mesh/tree/master)
Expand Down
2 changes: 1 addition & 1 deletion browser/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@0x/mesh-browser",
"version": "6.1.1-beta",
"version": "6.1.2-beta",
"description": "TypeScript and JavaScript bindings for running Mesh directly in the browser.",
"main": "./lib/index.js",
"license": "Apache-2.0",
Expand Down
10 changes: 5 additions & 5 deletions cmd/mesh/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (handler *rpcHandler) GetOrders(page, perPage int, snapshotID string) (resu
"error": internalErr,
"method": "GetOrders",
"stackTrace": string(debug.Stack()),
}).Printf("RPC method handler crashed")
}).Error("RPC method handler crashed")
err = errors.New("method handler crashed in GetOrders RPC call (check logs for stack trace)")
}
}()
Expand Down Expand Up @@ -110,7 +110,7 @@ func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage, opts rp
"error": internalErr,
"method": "AddOrders",
"stackTrace": string(debug.Stack()),
}).Printf("RPC method handler crashed")
}).Error("RPC method handler crashed")
err = errors.New("method handler crashed in AddOrders RPC call (check logs for stack trace)")
}
}()
Expand Down Expand Up @@ -138,7 +138,7 @@ func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) (err error) {
"error": internalErr,
"method": "AddPeer",
"stackTrace": string(debug.Stack()),
}).Printf("RPC method handler crashed")
}).Error("RPC method handler crashed")
err = errors.New("method handler crashed in AddPeer RPC call (check logs for stack trace)")
}
}()
Expand All @@ -164,7 +164,7 @@ func (handler *rpcHandler) GetStats() (result *rpc.GetStatsResponse, err error)
"error": internalErr,
"method": "GetStats",
"stackTrace": string(debug.Stack()),
}).Printf("RPC method handler crashed")
}).Error("RPC method handler crashed")
err = errors.New("method handler crashed in GetStats RPC call (check logs for stack trace)")
}
}()
Expand All @@ -191,7 +191,7 @@ func (handler *rpcHandler) SubscribeToOrders(ctx context.Context) (result *ethrp
"error": internalErr,
"method": "SubscribeToOrders",
"stackTrace": string(debug.Stack()),
}).Printf("RPC method handler crashed")
}).Error("RPC method handler crashed")
err = errors.New("method handler crashed in SubscribeToOrders RPC call (check logs for stack trace)")
}
}()
Expand Down
43 changes: 37 additions & 6 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (
defaultNonPollingEthRPCRequestBuffer = 82720
// logStatsInterval is how often to log stats for this node.
logStatsInterval = 5 * time.Minute
version = "6.1.1-beta"
version = "6.1.2-beta"
)

// Note(albrow): The Config type is currently copied to browser/ts/index.ts. We
Expand Down Expand Up @@ -141,7 +141,6 @@ type App struct {
config Config
peerID peer.ID
privKey p2pcrypto.PrivKey
db *meshdb.MeshDB
node *p2p.Node
chainID int
blockWatcher *blockwatch.Watcher
Expand All @@ -152,9 +151,14 @@ type App struct {
snapshotExpirationWatcher *expirationwatch.Watcher
muIdToSnapshotInfo sync.Mutex
idToSnapshotInfo map[string]snapshotInfo
messageHandler *MessageHandler
ethRPCRateLimiter ratelimit.RateLimiter
ethRPCClient ethrpcclient.Client
orderSelector *orderSelector
db *meshdb.MeshDB

// started is closed to signal that the App has been started. Some methods
// will block until after the App is started.
started chan struct{}
}

func New(config Config) (*App, error) {
Expand Down Expand Up @@ -276,15 +280,16 @@ func New(config Config) (*App, error) {
if err != nil {
return nil, err
}
messageHandler := &MessageHandler{
orderSelector := &orderSelector{
nextOffset: 0,
db: meshDB,
}

app := &App{
started: make(chan struct{}),
config: config,
privKey: privKey,
peerID: peerID,
db: meshDB,
chainID: config.EthereumChainID,
blockWatcher: blockWatcher,
orderWatcher: orderWatcher,
Expand All @@ -293,9 +298,10 @@ func New(config Config) (*App, error) {
meshMessageJSONSchema: meshMessageJSONSchema,
snapshotExpirationWatcher: snapshotExpirationWatcher,
idToSnapshotInfo: map[string]snapshotInfo{},
messageHandler: messageHandler,
orderSelector: orderSelector,
ethRPCRateLimiter: ethRPCRateLimiter,
ethRPCClient: ethClient,
db: meshDB,
}

log.WithFields(map[string]interface{}{
Expand Down Expand Up @@ -445,6 +451,12 @@ func (app *App) Start(ctx context.Context) error {
}

// Initialize the p2p node.
// Note(albrow): The main reason that we need to use a `started` channel in
// some methods is that we cannot call p2p.New without passing in a context
// (due to how libp2p works). This means that before app.Start is called,
// app.node will be nil and attempting to call any methods on app.node will
// panic with a nil pointer exception. All the other fields of core.App that
// we need to use will have already been initialized and are ready to use.
bootstrapList := p2p.DefaultBootstrapList
if app.config.BootstrapList != "" {
bootstrapList = strings.Split(app.config.BootstrapList, ",")
Expand Down Expand Up @@ -492,6 +504,10 @@ func (app *App) Start(ctx context.Context) error {
app.periodicallyLogStats(innerCtx)
}()

// Signal that the app has been started.
log.Info("core.App was started")
close(app.started)

// If any error channel returns a non-nil error, we cancel the inner context
// and return the error. Note that this means we only return the first error
// that occurs.
Expand Down Expand Up @@ -529,6 +545,8 @@ func (app *App) Start(ctx context.Context) error {
}

func (app *App) periodicallyCheckForNewAddrs(ctx context.Context, startingAddrs []ma.Multiaddr) {
<-app.started

// TODO(albrow): There might be a more efficient way to do this if we have access to
// an event bus. See: https://github.com/libp2p/go-libp2p/issues/467
seenAddrs := stringset.New()
Expand Down Expand Up @@ -569,6 +587,8 @@ func (e ErrSnapshotNotFound) Error() string {
// continue to make requests supplying the `snapshotID` returned from the first request. After 1 minute of not
// received further requests referencing a specific snapshot, the snapshot expires and can no longer be used.
func (app *App) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersResponse, error) {
<-app.started

ordersInfos := []*rpc.OrderInfo{}
if perPage <= 0 {
return &rpc.GetOrdersResponse{
Expand Down Expand Up @@ -642,6 +662,8 @@ func (app *App) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersR
// they will only be removed if they become unfillable and will not be removed
// due to having a high expiration time or any incentive mechanisms.
func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage, pinned bool) (*ordervalidator.ValidationResults, error) {
<-app.started

allValidationResults := &ordervalidator.ValidationResults{
Accepted: []*ordervalidator.AcceptedOrderInfo{},
Rejected: []*ordervalidator.RejectedOrderInfo{},
Expand Down Expand Up @@ -751,6 +773,8 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage, pinned bool) (*ord

// shareOrder immediately shares the given order on the GossipSub network.
func (app *App) shareOrder(order *zeroex.SignedOrder) error {
<-app.started

encoded, err := encodeOrder(order)
if err != nil {
return err
Expand All @@ -760,11 +784,15 @@ func (app *App) shareOrder(order *zeroex.SignedOrder) error {

// AddPeer can be used to manually connect to a new peer.
func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error {
<-app.started

return app.node.Connect(peerInfo, peerConnectTimeout)
}

// GetStats retrieves stats about the Mesh node
func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
<-app.started

latestBlockHeader, err := app.blockWatcher.GetLatestBlockProcessed()
if err != nil {
return nil, err
Expand Down Expand Up @@ -814,6 +842,8 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
}

func (app *App) periodicallyLogStats(ctx context.Context) {
<-app.started

ticker := time.NewTicker(logStatsInterval)
for {
select {
Expand Down Expand Up @@ -848,6 +878,7 @@ func (app *App) periodicallyLogStats(ctx context.Context) {

// SubscribeToOrderEvents let's one subscribe to order events emitted by the OrderWatcher
func (app *App) SubscribeToOrderEvents(sink chan<- []*zeroex.OrderEvent) event.Subscription {
// app.orderWatcher is guaranteed to be initialized. No need to wait.
subscription := app.orderWatcher.Subscribe(sink)
return subscription
}
Expand Down
23 changes: 14 additions & 9 deletions core/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
// Ensure that App implements p2p.MessageHandler.
var _ p2p.MessageHandler = &App{}

type MessageHandler struct {
type orderSelector struct {
nextOffset int
db *meshdb.MeshDB
}

func min(a int, b int) int {
Expand All @@ -25,16 +26,20 @@ func min(a int, b int) int {
}

func (app *App) GetMessagesToShare(max int) ([][]byte, error) {
return app.orderSelector.GetMessagesToShare(max)
}

func (orderSelector *orderSelector) GetMessagesToShare(max int) ([][]byte, error) {
// For now, we use a round robin strategy to select a set of orders to share.
// We might return less than max even if there are max or greater orders
// currently stored.
// Use a snapshot to make sure state doesn't change between our two queries.
ordersSnapshot, err := app.db.Orders.GetSnapshot()
ordersSnapshot, err := orderSelector.db.Orders.GetSnapshot()
if err != nil {
return nil, err
}
defer ordersSnapshot.Release()
notRemovedFilter := app.db.Orders.IsRemovedIndex.ValueFilter([]byte{0})
notRemovedFilter := orderSelector.db.Orders.IsRemovedIndex.ValueFilter([]byte{0})
count, err := ordersSnapshot.NewQuery(notRemovedFilter).Count()
if err != nil {
return nil, err
Expand All @@ -45,9 +50,9 @@ func (app *App) GetMessagesToShare(max int) ([][]byte, error) {

// Select up to the maximum number of orders starting at the offset that was
// calculated the last time this was called with `app`.
offset := min(app.messageHandler.nextOffset, count)
offset := min(orderSelector.nextOffset, count)
var selectedOrders []*meshdb.Order
if offset != count {
if offset < count {
err = ordersSnapshot.NewQuery(notRemovedFilter).Offset(offset).Max(max).Run(&selectedOrders)
if err != nil {
return nil, err
Expand All @@ -65,13 +70,13 @@ func (app *App) GetMessagesToShare(max int) ([][]byte, error) {
return nil, err
}
selectedOrders = append(selectedOrders, overflowSelectedOrders...)
app.messageHandler.nextOffset = overflow
orderSelector.nextOffset = overflow
} else {
// Calculate the next offset and wrap back to 0 if the next offset is larger
// than or equal to count.
app.messageHandler.nextOffset += max
if app.messageHandler.nextOffset >= count {
app.messageHandler.nextOffset = 0
orderSelector.nextOffset += max
if orderSelector.nextOffset >= count {
orderSelector.nextOffset = 0
}
}

Expand Down
Loading

0 comments on commit 017d42d

Please sign in to comment.