Skip to content

Commit

Permalink
Metrics: refactoring, shard state logging (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
poolnam authored Jan 27, 2021
1 parent 9168eb8 commit 5cfdd01
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 25 deletions.
59 changes: 40 additions & 19 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package metrics

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)

// Metric names. Each constant is passed as the Name field of one of the
// Prometheus collectors declared in this package.
const (
// Names of the discovery latency histograms.
discoveryInstanceDurations = "instance_durations"
discoveryClusterDurations = "cluster_durations"
// Names of the shard gauges.
shardCriticalLevel = "critical_level"
shardState = "state"
// Name of the shard state event counter.
shardStateEvent = "shard_state_event"
)

// Label names shared by the collectors of this package, kept in one place so
// that collector declarations and the code setting label values agree.
const (
labelClusterName = "cluster_name"
labelHostName = "hostname"
labelShardState = "shard_state"
labelShardUUID = "shard_uuid"
)

var (
Expand All @@ -20,42 +30,49 @@ var (
Name: discoveryInstanceDurations,
Help: "Instance discovery latencies in seconds",
Buckets: discoveryInstanceDurationsBuckets,
}, []string{"cluster_name", "hostname"})
}, []string{labelClusterName, labelHostName})

discoveryClusterDurationsSum = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "discovery",
Name: discoveryClusterDurations,
Help: "Cluster discovery latencies in seconds",
Buckets: discoveryClusterDurationsBuckets,
}, []string{"cluster_name"})
}, []string{labelClusterName})

shardCriticalLevelGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "shard",
Name: shardCriticalLevel,
Help: "Critical level of the replica set",
}, []string{"cluster_name", "uuid", "master_uri"})
}, []string{labelClusterName, labelShardUUID})

shardStateGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "shard",
Name: shardState,
Help: "The state of each shard in the cluster; it will have one line for each possible state of each shard. A value of 1 means the shard is in the state specified by the state label, a value of 0 means it is not.",
}, []string{"cluster_name", "uuid", "master_uri", "state"})
}, []string{labelClusterName, labelShardUUID, labelShardState})

discoveryErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
discoveryErrors = prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "discovery",
Name: "errors",
Help: "Errors that happen during discovery process",
}, []string{"cluster_name", "uri"})
})

shardStateCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: "orchestrator",
Name: shardStateEvent,
Help: "Discovered shard state event",
}, []string{labelClusterName, labelShardUUID, labelShardState})
)

// init adds zero to the discovery error counter and then registers every
// collector of this package through prometheus.MustRegister.
// NOTE(review): the zero increment presumably makes the error series visible
// before the first error is recorded — confirm.
func init() {
discoveryErrors.With(prometheus.Labels{"cluster_name": "", "uri": ""}).Add(0)
discoveryErrors.Add(0)
prometheus.MustRegister(
discoveryInstanceDurationsSum,
discoveryClusterDurationsSum,
shardCriticalLevelGauge,
shardStateGauge,
discoveryErrors,
shardStateCounter,
)
}

Expand Down Expand Up @@ -95,26 +112,30 @@ func StartClusterDiscovery(clusterName string) Transaction {
return txn.Start()
}

// SetShardCriticalLevel stores level, converted to float64, in the shard
// critical level gauge for the given cluster and shard UUID.
func SetShardCriticalLevel(clusterName, uuid, masterURI string, level int) {
shardCriticalLevelGauge.WithLabelValues(clusterName, uuid, masterURI).Set(float64(level))
func SetShardCriticalLevel(clusterName, uuid string, level int) {
shardCriticalLevelGauge.WithLabelValues(clusterName, uuid).Set(float64(level))
}

// SetShardState sets the shard state gauge for the given cluster, shard UUID
// and state to 1 when active is true and to 0 otherwise.
func SetShardState(clusterName, uuid, masterURI, state string, active bool) {
func SetShardState(clusterName, uuid, state string, active bool) {
// Encode the boolean flag as a gauge value: 1 for active, 0 for inactive.
v := float64(0)
if active {
v = 1
}
shardStateGauge.With(prometheus.Labels{
"cluster_name": clusterName,
"uuid": uuid,
"master_uri": masterURI,
"state": state,
labelClusterName: clusterName,
labelShardUUID: uuid,
labelShardState: state,
}).Set(v)
}

func RecordDiscoveryError(clusterName, uri string) {
discoveryErrors.With(prometheus.Labels{
"cluster_name": clusterName,
"uri": uri,
// RecordDiscoveryError increments the discovery error counter by one.
func RecordDiscoveryError() {
discoveryErrors.Inc()
}

// RecordDiscoveredShardState increments the shard state event counter for the
// series identified by the cluster name, the shard UUID and the state.
func RecordDiscoveredShardState(clusterName, shardUUID, state string) {
	// Values are positional and follow the label order the counter is
	// declared with: cluster name, shard UUID, shard state.
	counter := shardStateCounter.WithLabelValues(clusterName, shardUUID, state)
	counter.Inc()
}
10 changes: 5 additions & 5 deletions internal/vshard/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (c *Cluster) Discover() {
conn := c.Connector(router.URI)
resp := conn.Exec(ctx, vshardRouterInfoQuery)
if resp.Error != nil {
metrics.RecordDiscoveryError(c.Name, router.URI)
metrics.RecordDiscoveryError()
c.logger.
Err(resp.Error).
Str("URI", router.URI).
Expand All @@ -302,7 +302,7 @@ func (c *Cluster) Discover() {

updatedRI, err := ParseRouterInfo(resp.Data)
if err != nil {
metrics.RecordDiscoveryError(c.Name, router.URI)
metrics.RecordDiscoveryError()
c.logger.Err(err).
Str("URI", router.URI).
Msg("Failed to discover the topology of the cluster using router")
Expand Down Expand Up @@ -371,7 +371,7 @@ func (c *Cluster) Discover() {
ns.ReplicaSets = append(ns.ReplicaSets, set)

code, _ := set.HealthStatus()
metrics.SetShardCriticalLevel(c.Name, string(set.UUID), set.MasterURI, int(code))
metrics.SetShardCriticalLevel(c.Name, string(set.UUID), int(code))
c.logDiscoveredReplicaSet(set)
}

Expand Down Expand Up @@ -446,7 +446,7 @@ func (c *Cluster) discoverInstance(ctx context.Context, inst *Instance) {
conn := c.Connector(inst.URI)
resp := conn.Exec(ctx, vshardInstanceInfoQuery)
if resp.Error != nil {
metrics.RecordDiscoveryError(c.Name, inst.URI)
metrics.RecordDiscoveryError()
c.logger.Err(resp.Error).
Str("URI", inst.URI).
Str("UUID", string(inst.UUID)).
Expand All @@ -457,7 +457,7 @@ func (c *Cluster) discoverInstance(ctx context.Context, inst *Instance) {

info, err := ParseInstanceInfo(resp.Data)
if err != nil {
metrics.RecordDiscoveryError(c.Name, inst.URI)
metrics.RecordDiscoveryError()
c.logger.Err(err).
Str("URI", inst.URI).
Str("UUID", string(inst.UUID)).
Expand Down
2 changes: 2 additions & 0 deletions internal/vshard/orchestrator/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog"
"github.com/viciious/go-tarantool"

"github.com/shmel1k/qumomf/internal/metrics"
"github.com/shmel1k/qumomf/internal/quorum"
"github.com/shmel1k/qumomf/internal/util"
"github.com/shmel1k/qumomf/internal/vshard"
Expand Down Expand Up @@ -148,6 +149,7 @@ func (f *failover) checkAndRecover(ctx context.Context, analysis *ReplicationAna
Str("master_uri", analysis.Set.MasterURI).
Logger()
logger.WithLevel(f.sampler.sample(analysis)).Str("analysis", analysis.String()).Msg("checkAndRecover")
metrics.RecordDiscoveredShardState(f.cluster.Name, string(analysis.Set.UUID), string(analysis.State))

recvFunc, desc := f.getCheckAndRecoveryFunc(analysis.State)
if recvFunc == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/vshard/orchestrator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (m *storageMonitor) checkCluster(stream AnalysisWriteStream) {

for _, state := range ReplicaSetStateEnum {
active := state == analysis.State
metrics.SetShardState(m.cluster.Name, string(set.UUID), set.MasterURI, string(state), active)
metrics.SetShardState(m.cluster.Name, string(set.UUID), string(state), active)
}
}
}(set)
Expand Down

0 comments on commit 5cfdd01

Please sign in to comment.