Skip to content

Commit

Permalink
Merge pull request asymmetric-research#46 from asymmetric-research/bl…
Browse files Browse the repository at this point in the history
…ock-sizes

Block sizes!
  • Loading branch information
johnstonematt authored Oct 17, 2024
2 parents 39cfc3a + f3cb194 commit e3c8396
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 26 deletions.
15 changes: 14 additions & 1 deletion cmd/solana_exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type (
NodeKeys []string
BalanceAddresses []string
ComprehensiveSlotTracking bool
MonitorBlockSizes bool
}
)

Expand All @@ -35,6 +36,7 @@ func NewExporterConfig(
nodeKeys []string,
balanceAddresses []string,
comprehensiveSlotTracking bool,
monitorBlockSizes bool,
) *ExporterConfig {
return &ExporterConfig{
HttpTimeout: time.Duration(httpTimeout) * time.Second,
Expand All @@ -43,6 +45,7 @@ func NewExporterConfig(
NodeKeys: nodeKeys,
BalanceAddresses: balanceAddresses,
ComprehensiveSlotTracking: comprehensiveSlotTracking,
MonitorBlockSizes: monitorBlockSizes,
}
}

Expand All @@ -54,6 +57,7 @@ func NewExporterConfigFromCLI() *ExporterConfig {
nodekeys arrayFlags
balanceAddresses arrayFlags
comprehensiveSlotTracking bool
monitorBlockSizes bool
)
flag.IntVar(
&httpTimeout,
Expand Down Expand Up @@ -92,7 +96,16 @@ func NewExporterConfigFromCLI() *ExporterConfig {
"Set this flag to track solana_leader_slots_by_epoch for ALL validators. "+
"Warning: this will lead to potentially thousands of new Prometheus metrics being created every epoch.",
)
flag.BoolVar(
&monitorBlockSizes,
"monitor-block-sizes",
false,
"Set this flag to track block sizes (number of transactions) for the configured validators. "+
"Warning: this might grind the RPC node.",
)
flag.Parse()

return NewExporterConfig(httpTimeout, rpcUrl, listenAddress, nodekeys, balanceAddresses, comprehensiveSlotTracking)
return NewExporterConfig(
httpTimeout, rpcUrl, listenAddress, nodekeys, balanceAddresses, comprehensiveSlotTracking, monitorBlockSizes,
)
}
4 changes: 3 additions & 1 deletion cmd/solana_exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ func main() {
}

collector := NewSolanaCollector(client, slotPacerSchedule, config.BalanceAddresses, config.NodeKeys, votekeys)
slotWatcher := NewSlotWatcher(client, config.NodeKeys, votekeys, config.ComprehensiveSlotTracking)
slotWatcher := NewSlotWatcher(
client, config.NodeKeys, votekeys, config.ComprehensiveSlotTracking, config.MonitorBlockSizes,
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go slotWatcher.WatchSlots(ctx, collector.slotPace)
Expand Down
8 changes: 6 additions & 2 deletions cmd/solana_exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ func (c *staticRPCClient) GetLeaderSchedule(
}

//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBlock(ctx context.Context, commitment rpc.Commitment, slot int64) (*rpc.Block, error) {
func (c *staticRPCClient) GetBlock(
ctx context.Context, commitment rpc.Commitment, slot int64, transactionDetails string,
) (*rpc.Block, error) {
return nil, nil
}

Expand Down Expand Up @@ -365,7 +367,9 @@ func (c *dynamicRPCClient) GetLeaderSchedule(
}

//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBlock(ctx context.Context, commitment rpc.Commitment, slot int64) (*rpc.Block, error) {
func (c *dynamicRPCClient) GetBlock(
ctx context.Context, commitment rpc.Commitment, slot int64, transactionDetails string,
) (*rpc.Block, error) {
return nil, nil
}

Expand Down
42 changes: 34 additions & 8 deletions cmd/solana_exporter/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type SlotWatcher struct {
nodekeys []string
votekeys []string
comprehensiveSlotTracking bool
monitorBlockSizes bool

// currentEpoch is the current epoch we are watching
currentEpoch int64
Expand All @@ -46,16 +47,22 @@ type SlotWatcher struct {
LeaderSlotsByEpochMetric *prometheus.CounterVec
InflationRewardsMetric *prometheus.GaugeVec
FeeRewardsMetric *prometheus.CounterVec
BlockSizeMetric *prometheus.GaugeVec
}

func NewSlotWatcher(
client rpc.Provider, nodekeys []string, votekeys []string, comprehensiveSlotTracking bool,
client rpc.Provider,
nodekeys []string,
votekeys []string,
comprehensiveSlotTracking bool,
monitorBlockSizes bool,
) *SlotWatcher {
watcher := SlotWatcher{
client: client,
nodekeys: nodekeys,
votekeys: votekeys,
comprehensiveSlotTracking: comprehensiveSlotTracking,
monitorBlockSizes: monitorBlockSizes,
// metrics:
TotalTransactionsMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "solana_confirmed_transactions_total",
Expand Down Expand Up @@ -105,6 +112,13 @@ func NewSlotWatcher(
},
[]string{NodekeyLabel, EpochLabel},
),
BlockSizeMetric: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "solana_block_size",
Help: "Number of transactions per block, grouped by validator nodekey (identity)",
},
[]string{NodekeyLabel},
),
}
// register:
for _, collector := range []prometheus.Collector{
Expand All @@ -117,6 +131,7 @@ func NewSlotWatcher(
watcher.LeaderSlotsByEpochMetric,
watcher.InflationRewardsMetric,
watcher.FeeRewardsMetric,
watcher.BlockSizeMetric,
} {
if err := prometheus.Register(collector); err != nil {
var (
Expand Down Expand Up @@ -265,7 +280,7 @@ func (c *SlotWatcher) checkValidSlotRange(from, to int64) error {
// moveSlotWatermark performs all the slot-watching tasks required to move the slotWatermark to the provided 'to' slot.
func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) {
c.fetchAndEmitBlockProduction(ctx, to)
c.fetchAndEmitFeeRewards(ctx, to)
c.fetchAndEmitBlockInfos(ctx, to)
c.slotWatermark = to
}

Expand Down Expand Up @@ -306,9 +321,9 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot)
}

// fetchAndEmitFeeRewards fetches and emits all the fee rewards for the tracked addresses between the
// fetchAndEmitBlockInfos fetches and emits all the fee rewards (+ block sizes) for the tracked addresses between the
// slotWatermark and endSlot
func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64) {
func (c *SlotWatcher) fetchAndEmitBlockInfos(ctx context.Context, endSlot int64) {
startSlot := c.slotWatermark + 1
klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot)

Expand All @@ -323,7 +338,7 @@ func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64)

klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
for _, slot := range leaderSlots {
err := c.fetchAndEmitSingleFeeReward(ctx, identity, c.currentEpoch, slot)
err := c.fetchAndEmitSingleBlockInfo(ctx, identity, c.currentEpoch, slot)
if err != nil {
klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
}
Expand All @@ -333,11 +348,17 @@ func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64)
klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot)
}

// fetchAndEmitSingleFeeReward fetches and emits the fee reward for a single block.
func (c *SlotWatcher) fetchAndEmitSingleFeeReward(
// fetchAndEmitSingleBlockInfo fetches and emits the fee reward + block size for a single block.
func (c *SlotWatcher) fetchAndEmitSingleBlockInfo(
ctx context.Context, identity string, epoch int64, slot int64,
) error {
block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot)
var transactionDetails string
if c.monitorBlockSizes {
transactionDetails = "accounts"
} else {
transactionDetails = "none"
}
block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot, transactionDetails)
if err != nil {
var rpcError *rpc.RPCError
if errors.As(err, &rpcError) {
Expand All @@ -364,6 +385,11 @@ func (c *SlotWatcher) fetchAndEmitSingleFeeReward(
}
}

// track block size:
if c.monitorBlockSizes {
c.BlockSizeMetric.WithLabelValues(identity).Set(float64(len(block.Transactions)))
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/solana_exporter/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func assertSlotMetricsChangeCorrectly(t *testing.T, initial slotMetricValues, fi
func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
client := staticRPCClient{}
collector := NewSolanaCollector(&client, 100*time.Millisecond, nil, identities, votekeys)
watcher := NewSlotWatcher(&client, identities, votekeys, false)
watcher := NewSlotWatcher(&client, identities, votekeys, false, false)
// reset metrics before running tests:
watcher.LeaderSlotsTotalMetric.Reset()
watcher.LeaderSlotsByEpochMetric.Reset()
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
// create clients:
client := newDynamicRPCClient()
collector := NewSolanaCollector(client, 300*time.Millisecond, nil, identities, votekeys)
watcher := NewSlotWatcher(client, identities, votekeys, false)
watcher := NewSlotWatcher(client, identities, votekeys, false, false)
// reset metrics before running tests:
watcher.LeaderSlotsTotalMetric.Reset()
watcher.LeaderSlotsByEpochMetric.Reset()
Expand Down
22 changes: 16 additions & 6 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"k8s.io/klog/v2"
"net/http"
"slices"
"time"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ type Provider interface {

GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error)

GetBlock(ctx context.Context, commitment Commitment, slot int64) (*Block, error)
GetBlock(ctx context.Context, commitment Commitment, slot int64, transactionDetails string) (*Block, error)
}

func (c Commitment) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -274,16 +275,25 @@ func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment, s

// GetBlock returns identity and transaction information about a confirmed block in the ledger.
// See API docs: https://solana.com/docs/rpc/http/getblock
func (c *Client) GetBlock(ctx context.Context, commitment Commitment, slot int64) (*Block, error) {
func (c *Client) GetBlock(
ctx context.Context, commitment Commitment, slot int64, transactionDetails string,
) (*Block, error) {
detailsOptions := []string{"full", "accounts", "none"}
if !slices.Contains(detailsOptions, transactionDetails) {
klog.Fatalf(
"%s is not a valid transaction-details option, must be one of %v", transactionDetails, detailsOptions,
)
}
if commitment == CommitmentProcessed {
// as per https://solana.com/docs/rpc/http/getblock
klog.Fatalf("commitment '%v' is not supported for GetBlock", CommitmentProcessed)
}
config := map[string]any{
"commitment": commitment,
"encoding": "json", // this is default, but no harm in specifying it
"transactionDetails": "none", // for now, can hard-code this out, as we don't need it
"rewards": true, // what we here for!
"commitment": commitment,
"encoding": "json", // this is default, but no harm in specifying it
"transactionDetails": transactionDetails,
"rewards": true, // what we here for!
"maxSupportedTransactionVersion": 0,
}
var resp response[Block]
if err := getResponse(ctx, c, "getBlock", []any{slot, config}, &resp); err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/rpc/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ type (
}

Block struct {
BlockHeight int64 `json:"blockHeight"`
BlockTime int64 `json:"blockTime,omitempty"`
Blockhash string `json:"blockhash"`
ParentSlot int64 `json:"parentSlot"`
PreviousBlockhash string `json:"previousBlockhash"`
Rewards []BlockReward `json:"rewards"`
BlockHeight int64 `json:"blockHeight"`
BlockTime int64 `json:"blockTime,omitempty"`
Blockhash string `json:"blockhash"`
ParentSlot int64 `json:"parentSlot"`
PreviousBlockhash string `json:"previousBlockhash"`
Rewards []BlockReward `json:"rewards"`
Transactions []map[string]any `json:"transactions"`
}

BlockReward struct {
Expand Down

0 comments on commit e3c8396

Please sign in to comment.