Skip to content

Commit

Permalink
Metrics refactoring (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
poolnam authored Dec 23, 2020
1 parent 31ca93d commit 0ba46f4
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
61 changes: 41 additions & 20 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,52 @@ const (
)

var (
discoveryInstanceDurationsSum = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Subsystem: "discovery",
Name: discoveryInstanceDurations,
Help: "Instance discovery latencies in seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
discoveryInstanceDurationsBuckets = prometheus.ExponentialBuckets(.001, 2.5, 10)
discoveryClusterDurationsBuckets = prometheus.ExponentialBuckets(.001, 2.5, 10)
)

var (
discoveryInstanceDurationsSum = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "discovery",
Name: discoveryInstanceDurations,
Help: "Instance discovery latencies in seconds",
Buckets: discoveryInstanceDurationsBuckets,
}, []string{"cluster_name", "hostname"})

discoveryClusterDurationsSum = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Subsystem: "discovery",
Name: discoveryClusterDurations,
Help: "Cluster discovery latencies in seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
discoveryClusterDurationsSum = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "discovery",
Name: discoveryClusterDurations,
Help: "Cluster discovery latencies in seconds",
Buckets: discoveryClusterDurationsBuckets,
}, []string{"cluster_name"})

shardCriticalLevelGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "shard",
Name: shardCriticalLevel,
Help: "Critical level of the replica set",
}, []string{"cluster_name", "uuid"})
}, []string{"cluster_name", "uuid", "master_uri"})

shardStateGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "shard",
Name: shardState,
Help: "The state of each shard in the cluster; it will have one line for each possible state of each shard. A value of 1 means the shard is in the state specified by the state label, a value of 0 means it is not.",
}, []string{"cluster_name", "uuid", "state"})
}, []string{"cluster_name", "uuid", "master_uri", "state"})

discoveryErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: "discovery",
Name: "errors",
Help: "Errors that happen during discovery process",
}, []string{"uri"})
)

func init() {
prometheus.MustRegister(discoveryInstanceDurationsSum)
prometheus.MustRegister(discoveryClusterDurationsSum)
prometheus.MustRegister(shardCriticalLevelGauge)
prometheus.MustRegister(shardStateGauge)
prometheus.MustRegister(
discoveryInstanceDurationsSum,
discoveryClusterDurationsSum,
shardCriticalLevelGauge,
shardStateGauge,
discoveryErrors,
)
}

type Transaction interface {
Expand All @@ -51,7 +65,7 @@ type Transaction interface {

type timeTransaction struct {
labels []string
summary *prometheus.SummaryVec
summary *prometheus.HistogramVec
timer *prometheus.Timer
}

Expand Down Expand Up @@ -80,18 +94,25 @@ func StartClusterDiscovery(clusterName string) Transaction {
return txn.Start()
}

func SetShardCriticalLevel(clusterName, uuid string, level int) {
shardCriticalLevelGauge.WithLabelValues(clusterName, uuid).Set(float64(level))
func SetShardCriticalLevel(clusterName, uuid, masterURI string, level int) {
shardCriticalLevelGauge.WithLabelValues(clusterName, uuid, masterURI).Set(float64(level))
}

func SetShardState(clusterName, uuid, state string, active bool) {
func SetShardState(clusterName, uuid, masterURI, state string, active bool) {
v := float64(0)
if active {
v = 1
}
shardStateGauge.With(prometheus.Labels{
"cluster_name": clusterName,
"uuid": uuid,
"master_uri": masterURI,
"state": state,
}).Set(v)
}

// RecordDiscoveryError increments the discovery error counter by one for
// the given uri.
//
// The uri is used verbatim as the value of the counter's single "uri" label.
func RecordDiscoveryError(uri string) {
	discoveryErrors.WithLabelValues(uri).Inc()
}
6 changes: 5 additions & 1 deletion internal/vshard/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (c *Cluster) Discover() {
conn := c.Connector(router.URI)
resp := conn.Exec(ctx, vshardRouterInfoQuery)
if resp.Error != nil {
metrics.RecordDiscoveryError(router.URI)
c.logger.
Err(resp.Error).
Str("URI", router.URI).
Expand All @@ -297,6 +298,7 @@ func (c *Cluster) Discover() {

updatedRI, err := ParseRouterInfo(resp.Data)
if err != nil {
metrics.RecordDiscoveryError(router.URI)
c.logger.Err(err).
Str("URI", router.URI).
Str("UUID", string(router.UUID)).
Expand Down Expand Up @@ -366,7 +368,7 @@ func (c *Cluster) Discover() {
ns.ReplicaSets = append(ns.ReplicaSets, set)

code, _ := set.HealthStatus()
metrics.SetShardCriticalLevel(c.Name, string(set.UUID), int(code))
metrics.SetShardCriticalLevel(c.Name, string(set.UUID), set.MasterURI, int(code))
c.logDiscoveredReplicaSet(set)
}

Expand Down Expand Up @@ -437,6 +439,7 @@ func (c *Cluster) discoverInstance(ctx context.Context, inst *Instance) {
conn := c.Connector(inst.URI)
resp := conn.Exec(ctx, vshardInstanceInfoQuery)
if resp.Error != nil {
metrics.RecordDiscoveryError(inst.URI)
c.logger.Err(resp.Error).
Str("URI", inst.URI).
Str("UUID", string(inst.UUID)).
Expand All @@ -447,6 +450,7 @@ func (c *Cluster) discoverInstance(ctx context.Context, inst *Instance) {

info, err := ParseInstanceInfo(resp.Data)
if err != nil {
metrics.RecordDiscoveryError(inst.URI)
c.logger.Err(err).
Str("URI", inst.URI).
Str("UUID", string(inst.UUID)).
Expand Down
2 changes: 1 addition & 1 deletion internal/vshard/orchestrator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (m *storageMonitor) checkCluster(stream AnalysisWriteStream) {

for _, state := range ReplicaSetStateEnum {
active := state == analysis.State
metrics.SetShardState(m.cluster.Name, string(set.UUID), string(state), active)
metrics.SetShardState(m.cluster.Name, string(set.UUID), set.MasterURI, string(state), active)
}
}
}(set)
Expand Down

0 comments on commit 0ba46f4

Please sign in to comment.