Skip to content

Commit

Permalink
feat: add remaining metrics and update dashboard cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Jan 25, 2024
1 parent c062554 commit aad6d1b
Show file tree
Hide file tree
Showing 10 changed files with 1,311 additions and 28 deletions.
1,224 changes: 1,211 additions & 13 deletions ops/metrics/grafana/provisioning/dashboards/dashboard.json

Large diffs are not rendered by default.

Empty file.
2 changes: 1 addition & 1 deletion ops/metrics/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 5s
scrape_interval: 1s
static_configs:
- targets: ['opentelemetry-collector:9095']
4 changes: 4 additions & 0 deletions pkg/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"

"github.com/bacalhau-project/bacalhau/pkg/compute/store"
"github.com/bacalhau-project/bacalhau/pkg/executor"
Expand Down Expand Up @@ -298,11 +299,14 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)
Str("execution", execution.ID).
Logger().WithContext(ctx)

stopwatch := telemetry.NewTimer(jobDurationMilliseconds)
stopwatch.Start()
operation := "Running"
defer func() {
if err != nil {
e.handleFailure(ctx, state, err, operation)
}
stopwatch.Stop(ctx, state.Execution.Job.MetricAttributes()...)
}()

res := e.Start(ctx, execution)
Expand Down
25 changes: 16 additions & 9 deletions pkg/compute/metrics.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
package compute

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

// Metrics for monitoring compute nodes:
var (
meter = otel.GetMeterProvider().Meter("compute")
jobsReceived, _ = meter.Int64Counter(
meter = otel.GetMeterProvider().Meter("compute")
jobsReceived = lo.Must(meter.Int64Counter(
"jobs_received",
metric.WithDescription("Number of jobs received by the compute node"),
)
))

jobsAccepted, _ = meter.Int64Counter(
jobsAccepted = lo.Must(meter.Int64Counter(
"jobs_accepted",
metric.WithDescription("Number of jobs bid on and accepted by the compute node"),
)
))

jobsCompleted, _ = meter.Int64Counter(
jobsCompleted = lo.Must(meter.Int64Counter(
"jobs_completed",
metric.WithDescription("Number of jobs completed by the compute node."),
)
))

jobsFailed, _ = meter.Int64Counter(
jobsFailed = lo.Must(meter.Int64Counter(
"jobs_failed",
metric.WithDescription("Number of jobs failed by the compute node."),
)
))

jobDurationMilliseconds = lo.Must(meter.Int64Histogram(
"job_duration_milliseconds",
metric.WithDescription("Duration of a job on the compute node in milliseconds."),
metric.WithUnit("ms"),
))
)
3 changes: 2 additions & 1 deletion pkg/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"errors"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/hashicorp/go-multierror"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
)

// ExecutionStateType The state of an execution. An execution represents a single attempt to execute a job on a node.
Expand Down
20 changes: 20 additions & 0 deletions pkg/node/metrics/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metrics

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"

"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

var (
nodeMeter = otel.GetMeterProvider().Meter("bacalhau-node")
)

var (
NodeInfo = lo.Must(telemetry.NewCounter(
nodeMeter,
"bacalhau_node_info",
"A static metric with labels describing the bacalhau node",
))
)
20 changes: 16 additions & 4 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"fmt"
"time"

"github.com/hashicorp/go-multierror"
"github.com/imdario/mergo"
"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p/core/host"
"go.opentelemetry.io/otel/attribute"

"github.com/bacalhau-project/bacalhau/pkg/authz"
pkgconfig "github.com/bacalhau-project/bacalhau/pkg/config"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
Expand All @@ -14,6 +20,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/model"
"github.com/bacalhau-project/bacalhau/pkg/models"
nats_transport "github.com/bacalhau-project/bacalhau/pkg/nats/transport"
"github.com/bacalhau-project/bacalhau/pkg/node/metrics"
"github.com/bacalhau-project/bacalhau/pkg/publicapi"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/agent"
Expand All @@ -24,10 +31,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/transport"
"github.com/bacalhau-project/bacalhau/pkg/version"
"github.com/hashicorp/go-multierror"
"github.com/imdario/mergo"
"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p/core/host"
)

type FeatureConfig struct {
Expand Down Expand Up @@ -365,6 +368,15 @@ func NewNode(
return errors.ErrorOrNil()
})

metrics.NodeInfo.Add(ctx, 1,
attribute.String("node_id", config.NodeID),
attribute.String("node_network_transport", config.NetworkConfig.Type),
attribute.Bool("node_is_compute", config.IsComputeNode),
attribute.Bool("node_is_requester", config.IsRequesterNode),
attribute.StringSlice("node_engines", executors.Keys(ctx)),
attribute.StringSlice("node_publishers", publishers.Keys(ctx)),
attribute.StringSlice("node_storages", storageProviders.Keys(ctx)),
)
node := &Node{
ID: config.NodeID,
CleanupManager: config.CleanupManager,
Expand Down
1 change: 1 addition & 0 deletions pkg/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func newMeterProvider() {
return
}

// TODO we can decrese the read rate from the default 30sec to something quicker by passing option here.
reader := sdkmetric.NewPeriodicReader(exp)

meterProvider = sdkmetric.NewMeterProvider(
Expand Down
40 changes: 40 additions & 0 deletions pkg/telemetry/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package telemetry

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Timer measures the duration of an event.
type Timer struct {
startTime time.Time
durationRecorder metric.Int64Histogram
}

func NewTimer(durationRecorder metric.Int64Histogram) *Timer {
return &Timer{
durationRecorder: durationRecorder,
}
}

// Start begins the timer by recording the current time.
func (t *Timer) Start() {
t.startTime = time.Now()
}

// Stop ends the timer and records the duration since Start was called.
// `attrs` are optional attributes that can be added to the duration metric for additional context.
func (t *Timer) Stop(ctx context.Context, attrs ...attribute.KeyValue) {
if t.startTime.IsZero() {
// Handle the case where Stop is called without Start being called.
return
}

// Calculate the duration and record it using the OpenTelemetry histogram.
duration := time.Since(t.startTime).Milliseconds()
t.durationRecorder.Record(ctx, duration, metric.WithAttributes(attrs...))
t.startTime = time.Time{} // Reset the start time for future use.
}

0 comments on commit aad6d1b

Please sign in to comment.