Skip to content

Commit

Permalink
feat: add job and execution metrics:
Browse files Browse the repository at this point in the history
- Job Duration
- Number of Jobs Submitted
- Active Docker Executions
- Active WASM Executions
- Basic Node Info (metric isn't valuable, labels are)
  • Loading branch information
frrist authored and frrist committed Jan 26, 2024
1 parent 3e34681 commit 7a5f9c8
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 22 deletions.
4 changes: 4 additions & 0 deletions pkg/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"

"github.com/bacalhau-project/bacalhau/pkg/compute/store"
"github.com/bacalhau-project/bacalhau/pkg/executor"
Expand Down Expand Up @@ -298,11 +299,14 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)
Str("execution", execution.ID).
Logger().WithContext(ctx)

stopwatch := telemetry.NewTimer(jobDurationMilliseconds)
stopwatch.Start()
operation := "Running"
defer func() {
if err != nil {
e.handleFailure(ctx, state, err, operation)
}
stopwatch.Stop(ctx, state.Execution.Job.MetricAttributes()...)
}()

res := e.Start(ctx, execution)
Expand Down
25 changes: 16 additions & 9 deletions pkg/compute/metrics.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
package compute

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

// Metrics for monitoring compute nodes:
var (
meter = otel.GetMeterProvider().Meter("compute")
jobsReceived, _ = meter.Int64Counter(
meter = otel.GetMeterProvider().Meter("compute")
jobsReceived = lo.Must(meter.Int64Counter(
"jobs_received",
metric.WithDescription("Number of jobs received by the compute node"),
)
))

jobsAccepted, _ = meter.Int64Counter(
jobsAccepted = lo.Must(meter.Int64Counter(
"jobs_accepted",
metric.WithDescription("Number of jobs bid on and accepted by the compute node"),
)
))

jobsCompleted, _ = meter.Int64Counter(
jobsCompleted = lo.Must(meter.Int64Counter(
"jobs_completed",
metric.WithDescription("Number of jobs completed by the compute node."),
)
))

jobsFailed, _ = meter.Int64Counter(
jobsFailed = lo.Must(meter.Int64Counter(
"jobs_failed",
metric.WithDescription("Number of jobs failed by the compute node."),
)
))

jobDurationMilliseconds = lo.Must(meter.Int64Histogram(
"job_duration_milliseconds",
metric.WithDescription("Duration of a job on the compute node in milliseconds."),
metric.WithUnit("ms"),
))
)
3 changes: 3 additions & 0 deletions pkg/executor/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/atomic"

"github.com/bacalhau-project/bacalhau/pkg/docker"
Expand Down Expand Up @@ -52,6 +53,7 @@ type executionHandler struct {

//nolint:funlen
func (h *executionHandler) run(ctx context.Context) {
ActiveExecutions.Inc(ctx, attribute.String("executor_id", h.ID))
h.running.Store(true)
defer func() {
destroyTimeout := time.Second * 10
Expand All @@ -60,6 +62,7 @@ func (h *executionHandler) run(ctx context.Context) {
}
h.running.Store(false)
close(h.waitCh)
ActiveExecutions.Dec(ctx, attribute.String("executor_id", h.ID))
}()
// start the container
h.logger.Info().Msg("starting container execution")
Expand Down
20 changes: 20 additions & 0 deletions pkg/executor/docker/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package docker

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"

"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// dockerExecutorMeter is the meter named "docker-executor", obtained from the
// global OpenTelemetry meter provider. Docker executor instruments are
// created from it.
var dockerExecutorMeter = otel.GetMeterProvider().Meter("docker-executor")

// ActiveExecutions is the "docker_active_executions" gauge. The docker
// execution handler increments it when run starts and decrements it at the
// end of run's deferred cleanup, tagging both updates with "executor_id".
//
// lo.Must panics during package initialization if telemetry.NewGauge reports
// an error.
var ActiveExecutions = lo.Must(telemetry.NewGauge(
	dockerExecutorMeter,
	"docker_active_executions",
	"Number of active docker executions",
))
4 changes: 3 additions & 1 deletion pkg/executor/wasm/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/executor"
wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models"
wasmlogs "github.com/bacalhau-project/bacalhau/pkg/logger/wasm"
models "github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/storage"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
"github.com/bacalhau-project/bacalhau/pkg/util/closer"
Expand Down Expand Up @@ -62,6 +62,7 @@ type executionHandler struct {

//nolint:funlen
func (h *executionHandler) run(ctx context.Context) {
ActiveExecutions.Inc(ctx)
defer func() {
if r := recover(); r != nil {
h.logger.Error().
Expand All @@ -70,6 +71,7 @@ func (h *executionHandler) run(ctx context.Context) {
// TODO don't do this.
h.result = &models.RunCommandResult{}
}
ActiveExecutions.Dec(ctx)
}()

var wasmCtx context.Context
Expand Down
20 changes: 20 additions & 0 deletions pkg/executor/wasm/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package wasm

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"

"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// wasmExecutorMeter is the meter named "wasm-executor", obtained from the
// global OpenTelemetry meter provider. WASM executor instruments are created
// from it.
var wasmExecutorMeter = otel.GetMeterProvider().Meter("wasm-executor")

// ActiveExecutions is the "wasm_active_executions" gauge. The WASM execution
// handler increments it when run starts and decrements it at the end of run's
// deferred function (after any panic recovery).
//
// lo.Must panics during package initialization if telemetry.NewGauge reports
// an error.
var ActiveExecutions = lo.Must(telemetry.NewGauge(
	wasmExecutorMeter,
	"wasm_active_executions",
	"Number of active WASM executions",
))
3 changes: 2 additions & 1 deletion pkg/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"errors"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/hashicorp/go-multierror"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
)

// ExecutionStateType The state of an execution. An execution represents a single attempt to execute a job on a node.
Expand Down
11 changes: 9 additions & 2 deletions pkg/models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"strings"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
)

type JobStateType int
Expand Down Expand Up @@ -67,7 +69,7 @@ func (s *JobStateType) UnmarshalText(text []byte) (err error) {
type Job struct {
// ID is a unique identifier assigned to this job.
// It helps to distinguish jobs with the same name after they have been deleted and re-created.
//The ID is generated by the server and should not be set directly by the client.
// The ID is generated by the server and should not be set directly by the client.
ID string `json:"ID"`

// Name is the logical name of the job used to refer to it.
Expand Down Expand Up @@ -114,6 +116,11 @@ type Job struct {
ModifyTime int64 `json:"ModifyTime"`
}

// MetricAttributes returns the attributes used to tag metrics recorded for
// this job: the attributes of the task returned by j.Task(), followed by a
// "job_type" attribute holding j.Type.
//
// TODO(forrest): will need to re-think how we tag metrics from jobs with more
// than one task whenever that happens.
func (j *Job) MetricAttributes() []attribute.KeyValue {
	attrs := j.Task().MetricAttributes()
	attrs = append(attrs, attribute.String("job_type", j.Type))
	return attrs
}

// String returns the job's ID.
func (j *Job) String() string {
return j.ID
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"errors"
"fmt"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
)

type Task struct {
Expand Down Expand Up @@ -38,6 +40,14 @@ type Task struct {
Timeouts *TimeoutConfig `json:"Timeouts,omitempty"`
}

// MetricAttributes returns the attributes used to tag metrics for this task:
// "task_engine", "task_publisher" and "task_network", each taken from the
// Type of the task's Engine, Publisher and Network respectively.
//
// NOTE(review): Engine, Publisher and Network are accessed without any nil
// check. If these fields are pointers that may be unset on a task, this
// method will panic — confirm against the Task definition and its callers.
func (t *Task) MetricAttributes() []attribute.KeyValue {
	var (
		engine    = attribute.String("task_engine", t.Engine.Type)
		publisher = attribute.String("task_publisher", t.Publisher.Type)
		network   = attribute.String("task_network", t.Network.Type.String())
	)
	return []attribute.KeyValue{engine, publisher, network}
}

func (t *Task) Normalize() {
// Ensure that an empty and nil map are treated the same
if t.Meta == nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/node/metrics/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metrics

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"

"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// nodeMeter is the meter named "bacalhau-node", obtained from the global
// OpenTelemetry meter provider.
var nodeMeter = otel.GetMeterProvider().Meter("bacalhau-node")

// NodeInfo is the "bacalhau_node_info" counter. NewNode adds 1 to it with
// attributes describing the node (ID, network transport, compute/requester
// roles, engines, publishers, storages); the attributes carry the
// information, not the counter value.
//
// lo.Must panics during package initialization if telemetry.NewCounter
// reports an error.
var NodeInfo = lo.Must(telemetry.NewCounter(
	nodeMeter,
	"bacalhau_node_info",
	"A static metric with labels describing the bacalhau node",
))
20 changes: 16 additions & 4 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"fmt"
"time"

"github.com/hashicorp/go-multierror"
"github.com/imdario/mergo"
"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p/core/host"
"go.opentelemetry.io/otel/attribute"

"github.com/bacalhau-project/bacalhau/pkg/authz"
pkgconfig "github.com/bacalhau-project/bacalhau/pkg/config"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
Expand All @@ -14,6 +20,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/model"
"github.com/bacalhau-project/bacalhau/pkg/models"
nats_transport "github.com/bacalhau-project/bacalhau/pkg/nats/transport"
"github.com/bacalhau-project/bacalhau/pkg/node/metrics"
"github.com/bacalhau-project/bacalhau/pkg/publicapi"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/agent"
Expand All @@ -24,10 +31,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/transport"
"github.com/bacalhau-project/bacalhau/pkg/version"
"github.com/hashicorp/go-multierror"
"github.com/imdario/mergo"
"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p/core/host"
)

type FeatureConfig struct {
Expand Down Expand Up @@ -365,6 +368,15 @@ func NewNode(
return errors.ErrorOrNil()
})

metrics.NodeInfo.Add(ctx, 1,
attribute.String("node_id", config.NodeID),
attribute.String("node_network_transport", config.NetworkConfig.Type),
attribute.Bool("node_is_compute", config.IsComputeNode),
attribute.Bool("node_is_requester", config.IsRequesterNode),
attribute.StringSlice("node_engines", executors.Keys(ctx)),
attribute.StringSlice("node_publishers", publishers.Keys(ctx)),
attribute.StringSlice("node_storages", storageProviders.Keys(ctx)),
)
node := &Node{
ID: config.NodeID,
CleanupManager: config.CleanupManager,
Expand Down
9 changes: 5 additions & 4 deletions pkg/requester/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ import (
"fmt"
"time"

"github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"

"github.com/bacalhau-project/bacalhau/pkg/compute"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/model"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
"github.com/bacalhau-project/bacalhau/pkg/requester/jobtransform"
"github.com/bacalhau-project/bacalhau/pkg/storage"
"github.com/bacalhau-project/bacalhau/pkg/system"
Expand Down Expand Up @@ -128,6 +127,8 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, data model.JobCreatePayloa
}
}

JobsSubmitted.Inc(ctx, job.MetricAttributes()...)

err = e.store.CreateJob(ctx, *job)
if err != nil {
return nil, err
Expand Down
19 changes: 19 additions & 0 deletions pkg/requester/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package requester

import (
"github.com/samber/lo"
"go.opentelemetry.io/otel"

"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// requesterMeter is the meter named "requester", obtained from the global
// OpenTelemetry meter provider.
var requesterMeter = otel.GetMeterProvider().Meter("requester")

// JobsSubmitted is the "job_submitted" counter. SubmitJob increments it,
// tagged with the job's metric attributes, before the job is written to the
// store.
//
// lo.Must panics during package initialization if telemetry.NewCounter
// reports an error.
var JobsSubmitted = lo.Must(telemetry.NewCounter(
	requesterMeter,
	"job_submitted",
	"Number of jobs submitted",
))

0 comments on commit 7a5f9c8

Please sign in to comment.