Skip to content

Commit

Permalink
Frrist/metrics/include instance (#4450)
Browse files — browse the repository at this point in the history
  • Loading branch information
frrist authored Sep 24, 2024
1 parent c2440c1 commit 48dc1a7
Show file tree
Hide file tree
Showing 35 changed files with 502 additions and 267 deletions.
3 changes: 2 additions & 1 deletion cmd/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

func NewRootCmd() *cobra.Command {
Expand Down Expand Up @@ -153,6 +154,6 @@ func injectRootSpan(cmd *cobra.Command, ctx context.Context) context.Context {
names = append([]string{root.Name()}, names...)
}
name := fmt.Sprintf("bacalhau.%s", strings.Join(names, "."))
ctx, span := system.NewRootSpan(ctx, system.GetTracer(), name)
ctx, span := telemetry.NewRootSpan(ctx, telemetry.GetTracer(), name)
return context.WithValue(ctx, spanKey, span)
}
52 changes: 27 additions & 25 deletions cmd/cli/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/node"
"github.com/bacalhau-project/bacalhau/pkg/repo"
"github.com/bacalhau-project/bacalhau/pkg/setup"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/util/closer"
"github.com/bacalhau-project/bacalhau/pkg/util/templates"
"github.com/bacalhau-project/bacalhau/pkg/version"
Expand Down Expand Up @@ -123,36 +124,39 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error {
ctx := cmd.Context()
cm := util.GetCleanupManager(ctx)

// Attempt to read the node name from the repo
nodeName, err := fsRepo.ReadNodeName()
sysmeta, err := fsRepo.SystemMetadata()
if err != nil {
return fmt.Errorf("failed to get node name: %w", err)
return fmt.Errorf("failed to get system metadata from repo: %w", err)
}

if nodeName == "" {
if sysmeta.NodeName == "" {
// Check if a flag was provided
nodeName = cmd.PersistentFlags().Lookup(NameFlagName).Value.String()
if nodeName == "" {
sysmeta.NodeName = cmd.PersistentFlags().Lookup(NameFlagName).Value.String()
if sysmeta.NodeName == "" {
// No flag provided, generate and persist node name
nodeName, err = config.GenerateNodeID(ctx, cfg.NameProvider)
sysmeta.NodeName, err = config.GenerateNodeID(ctx, cfg.NameProvider)
if err != nil {
return fmt.Errorf("failed to generate node name for provider %s: %w", cfg.NameProvider, err)
}
}
// Persist the node name
if err := fsRepo.WriteNodeName(nodeName); err != nil {
return fmt.Errorf("failed to write node name %s: %w", nodeName, err)
if err := fsRepo.WriteNodeName(sysmeta.NodeName); err != nil {
return fmt.Errorf("failed to write node name %s: %w", sysmeta.NodeName, err)
}
log.Info().Msgf("persisted node name %s", sysmeta.NodeName)
// now reload the system metadata since it has changed.
sysmeta, err = fsRepo.SystemMetadata()
if err != nil {
return fmt.Errorf("reloading system metadata after persisting name: %w", err)
}
log.Info().Msgf("persisted node name %s", nodeName)

} else {
// Warn if the flag was provided but node name already exists
if flagNodeName := cmd.PersistentFlags().Lookup(NameFlagName).Value.String(); flagNodeName != "" && flagNodeName != nodeName {
log.Warn().Msgf("--name flag with value %s ignored. Name %s already exists", flagNodeName, nodeName)
if flagNodeName := cmd.PersistentFlags().Lookup(NameFlagName).Value.String(); flagNodeName != "" && flagNodeName != sysmeta.NodeName {
log.Warn().Msgf("--name flag with value %s ignored. Name %s already exists", flagNodeName, sysmeta.NodeName)
}
}

ctx = logger.ContextWithNodeIDLogger(ctx, nodeName)
ctx = logger.ContextWithNodeIDLogger(ctx, sysmeta.NodeName)

// configure node type
isRequesterNode := cfg.Orchestrator.Enabled
Expand Down Expand Up @@ -184,7 +188,7 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error {
return err
}
nodeConfig := node.NodeConfig{
NodeID: nodeName,
NodeID: sysmeta.NodeName,
CleanupManager: cm,
DisabledFeatures: node.FeatureConfig{
Engines: cfg.Engines.Disabled,
Expand Down Expand Up @@ -250,16 +254,14 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error {
}

if !cfg.DisableAnalytics {
installationID, err := fsRepo.ReadInstallationID()
if err != nil {
log.Trace().Err(err).Msg("failed to read installationID")
}
if err := analytics.SetupAnalyticsProvider(ctx,
analytics.WithNodeNodeID(nodeName),
err = analytics.SetupAnalyticsProvider(ctx,
analytics.WithNodeNodeID(sysmeta.NodeName),
analytics.WithInstallationID(system.InstallationID()),
analytics.WithInstanceID(sysmeta.InstanceID),
analytics.WithNodeType(isRequesterNode, isComputeNode),
analytics.WithInstallationID(installationID),
analytics.WithVersion(version.Get()),
); err != nil {
analytics.WithVersion(version.Get()))

if err != nil {
log.Trace().Err(err).Msg("failed to setup analytics provider")
}
defer func() {
Expand All @@ -270,7 +272,7 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error {
}

startupLog := log.Info().
Str("name", nodeName).
Str("name", sysmeta.NodeName).
Str("address", fmt.Sprintf("%s:%d", hostAddress, cfg.API.Port)).
Bool("compute_enabled", cfg.Compute.Enabled).
Bool("orchestrator_enabled", cfg.Orchestrator.Enabled).
Expand Down
15 changes: 15 additions & 0 deletions cmd/util/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
"github.com/bacalhau-project/bacalhau/pkg/repo"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/version"
)

Expand Down Expand Up @@ -43,6 +45,19 @@ func GetAPIClientV2(cmd *cobra.Command, cfg types.Bacalhau) (clientv2.API, error
apimodels.HTTPHeaderBacalhauArch: {bv.GOARCH},
}

sysmeta, err := repo.LoadSystemMetadata(cfg.DataDir)
if err == nil {
if sysmeta.InstanceID != "" {
headers[apimodels.HTTPHeaderBacalhauInstanceID] = []string{sysmeta.InstanceID}
}
} else {
log.Debug().Err(err).Msg("failed to load system metadata from repo path")
}

if installationID := system.InstallationID(); installationID != "" {
headers[apimodels.HTTPHeaderBacalhauInstallationID] = []string{installationID}
}

opts := []clientv2.OptionFn{
clientv2.WithCACertificate(tlsCfg.CAFile),
clientv2.WithInsecureTLS(tlsCfg.Insecure),
Expand Down
10 changes: 0 additions & 10 deletions cmd/util/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,11 @@ func GetAllVersions(ctx context.Context, cfg types.Bacalhau, api clientv2.API, r
return versions, fmt.Errorf("loading user key: %w", err)
}

installationID, err := r.ReadInstallationID()
if err != nil {
return versions, fmt.Errorf("reading installationID: %w", err)
}

if installationID == "" {
return versions, errors.Wrap(err, "Installation ID not set")
}

updateCheck, err := version.CheckForUpdate(
ctx,
versions.ClientVersion,
versions.ServerVersion,
userKey.ClientID(),
installationID,
)
if err != nil {
return versions, errors.Wrap(err, "failed to get latest version")
Expand Down
8 changes: 6 additions & 2 deletions pkg/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,17 @@ func WithNodeType(isRequester, isCompute bool) Option {

func WithInstallationID(id string) Option {
return func(c *Config) {
c.attributes = append(c.attributes, attribute.String(NodeInstallationIDKey, id))
if id != "" {
c.attributes = append(c.attributes, attribute.String(NodeInstallationIDKey, id))
}
}
}

func WithInstanceID(id string) Option {
return func(c *Config) {
c.attributes = append(c.attributes, attribute.String(NodeInstanceIDKey, id))
if id != "" {
c.attributes = append(c.attributes, attribute.String(NodeInstanceIDKey, id))
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/executor"
"github.com/bacalhau-project/bacalhau/pkg/lib/concurrency"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"

"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/compute/logstream"
"github.com/bacalhau-project/bacalhau/pkg/compute/store"
"github.com/bacalhau-project/bacalhau/pkg/system"
)

type BaseEndpointParams struct {
Expand Down Expand Up @@ -52,7 +52,7 @@ func (s BaseEndpoint) GetNodeID() string {
}

func (s BaseEndpoint) AskForBid(ctx context.Context, request AskForBidRequest) (AskForBidResponse, error) {
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.BaseEndpoint.AskForBid", trace.WithSpanKind(trace.SpanKindInternal))
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.BaseEndpoint.AskForBid", trace.WithSpanKind(trace.SpanKindInternal))
defer span.End()
log.Ctx(ctx).Debug().Msgf("asked to bid on: %+v", request)
jobsReceived.Add(ctx, 1)
Expand Down
14 changes: 7 additions & 7 deletions pkg/compute/executor_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

type bufferTask struct {
Expand Down Expand Up @@ -116,9 +116,9 @@ func (s *ExecutorBuffer) Run(ctx context.Context, localExecutionState store.Loca
// doRun triggers the execution by the delegate backend.Executor and frees up the capacity when the execution is done.
func (s *ExecutorBuffer) doRun(ctx context.Context, task *bufferTask) {
job := task.localExecutionState.Execution.Job
ctx = system.AddJobIDToBaggage(ctx, job.ID)
ctx = system.AddNodeIDToBaggage(ctx, s.ID)
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.ExecutorBuffer.Run")
ctx = telemetry.AddJobIDToBaggage(ctx, job.ID)
ctx = telemetry.AddNodeIDToBaggage(ctx, s.ID)
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.ExecutorBuffer.Run")
defer span.End()

innerCtx := ctx
Expand Down Expand Up @@ -200,9 +200,9 @@ func (s *ExecutorBuffer) Cancel(_ context.Context, localExecutionState store.Loc
execution := localExecutionState.Execution
go func() {
ctx := logger.ContextWithNodeIDLogger(context.Background(), s.ID)
ctx = system.AddJobIDToBaggage(ctx, execution.Job.ID)
ctx = system.AddNodeIDToBaggage(ctx, s.ID)
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.ExecutorBuffer.Cancel")
ctx = telemetry.AddJobIDToBaggage(ctx, execution.Job.ID)
ctx = telemetry.AddNodeIDToBaggage(ctx, s.ID)
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.ExecutorBuffer.Cancel")
defer span.End()

err := s.delegateService.Cancel(ctx, localExecutionState)
Expand Down
5 changes: 2 additions & 3 deletions pkg/docker/tracing/traced.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

Expand Down Expand Up @@ -203,9 +202,9 @@ func (c TracedClient) Close() error {
}

func (c TracedClient) span(ctx context.Context, name string) (context.Context, trace.Span) {
return system.NewSpan(
return telemetry.NewSpan(
ctx,
system.GetTracer(),
telemetry.GetTracer(),
fmt.Sprintf("docker.%s", name),
trace.WithAttributes(semconv.HostName(c.hostname), semconv.PeerService("docker")),
trace.WithSpanKind(trace.SpanKindClient),
Expand Down
3 changes: 1 addition & 2 deletions pkg/eventhandler/context_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
oteltrace "go.opentelemetry.io/otel/trace"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

Expand Down Expand Up @@ -37,7 +36,7 @@ func (t *TracerContextProvider) GetContext(ctx context.Context, jobID string) co
t.contextMutex.Lock()
defer t.contextMutex.Unlock()

jobCtx, _ := system.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent",
jobCtx, _ := telemetry.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent",
oteltrace.WithSpanKind(oteltrace.SpanKindInternal),
oteltrace.WithAttributes(
attribute.String(telemetry.TracerAttributeNameNodeID, t.nodeID),
Expand Down
7 changes: 3 additions & 4 deletions pkg/executor/wasm/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import (
"github.com/tetratelabs/wazero"
"go.uber.org/atomic"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/system"

"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/executor"
Expand Down Expand Up @@ -66,7 +65,7 @@ const WasmMaxPagesLimit = 1 << (WasmArch / 2)

// Start initiates an execution based on the provided RunCommandRequest.
func (e *Executor) Start(ctx context.Context, request *executor.RunCommandRequest) error {
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.Executor.Start")
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.Executor.Start")
defer span.End()

if handler, found := e.handlers.Get(request.ExecutionID); found {
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/wasm/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"go.ptx.dk/multierrgroup"

"github.com/bacalhau-project/bacalhau/pkg/storage"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// ModuleLoader handles the loading of WebAssembly modules from
Expand Down Expand Up @@ -46,7 +46,7 @@ func NewModuleLoader(runtime wazero.Runtime, config wazero.ModuleConfig, storage

// Load compiles and returns a module located at the passed path.
func (loader *ModuleLoader) Load(ctx context.Context, path string) (wazero.CompiledModule, error) {
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.Load")
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.Load")
span.SetAttributes(attribute.String("Path", path))
defer span.End()

Expand All @@ -66,7 +66,7 @@ func (loader *ModuleLoader) Load(ctx context.Context, path string) (wazero.Compi

// loadModule loads and compiles all of the modules located by the passed storage specs.
func (loader *ModuleLoader) loadModule(ctx context.Context, m storage.PreparedStorage) (wazero.CompiledModule, error) {
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModule")
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModule")
defer span.End()

programPath := m.Volume.Source
Expand Down Expand Up @@ -106,7 +106,7 @@ func (loader *ModuleLoader) loadModule(ctx context.Context, m storage.PreparedSt
// loaded modules, so that the returned module has all of its dependencies fully
// instantiated and is ready to use.
func (loader *ModuleLoader) InstantiateRemoteModule(ctx context.Context, m storage.PreparedStorage) (api.Module, error) {
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.InstantiateRemoteModule")
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.InstantiateRemoteModule")
span.SetAttributes(attribute.String("ModuleName", m.InputSource.Alias))
defer span.End()

Expand Down Expand Up @@ -150,7 +150,7 @@ const unknownModuleErrStr = ("could not find WASM module with name %q. " +
"see also: https://docs.bacalhau.org/getting-started/wasm-workload-onboarding")

func (loader *ModuleLoader) loadModuleByName(ctx context.Context, moduleName string) (api.Module, error) {
ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModuleByName")
ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModuleByName")
span.SetAttributes(attribute.String("ModuleName", moduleName))
defer span.End()

Expand Down
Loading

0 comments on commit 48dc1a7

Please sign in to comment.