Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
release: 11.1.0 (#1063)
Browse files Browse the repository at this point in the history
* init v4 GossipSub

- setup v4 orderfilter
- enable v4 messageHandler
- add configuration for NodeV4

* use proper topic for message_hander_v4

* add prometheus monitoring

* disable cache for apollo

* add PeersConnected and LatestBlock metrics, fix v4 logs

Include `_v4` in ordersync_v4.go logs for easier identification.

Also:
- lower logStatsInterval for better prom metrics resolution
- bump mesh version to 11.1.0

* update deployment docs

* standardize ordersync_v4 logs

* cut release 11.1.0

* lint

- remove unused function
  • Loading branch information
opaolini authored Apr 6, 2021
1 parent a3a7d73 commit dc24254
Show file tree
Hide file tree
Showing 33 changed files with 997 additions and 852 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Version](https://img.shields.io/badge/version-11.0.3-orange.svg)](https://github.com/0xProject/0x-mesh/releases)
[![Version](https://img.shields.io/badge/version-11.1.0-orange.svg)](https://github.com/0xProject/0x-mesh/releases)
[![Docs](https://img.shields.io/badge/docs-website-yellow.svg)](https://0x-org.gitbook.io/mesh)
[![Chat with us on Discord](https://img.shields.io/badge/chat-Discord-blueViolet.svg)](https://discord.gg/HF7fHwk)
[![Circle CI](https://img.shields.io/circleci/project/0xProject/0x-mesh/master.svg)](https://circleci.com/gh/0xProject/0x-mesh/tree/master)
Expand Down
20 changes: 20 additions & 0 deletions cmd/mesh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/0xProject/0x-mesh/core"
"github.com/0xProject/0x-mesh/metrics"
"github.com/plaid/go-envvar/envvar"
log "github.com/sirupsen/logrus"
)
Expand All @@ -38,6 +39,13 @@ type standaloneConfig struct {
// See https://github.com/graphql/graphiql for more information. By default, GraphiQL
// is disabled.
EnableGraphQLPlayground bool `envvar:"ENABLE_GRAPHQL_PLAYGROUND" default:"false"`
// EnablePrometheusMoniitoring determines whether or not to enable
// prometheus monitoring. The metrics are accessed by scraping
// {PrometheusMonitoringServerAddr}/metrics, prometheus is disabled.
EnablePrometheusMonitoring bool `envvar:"ENABLE_PROMETHEUS_MONITORING" default:"false"`
// PrometheusMonitoringServerAddr is the interface and port to use for
// prometheus server metrics endpoint.
PrometheusMonitoringServerAddr string `envvar:"PROMETHEUS_SERVER_ADDR" default:"0.0.0.0:8080"`
}

func main() {
Expand Down Expand Up @@ -86,6 +94,18 @@ func main() {
}()
}

// NOTE: Prometehus is not an essential service to run.
if config.EnablePrometheusMonitoring {
wg.Add(1)
go func() {
defer wg.Done()
log.WithField("prometheus_server_addr", config.PrometheusMonitoringServerAddr).Info("starting Prometheus metrics server")
if err := metrics.ServeMetrics(ctx, config.PrometheusMonitoringServerAddr); err != nil {
log.Error(err)
}
}()
}

// Block until there is an error or the app is closed.
select {
case <-ctx.Done():
Expand Down
43 changes: 39 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/0xProject/0x-mesh/ethereum/ratelimit"
"github.com/0xProject/0x-mesh/keys"
"github.com/0xProject/0x-mesh/loghooks"
"github.com/0xProject/0x-mesh/metrics"
"github.com/0xProject/0x-mesh/orderfilter"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
Expand Down Expand Up @@ -56,8 +57,8 @@ const (
// for different Ethereum networks, but it should be good enough.
estimatedNonPollingEthereumRPCRequestsPer24Hrs = 50000
// logStatsInterval is how often to log stats for this node.
logStatsInterval = 5 * time.Minute
version = "11.0.3"
logStatsInterval = 2 * time.Minute
version = "11.1.0"
// ordersyncMinPeers is the minimum amount of peers to receive orders from
// before considering the ordersync process finished.
ordersyncMinPeers = 5
Expand Down Expand Up @@ -434,6 +435,26 @@ func getPublishTopics(chainID int, contractAddresses ethereum.ContractAddresses,
}
}

func getPublishTopicsV4(chainID int, contractAddresses ethereum.ContractAddresses, customFilter *orderfilter.Filter) ([]string, error) {
defaultTopic, err := orderfilter.GetDefaultTopicV4(chainID, contractAddresses)
if err != nil {
return nil, err
}
customTopic := customFilter.TopicV4()
if defaultTopic == customTopic {
// If we're just using the default order filter, we don't need to publish to
// multiple topics.
return []string{defaultTopic}, nil
} else {
// If we are using a custom order filter, publish to *both* the default
// topic and the custom topic. All orders that match the custom order filter
// must necessarily match the default filter. This also allows us to
// implement cross-topic forwarding in the future.
// See https://github.com/0xProject/0x-mesh/pull/563
return []string{defaultTopic, customTopic}, nil
}
}

func (app *App) getRendezvousPoints() ([]string, error) {
defaultRendezvousPoint := fmt.Sprintf("/0x-mesh/network/%d/version/2", app.config.EthereumChainID)
defaultTopic, err := orderfilter.GetDefaultTopic(app.chainID, *app.contractAddresses)
Expand Down Expand Up @@ -499,6 +520,12 @@ func (app *App) Start() error {
return err
}

// Get the publish topics depending on our custom order filter.
publishTopicsV4, err := getPublishTopicsV4(app.config.EthereumChainID, *app.contractAddresses, app.orderFilter)
if err != nil {
return err
}

// Create a child context so that we can preemptively cancel if there is an
// error.
innerCtx, cancel := context.WithCancel(app.ctx)
Expand Down Expand Up @@ -622,7 +649,9 @@ func (app *App) Start() error {
}
nodeConfig := p2p.Config{
SubscribeTopic: app.orderFilter.Topic(),
SubscribeTopicV4: app.orderFilter.TopicV4(),
PublishTopics: publishTopics,
PublishTopicsV4: publishTopicsV4,
TCPPort: app.config.P2PTCPPort,
WebSocketsPort: app.config.P2PWebSocketsPort,
Insecure: false,
Expand Down Expand Up @@ -929,7 +958,6 @@ func (app *App) AddOrders(ctx context.Context, signedOrders []*zeroex.SignedOrde
return app.AddOrdersRaw(ctx, signedOrdersRaw, pinned, opts)
}

// TODO(oskar) - finish
func (app *App) AddOrdersV4(ctx context.Context, signedOrders []*zeroex.SignedOrderV4, pinned bool, opts *types.AddOrdersOpts) (*ordervalidator.ValidationResults, error) {
signedOrdersRaw := []*json.RawMessage{}
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -1131,6 +1159,7 @@ func (app *App) AddOrdersRawV4(ctx context.Context, signedOrdersRaw []*json.RawM

// shareOrder immediately shares the given order on the GossipSub network.
func (app *App) shareOrder(order *zeroex.SignedOrder) error {
defer metrics.OrdersShared.WithLabelValues(metrics.ProtocolV3).Inc()
<-app.started

encoded, err := encoding.OrderToRawMessage(app.orderFilter.Topic(), order)
Expand All @@ -1142,13 +1171,14 @@ func (app *App) shareOrder(order *zeroex.SignedOrder) error {

// shareOrderV4 immediately shares the given order on the GossipSub network.
func (app *App) shareOrderV4(order *zeroex.SignedOrderV4) error {
defer metrics.OrdersShared.WithLabelValues(metrics.ProtocolV4).Inc()
<-app.started

encoded, err := json.Marshal(order)
if err != nil {
return err
}
return app.node.Send(encoded)
return app.node.SendV4(encoded)
}

// AddPeer can be used to manually connect to a new peer.
Expand Down Expand Up @@ -1285,6 +1315,8 @@ func (app *App) periodicallyLogStats(ctx context.Context) {
log.WithError(err).Error("could not get stats")
continue
}
metrics.PeersConnected.Set(float64(stats.NumPeers))
metrics.LatestBlock.Set(float64(stats.LatestBlock.Number.Int64()))
log.WithFields(log.Fields{
"version": stats.Version,
"pubSubTopic": stats.PubSubTopic,
Expand All @@ -1294,6 +1326,9 @@ func (app *App) periodicallyLogStats(ctx context.Context) {
"numOrders": stats.NumOrders,
"numOrdersIncludingRemoved": stats.NumOrdersIncludingRemoved,
"numPinnedOrders": stats.NumPinnedOrders,
"numOrdersV4": stats.NumOrdersV4,
"numOrdersIncludingRemovedV4": stats.NumOrdersIncludingRemovedV4,
"numPinnedOrdersV4": stats.NumPinnedOrdersV4,
"numPeers": stats.NumPeers,
"maxExpirationTime": stats.MaxExpirationTime,
"startOfCurrentUTCDay": stats.StartOfCurrentUTCDay,
Expand Down
9 changes: 9 additions & 0 deletions core/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/encoding"
"github.com/0xProject/0x-mesh/metrics"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/0xProject/0x-mesh/zeroex/ordervalidator"
Expand Down Expand Up @@ -103,6 +104,14 @@ func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) err
app.handlePeerScoreEvent(msg.From, psInvalidMessage)
}
}

metrics.P2POrdersReceived.
WithLabelValues(metrics.ProtocolV3, metrics.ValidationAccepted).
Add(float64(len(validationResults.Accepted)))

metrics.P2POrdersReceived.
WithLabelValues(metrics.ProtocolV3, metrics.ValidationRejected).
Add(float64(len(validationResults.Rejected)))
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions core/message_handler_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/metrics"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/0xProject/0x-mesh/zeroex/ordervalidator"
Expand Down Expand Up @@ -90,5 +91,14 @@ func (app *App) HandleMessagesV4(ctx context.Context, messages []*p2p.Message) e
app.handlePeerScoreEvent(msg.From, psInvalidMessage)
}
}

metrics.P2POrdersReceived.
WithLabelValues(metrics.ProtocolV4, metrics.ValidationAccepted).
Add(float64(len(validationResults.Accepted)))

metrics.P2POrdersReceived.
WithLabelValues(metrics.ProtocolV4, metrics.ValidationRejected).
Add(float64(len(validationResults.Rejected)))

return nil
}
5 changes: 4 additions & 1 deletion core/ordersync/ordersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/0xProject/0x-mesh/metrics"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/albrow/stringset"
Expand Down Expand Up @@ -225,6 +226,7 @@ func (s *Service) HandleStream(stream network.Stream) {
log.WithFields(log.Fields{
"requester": stream.Conn().RemotePeer().Pretty(),
}).Trace("received ordersync request")
metrics.OrdersyncRequestsReceived.WithLabelValues(metrics.ProtocolV3).Inc()
rawRes := s.handleRawRequest(rawReq, requesterID)
if rawRes == nil {
return
Expand Down Expand Up @@ -311,7 +313,6 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
log.WithFields(log.Fields{
"provider": peerID.Pretty(),
}).Trace("requesting orders from neighbor via ordersync")

wg.Add(1)
go func(id peer.ID) {
defer func() {
Expand All @@ -323,6 +324,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
"error": err.Error(),
"provider": id.Pretty(),
}).Debug("could not get orders from peer via ordersync")
metrics.OrdersyncRequestsSent.WithLabelValues(metrics.ProtocolV3, metrics.OrdersyncSuccess).Inc()
m.Lock()
if nextFirstRequest != nil {
nextRequestForPeer[id] = nextFirstRequest
Expand All @@ -332,6 +334,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
log.WithFields(log.Fields{
"provider": id.Pretty(),
}).Trace("successfully got orders from peer via ordersync")
metrics.OrdersyncRequestsSent.WithLabelValues(metrics.ProtocolV3, metrics.OrdersyncFailure).Inc()
m.Lock()
successfullySyncedPeers.Add(id.Pretty())
delete(nextRequestForPeer, id)
Expand Down
34 changes: 19 additions & 15 deletions core/ordersync_v4/ordersync_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/metrics"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/0xProject/0x-mesh/zeroex/orderwatch"
Expand Down Expand Up @@ -109,13 +110,13 @@ func (s *Service) HandleStream(stream network.Stream) {
// Pre-emptively close the stream if we can't accept anymore requests.
log.WithFields(log.Fields{
"requester": stream.Conn().RemotePeer().Pretty(),
}).Warn("closing ordersync stream because rate limiter is backed up")
}).Warn("closing ordersync_v4 stream because rate limiter is backed up")
_ = stream.Reset()
return
}
log.WithFields(log.Fields{
"requester": stream.Conn().RemotePeer().Pretty(),
}).Trace("handling ordersync stream")
}).Trace("handling ordersync_v4 stream")
defer func() {
_ = stream.Close()
}()
Expand All @@ -125,7 +126,7 @@ func (s *Service) HandleStream(stream network.Stream) {
if err := s.requestRateLimiter.Wait(s.ctx); err != nil {
log.WithFields(log.Fields{
"requester": stream.Conn().RemotePeer().Pretty(),
}).Warn("ordersync rate limiter returned error")
}).Warn("ordersync_v4 rate limiter returned error")
return
}
request, err := waitForRequest(s.ctx, stream)
Expand All @@ -135,7 +136,8 @@ func (s *Service) HandleStream(stream network.Stream) {
}
log.WithFields(log.Fields{
"requester": stream.Conn().RemotePeer().Pretty(),
}).Trace("received ordersync V4 request")
}).Trace("received ordersync_v4 request")
metrics.OrdersyncRequestsReceived.WithLabelValues(metrics.ProtocolV4).Inc()
response := s.handleRequest(request, requesterID)
if response == nil {
return
Expand All @@ -144,7 +146,7 @@ func (s *Service) HandleStream(stream network.Stream) {
log.WithFields(log.Fields{
"error": err.Error(),
"requester": requesterID.Pretty(),
}).Warn("could not encode ordersync V4 response")
}).Warn("could not encode ordersync_v4 response")
s.handlePeerScoreEvent(requesterID, psUnexpectedDisconnect)
return
}
Expand Down Expand Up @@ -223,7 +225,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {

log.WithFields(log.Fields{
"provider": peerID.Pretty(),
}).Trace("requesting orders from neighbor via ordersync")
}).Trace("requesting orders from neighbor via ordersync_v4")

wg.Add(1)
go func(id peer.ID) {
Expand All @@ -235,7 +237,8 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
log.WithFields(log.Fields{
"error": err.Error(),
"provider": id.Pretty(),
}).Debug("could not get orders from peer via ordersync")
}).Debug("could not get orders from peer via ordersync_v4")
metrics.OrdersyncRequestsSent.WithLabelValues(metrics.ProtocolV4, metrics.OrdersyncFailure).Inc()
m.Lock()
if nextFirstRequest != nil {
nextRequestForPeer[id] = nextFirstRequest
Expand All @@ -244,7 +247,8 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
} else {
log.WithFields(log.Fields{
"provider": id.Pretty(),
}).Trace("successfully got orders from peer via ordersync")
}).Trace("successfully got orders from peer via ordersync_v4")
metrics.OrdersyncRequestsSent.WithLabelValues(metrics.ProtocolV4, metrics.OrdersyncSuccess).Inc()
m.Lock()
successfullySyncedPeers.Add(id.Pretty())
delete(nextRequestForPeer, id)
Expand All @@ -266,7 +270,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
"delayBeforeNextRetry": delayBeforeNextRetry.String(),
"minPeers": minPeers,
"successfullySyncedPeers": successfullySyncedPeerLength,
}).Debug("ordersync could not get orders from enough peers (trying again soon)")
}).Debug("ordersync_v4 could not get orders from enough peers (trying again soon)")
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -278,7 +282,7 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
log.WithFields(log.Fields{
"minPeers": minPeers,
"successfullySyncedPeers": len(successfullySyncedPeers),
}).Info("completed a round of ordersync")
}).Info("completed a round of ordersync_v4")
return nil
}

Expand Down Expand Up @@ -332,7 +336,7 @@ func waitForRequest(parentCtx context.Context, stream network.Stream) (*Request,
log.WithFields(log.Fields{
"error": err.Error(),
"requester": stream.Conn().RemotePeer().Pretty(),
}).Warn("could not decode ordersync request")
}).Warn("could not decode ordersync_v4 request")
errChan <- err
return
}
Expand All @@ -344,7 +348,7 @@ func waitForRequest(parentCtx context.Context, stream network.Stream) (*Request,
log.WithFields(log.Fields{
"error": ctx.Err(),
"requester": stream.Conn().RemotePeer().Pretty(),
}).Warn("timed out waiting for ordersync request")
}).Warn("timed out waiting for ordersync_v4 request")
return nil, ctx.Err()
case err := <-errChan:
return nil, err
Expand All @@ -364,7 +368,7 @@ func waitForResponse(parentCtx context.Context, stream network.Stream) (*Respons
log.WithFields(log.Fields{
"error": err.Error(),
"provider": stream.Conn().RemotePeer().Pretty(),
}).Warn("could not decode ordersync response")
}).Warn("could not decode ordersync_v4 response")
errChan <- err
return
}
Expand All @@ -389,7 +393,7 @@ func (s *Service) handleRequest(request *Request, requesterID peer.ID) *Response
// Early exit if channel closed?
select {
case <-s.ctx.Done():
log.WithError(s.ctx.Err()).Warn("handleRequest v4 error")
log.WithError(s.ctx.Err()).Warn("handleRequest ordersync_v4 error")
return nil
default:
}
Expand Down Expand Up @@ -417,7 +421,7 @@ func (s *Service) handleRequest(request *Request, requesterID peer.ID) *Response
Limit: uint(s.perPage),
})
if err != nil {
log.WithError(err).Warn("handleRequest v4 error")
log.WithError(err).Warn("handleRequest ordersync_v4 error")
return nil
}
var orders []*zeroex.SignedOrderV4
Expand Down
Loading

0 comments on commit dc24254

Please sign in to comment.