Skip to content

Commit

Permalink
p2p: Add algod_network_p2p_* traffic metrics (#6105)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Aug 29, 2024
1 parent 9d5c4cd commit f6c59a4
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 214 deletions.
2 changes: 2 additions & 0 deletions cmd/algod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ var startupConfigCheckFields = []string{
"TxPoolExponentialIncreaseFactor",
"TxPoolSize",
"VerifiedTranscationsCacheSize",
"EnableP2P",
"EnableP2PHybridMode",
}

func resolveDataDir() string {
Expand Down
206 changes: 206 additions & 0 deletions network/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
p2proto "github.com/libp2p/go-libp2p/core/protocol"

"github.com/algorand/go-algorand/network/p2p"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/metrics"
)

func init() {
// all tags are tracked by ws net
tagStringList := make([]string, 0, len(protocol.TagList))
for _, t := range protocol.TagList {
tagStringList = append(tagStringList, string(t))
}
networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")
networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")
networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK")
networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK")

networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")
networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")
}

var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
var networkP2PSentBytesTotal = metrics.MakeCounter(metrics.NetworkP2PSentBytesTotal)
var networkSentBytesByTag *metrics.TagCounter
var networkP2PSentBytesByTag *metrics.TagCounter
var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
var networkP2PReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkP2PReceivedBytesTotal)
var networkReceivedBytesByTag *metrics.TagCounter
var networkP2PReceivedBytesByTag *metrics.TagCounter

var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
var networkP2PMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkP2PMessageReceivedTotal)
var networkMessageReceivedByTag *metrics.TagCounter
var networkP2PMessageReceivedByTag *metrics.TagCounter
var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
var networkP2PMessageSentTotal = metrics.MakeCounter(metrics.NetworkP2PMessageSentTotal)
var networkMessageSentByTag *metrics.TagCounter
var networkP2PMessageSentByTag *metrics.TagCounter

var networkHandleMicrosByTag *metrics.TagCounter
var networkHandleCountByTag *metrics.TagCounter

var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
var networkP2PMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_message_sent_queue_micros_total", Description: "Total microseconds p2p message spent waiting in queue to be sent"})

var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal)
var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal)
var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal)
var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal)
var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal)
var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal)

var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)

var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"})
var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"})

var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"})
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})
var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"})
var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"})
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"})

var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"})
var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"})
var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"})

var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"})
var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"})

var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."})
var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})

var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description)
var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage)
var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage)
var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage)

var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"})
var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"})

var _ = pubsub.RawTracer(pubsubMetricsTracer{})

// pubsubMetricsTracer is a tracer for pubsub events used to track metrics.
type pubsubMetricsTracer struct{}

// AddPeer is invoked when a new peer is added.
func (t pubsubMetricsTracer) AddPeer(p peer.ID, proto p2proto.ID) {}

// RemovePeer is invoked when a peer is removed.
func (t pubsubMetricsTracer) RemovePeer(p peer.ID) {}

// Join is invoked when a new topic is joined
func (t pubsubMetricsTracer) Join(topic string) {}

// Leave is invoked when a topic is abandoned
func (t pubsubMetricsTracer) Leave(topic string) {}

// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
func (t pubsubMetricsTracer) Graft(p peer.ID, topic string) {}

// Prune is invoked when a peer is pruned from the message (gossipsub)
func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {}

// ValidateMessage is invoked when a message first enters the validation pipeline.
func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) {
if msg != nil && msg.Topic != nil {
switch *msg.Topic {
case p2p.TXTopicName:
networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1)
}
}
}

// DeliverMessage is invoked when a message is delivered
func (t pubsubMetricsTracer) DeliverMessage(msg *pubsub.Message) {
transactionMessagesP2PDeliverMessage.Inc(nil)
}

// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
func (t pubsubMetricsTracer) RejectMessage(msg *pubsub.Message, reason string) {
// TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly.
// Since Go's strings are immutable, char replacement is a new allocation so that stick to string literals.
switch reason {
case pubsub.RejectValidationThrottled:
transactionMessagesP2PRejectMessage.Add("throttled", 1)
case pubsub.RejectValidationQueueFull:
transactionMessagesP2PRejectMessage.Add("full", 1)
case pubsub.RejectValidationFailed:
transactionMessagesP2PRejectMessage.Add("failed", 1)
case pubsub.RejectValidationIgnored:
transactionMessagesP2PRejectMessage.Add("ignored", 1)
default:
transactionMessagesP2PRejectMessage.Add("other", 1)
}
}

// DuplicateMessage is invoked when a duplicate message is dropped.
func (t pubsubMetricsTracer) DuplicateMessage(msg *pubsub.Message) {
transactionMessagesP2PDuplicateMessage.Inc(nil)
}

// ThrottlePeer is invoked when a peer is throttled by the peer gater.
func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {}

// RecvRPC is invoked when an incoming RPC is received.
func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {
networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil)
}

// SendRPC is invoked when a RPC is sent.
func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
networkP2PGossipSubSentBytesTotal.AddUint64(uint64(rpc.Size()), nil)
for i := range rpc.GetPublish() {
if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil {
switch *rpc.Publish[i].Topic {
case p2p.TXTopicName:
networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data)))
networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1)
}
}
}
}

// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
func (t pubsubMetricsTracer) UndeliverableMessage(msg *pubsub.Message) {
transactionMessagesP2PUnderdeliverableMessage.Inc(nil)
}
76 changes: 76 additions & 0 deletions network/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"go/ast"
"go/parser"
"go/token"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/test/partitiontest"
)

// TestPubsubTracer_TagList makes sure pubsubMetricsTracer traces pubsub messages
// by counting switch cases in SendRPC and ValidateMessage
func TestMetrics_PubsubTracer_TagList(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

fset := token.NewFileSet()
f, err := parser.ParseFile(fset, "metrics.go", nil, 0)
require.NoError(t, err)

// Find the SendRPC/ValidateMessage functions and count the switch cases
var sendCaseCount int
var recvCaseCount int
ast.Inspect(f, func(n ast.Node) bool {
switch stmt := n.(type) {
case *ast.FuncDecl:
if stmt.Name.Name == "SendRPC" {
ast.Inspect(stmt.Body, func(n ast.Node) bool {
if switchStmt, ok := n.(*ast.SwitchStmt); ok {
for _, stmt := range switchStmt.Body.List {
if _, ok := stmt.(*ast.CaseClause); ok {
sendCaseCount++
}
}
}
return true
})
}
if stmt.Name.Name == "ValidateMessage" {
ast.Inspect(stmt.Body, func(n ast.Node) bool {
if switchStmt, ok := n.(*ast.SwitchStmt); ok {
for _, stmt := range switchStmt.Body.List {
if _, ok := stmt.(*ast.CaseClause); ok {
recvCaseCount++
}
}
}
return true
})
}
}
return true
})

require.Equal(t, len(gossipSubTags), sendCaseCount)
require.Equal(t, len(gossipSubTags), recvCaseCount)
}
4 changes: 2 additions & 2 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error)
}

// MakeService creates a P2P service instance
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler) (*serviceImpl, error) {
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, metricsTracer pubsub.RawTracer) (*serviceImpl, error) {

sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
h.Network().Notify(sm)
Expand All @@ -188,7 +188,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho
telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance)
h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() })

ps, err := makePubSub(ctx, cfg, h)
ps, err := makePubSub(ctx, cfg, h, metricsTracer)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ const (
)

// TXTopicName defines a pubsub topic for TX messages
const TXTopicName = "/algo/tx/0.1.0"
// There is a micro optimization for const string comparison:
// 8 bytes const string require a single x86-64 CMPQ instruction.
// Naming convention: "algo" + 2 bytes protocol tag + 2 bytes version
const TXTopicName = "algotx01"

const incomingThreads = 20 // matches to number wsNetwork workers

func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) {
func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) {
//defaultParams := pubsub.DefaultGossipSubParams()

options := []pubsub.Option{
Expand Down Expand Up @@ -98,7 +101,10 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// pubsub.WithValidateThrottle(cfg.TxBacklogSize),
pubsub.WithValidateWorkers(incomingThreads),
pubsub.WithRawTracer(pubsubTracer{}),
}

if metricsTracer != nil {
options = append(options, pubsub.WithRawTracer(metricsTracer))
}

return pubsub.NewGossipSub(ctx, host, options...)
Expand Down
Loading

0 comments on commit f6c59a4

Please sign in to comment.