Skip to content

Commit

Permalink
chore: Add flag to skip legacy duplicate telemetry (#630)
Browse files Browse the repository at this point in the history
Currently we have some legacy metrics with the peer ID in the metric name suffix (in
addition to the same metrics with `peer_id` as a label):
1. `raft_replication_appendEntries_rpc_peer0`
2. `raft_replication_appendEntries_logs_peer0`
3. `raft_replication_heartbeat_peer0`
4. `raft_replication_installSnapshot_peer0`

These metrics may have additional `_count` or `_sum` metrics, and each of them is
multiplied by the number of peers: with 10 peers, there are 10x as many of these metrics.

This PR adds a flag `noLegacyTelemetry` (default: false); setting it to
`true` skips those duplicate metrics.

---------

Signed-off-by: Kaviraj <[email protected]>
Co-authored-by: Piotr Kazmierczak <[email protected]>
  • Loading branch information
kavirajk and pkazmierczak authored Dec 12, 2024
1 parent 7e8e836 commit a5bc06c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# UNRELEASED

IMPROVEMENTS

* Added a flag to skip legacy duplicate telemetry. [GH-630](https://github.com/hashicorp/raft/pull/630)

# 1.7.0 (June 5th, 2024)

CHANGES
Expand Down
15 changes: 12 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ type Raft struct {
// preVoteDisabled control if the pre-vote feature is activated,
// prevote feature is disabled if set to true.
preVoteDisabled bool

// noLegacyTelemetry allows skipping the legacy metrics to avoid duplicates.
// Legacy metrics are those that have `_peer_name` as a metric suffix instead of as a label,
// e.g. raft_replication_heartbeat_peer0
noLegacyTelemetry bool
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand All @@ -232,7 +237,8 @@ type Raft struct {
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
snaps SnapshotStore, trans Transport, configuration Configuration,
) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
Expand Down Expand Up @@ -305,7 +311,8 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
// the sole voter, and then join up other new clean-state peer servers using
// the usual APIs in order to bring the cluster back into a known state.
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
snaps SnapshotStore, trans Transport, configuration Configuration,
) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
Expand Down Expand Up @@ -436,7 +443,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// without starting a Raft instance or connecting to the cluster. This function
// has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
snaps SnapshotStore, trans Transport,
) (Configuration, error) {
conf.skipStartup = true
r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
if err != nil {
Expand Down Expand Up @@ -566,6 +574,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote,
noLegacyTelemetry: conf.NoLegacyTelemetry,
}
if !transportSupportPreVote && !conf.PreVoteDisabled {
r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ type Config struct {
// PreVoteDisabled deactivate the pre-vote feature when set to true
PreVoteDisabled bool

// NoLegacyTelemetry allows skipping the legacy metrics to avoid duplicates.
// Legacy metrics are those that have `_peer_name` as a metric suffix instead of as a label,
// e.g. raft_replication_heartbeat_peer0
NoLegacyTelemetry bool

// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
Expand Down
30 changes: 20 additions & 10 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ START:
s.failures++
return
}
appendStats(string(peer.ID), start, float32(len(req.Entries)))
appendStats(string(peer.ID), start, float32(len(req.Entries)), r.noLegacyTelemetry)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -347,8 +347,11 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
}
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)

if !r.noLegacyTelemetry {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)
}

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -423,8 +426,12 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
failures = 0
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)

if !r.noLegacyTelemetry {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)
}

s.notifyAll(resp.Success)
}
}
Expand Down Expand Up @@ -533,7 +540,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
s.peerLock.RUnlock()

req, resp := ready.Request(), ready.Response()
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)))
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)), r.noLegacyTelemetry)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -621,13 +628,16 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
}

// appendStats is used to emit stats about an AppendEntries invocation.
func appendStats(peer string, start time.Time, logs float32) {
func appendStats(peer string, start time.Time, logs float32, skipLegacy bool) {
labels := []metrics.Label{{Name: "peer_id", Value: peer}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)

if !skipLegacy {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
}
}

// handleStaleTerm is used when a follower indicates that we have a stale term.
Expand Down

0 comments on commit a5bc06c

Please sign in to comment.