From 48dc1a709d6fc6456fc49e7bcd5ed0a2fb919e76 Mon Sep 17 00:00:00 2001 From: Forrest <6546409+frrist@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:48:22 -0700 Subject: [PATCH] Frrist/metrics/include instance (#4450) --- cmd/cli/root.go | 3 +- cmd/cli/serve/serve.go | 52 +++---- cmd/util/api.go | 15 ++ cmd/util/version.go | 10 -- pkg/analytics/analytics.go | 8 +- pkg/compute/endpoint.go | 4 +- pkg/compute/executor_buffer.go | 14 +- pkg/docker/tracing/traced.go | 5 +- pkg/eventhandler/context_provider.go | 3 +- pkg/executor/wasm/executor.go | 7 +- pkg/executor/wasm/loader.go | 10 +- pkg/executor/wasm/trace.go | 17 +-- pkg/models/constants.go | 5 + pkg/nats/pubsub/pubsub.go | 4 +- pkg/node/node.go | 1 + pkg/node/requester.go | 13 +- pkg/orchestrator/endpoint.go | 7 + pkg/orchestrator/transformer/job.go | 20 +++ pkg/orchestrator/types.go | 7 +- pkg/publicapi/apimodels/constants.go | 3 + pkg/publicapi/endpoint/orchestrator/job.go | 6 +- pkg/publisher/tracing/tracing.go | 5 +- pkg/pubsub/buffering_pubsub.go | 8 +- pkg/repo/fs.go | 7 +- pkg/repo/migrations/helpers.go | 17 +++ pkg/repo/migrations/v3_4.go | 162 ++++++++++---------- pkg/repo/migrations/v3_4_test.go | 163 ++++++++++++++++----- pkg/repo/sysmeta.go | 53 ++++--- pkg/routing/tracing/tracing.go | 11 +- pkg/storage/tracing/tracing.go | 13 +- pkg/system/config.go | 68 +++++++++ pkg/{system => telemetry}/tracer.go | 20 +-- pkg/{system => telemetry}/tracer_test.go | 5 +- pkg/util/targzip/targzip.go | 7 +- pkg/version/update.go | 16 +- 35 files changed, 502 insertions(+), 267 deletions(-) create mode 100644 pkg/system/config.go rename pkg/{system => telemetry}/tracer.go (85%) rename pkg/{system => telemetry}/tracer_test.go (91%) diff --git a/cmd/cli/root.go b/cmd/cli/root.go index 0786dd8763..9677911d71 100644 --- a/cmd/cli/root.go +++ b/cmd/cli/root.go @@ -27,6 +27,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/system" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) func NewRootCmd() *cobra.Command { @@ -153,6 +154,6 @@ func injectRootSpan(cmd *cobra.Command, ctx context.Context) context.Context { names = append([]string{root.Name()}, names...) } name := fmt.Sprintf("bacalhau.%s", strings.Join(names, ".")) - ctx, span := system.NewRootSpan(ctx, system.GetTracer(), name) + ctx, span := telemetry.NewRootSpan(ctx, telemetry.GetTracer(), name) return context.WithValue(ctx, spanKey, span) } diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index 19e226fba0..529e032024 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -24,6 +24,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/node" "github.com/bacalhau-project/bacalhau/pkg/repo" "github.com/bacalhau-project/bacalhau/pkg/setup" + "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/util/closer" "github.com/bacalhau-project/bacalhau/pkg/util/templates" "github.com/bacalhau-project/bacalhau/pkg/version" @@ -123,36 +124,39 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { ctx := cmd.Context() cm := util.GetCleanupManager(ctx) - // Attempt to read the node name from the repo - nodeName, err := fsRepo.ReadNodeName() + sysmeta, err := fsRepo.SystemMetadata() if err != nil { - return fmt.Errorf("failed to get node name: %w", err) + return fmt.Errorf("failed to get system metadata from repo: %w", err) } - - if nodeName == "" { + if sysmeta.NodeName == "" { // Check if a flag was provided - nodeName = cmd.PersistentFlags().Lookup(NameFlagName).Value.String() - if nodeName == "" { + sysmeta.NodeName = cmd.PersistentFlags().Lookup(NameFlagName).Value.String() + if sysmeta.NodeName == "" { // No flag provided, generate and persist node name - nodeName, err = config.GenerateNodeID(ctx, cfg.NameProvider) + sysmeta.NodeName, err = config.GenerateNodeID(ctx, cfg.NameProvider) if err != nil { return fmt.Errorf("failed to generate node name for provider %s: %w", cfg.NameProvider, err) } } // Persist the node name - if err := fsRepo.WriteNodeName(nodeName); err != nil { - return fmt.Errorf("failed to write node name %s: %w", nodeName, err) + if err := fsRepo.WriteNodeName(sysmeta.NodeName); err != nil { + return fmt.Errorf("failed to write node name %s: %w", sysmeta.NodeName, err) + } + log.Info().Msgf("persisted node name %s", sysmeta.NodeName) + // now reload the system metadata since it has changed. + sysmeta, err = fsRepo.SystemMetadata() + if err != nil { + return fmt.Errorf("reloading system metadata after persisting name: %w", err) } - log.Info().Msgf("persisted node name %s", nodeName) } else { // Warn if the flag was provided but node name already exists - if flagNodeName := cmd.PersistentFlags().Lookup(NameFlagName).Value.String(); flagNodeName != "" && flagNodeName != nodeName { - log.Warn().Msgf("--name flag with value %s ignored. Name %s already exists", flagNodeName, nodeName) + if flagNodeName := cmd.PersistentFlags().Lookup(NameFlagName).Value.String(); flagNodeName != "" && flagNodeName != sysmeta.NodeName { + log.Warn().Msgf("--name flag with value %s ignored. Name %s already exists", flagNodeName, sysmeta.NodeName) } } - ctx = logger.ContextWithNodeIDLogger(ctx, nodeName) + ctx = logger.ContextWithNodeIDLogger(ctx, sysmeta.NodeName) // configure node type isRequesterNode := cfg.Orchestrator.Enabled @@ -184,7 +188,7 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { return err } nodeConfig := node.NodeConfig{ - NodeID: nodeName, + NodeID: sysmeta.NodeName, CleanupManager: cm, DisabledFeatures: node.FeatureConfig{ Engines: cfg.Engines.Disabled, @@ -250,16 +254,14 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { } if !cfg.DisableAnalytics { - installationID, err := fsRepo.ReadInstallationID() - if err != nil { - log.Trace().Err(err).Msg("failed to read installationID") - } - if err := analytics.SetupAnalyticsProvider(ctx, - analytics.WithNodeNodeID(nodeName), + err = analytics.SetupAnalyticsProvider(ctx, + analytics.WithNodeNodeID(sysmeta.NodeName), + analytics.WithInstallationID(system.InstallationID()), + analytics.WithInstanceID(sysmeta.InstanceID), analytics.WithNodeType(isRequesterNode, isComputeNode), - analytics.WithInstallationID(installationID), - analytics.WithVersion(version.Get()), - ); err != nil { + analytics.WithVersion(version.Get())) + + if err != nil { log.Trace().Err(err).Msg("failed to setup analytics provider") } defer func() { @@ -270,7 +272,7 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { } startupLog := log.Info(). - Str("name", nodeName). + Str("name", sysmeta.NodeName). Str("address", fmt.Sprintf("%s:%d", hostAddress, cfg.API.Port)). Bool("compute_enabled", cfg.Compute.Enabled). Bool("orchestrator_enabled", cfg.Orchestrator.Enabled). diff --git a/cmd/util/api.go b/cmd/util/api.go index 9c6aed43df..613810fd0e 100644 --- a/cmd/util/api.go +++ b/cmd/util/api.go @@ -12,6 +12,8 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2" + "github.com/bacalhau-project/bacalhau/pkg/repo" + "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/version" ) @@ -43,6 +45,19 @@ func GetAPIClientV2(cmd *cobra.Command, cfg types.Bacalhau) (clientv2.API, error apimodels.HTTPHeaderBacalhauArch: {bv.GOARCH}, } + sysmeta, err := repo.LoadSystemMetadata(cfg.DataDir) + if err == nil { + if sysmeta.InstanceID != "" { + headers[apimodels.HTTPHeaderBacalhauInstanceID] = []string{sysmeta.InstanceID} + } + } else { + log.Debug().Err(err).Msg("failed to load system metadata from repo path") + } + + if installationID := system.InstallationID(); installationID != "" { + headers[apimodels.HTTPHeaderBacalhauInstallationID] = []string{installationID} + } + opts := []clientv2.OptionFn{ clientv2.WithCACertificate(tlsCfg.CAFile), clientv2.WithInsecureTLS(tlsCfg.Insecure), diff --git a/cmd/util/version.go b/cmd/util/version.go index 8a54eaa6fd..170048756d 100644 --- a/cmd/util/version.go +++ b/cmd/util/version.go @@ -50,21 +50,11 @@ func GetAllVersions(ctx context.Context, cfg types.Bacalhau, api clientv2.API, r return versions, fmt.Errorf("loading user key: %w", err) } - installationID, err := r.ReadInstallationID() - if err != nil { - return versions, fmt.Errorf("reading installationID: %w", err) - } - - if installationID == "" { - return versions, errors.Wrap(err, "Installation ID not set") - } - updateCheck, err := version.CheckForUpdate( ctx, versions.ClientVersion, versions.ServerVersion, userKey.ClientID(), - installationID, ) if err != nil { return versions, errors.Wrap(err, "failed to get latest version") diff --git a/pkg/analytics/analytics.go b/pkg/analytics/analytics.go index 547e5633f4..8143bfe654 100644 --- a/pkg/analytics/analytics.go +++ b/pkg/analytics/analytics.go @@ -63,13 +63,17 @@ func WithNodeType(isRequester, isCompute bool) Option { func WithInstallationID(id string) Option { return func(c *Config) { - c.attributes = append(c.attributes, attribute.String(NodeInstallationIDKey, id)) + if id != "" { + c.attributes = append(c.attributes, attribute.String(NodeInstallationIDKey, id)) + } } } func WithInstanceID(id string) Option { return func(c *Config) { - c.attributes = append(c.attributes, attribute.String(NodeInstanceIDKey, id)) + if id != "" { + c.attributes = append(c.attributes, attribute.String(NodeInstanceIDKey, id)) + } } } diff --git a/pkg/compute/endpoint.go b/pkg/compute/endpoint.go index b1e62e864a..a5385ccf3a 100644 --- a/pkg/compute/endpoint.go +++ b/pkg/compute/endpoint.go @@ -10,11 +10,11 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/executor" "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" "github.com/bacalhau-project/bacalhau/pkg/compute/capacity" "github.com/bacalhau-project/bacalhau/pkg/compute/logstream" "github.com/bacalhau-project/bacalhau/pkg/compute/store" - "github.com/bacalhau-project/bacalhau/pkg/system" ) type BaseEndpointParams struct { @@ -52,7 +52,7 @@ func (s BaseEndpoint) GetNodeID() string { } func (s BaseEndpoint) AskForBid(ctx context.Context, request AskForBidRequest) (AskForBidResponse, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.BaseEndpoint.AskForBid", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.BaseEndpoint.AskForBid", trace.WithSpanKind(trace.SpanKindInternal)) defer span.End() log.Ctx(ctx).Debug().Msgf("asked to bid on: %+v", request) jobsReceived.Add(ctx, 1) diff --git a/pkg/compute/executor_buffer.go b/pkg/compute/executor_buffer.go index 68bc2dd6fe..5fe2868a7f 100644 --- a/pkg/compute/executor_buffer.go +++ b/pkg/compute/executor_buffer.go @@ -10,7 +10,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/lib/collections" "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/system" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) type bufferTask struct { @@ -116,9 +116,9 @@ func (s *ExecutorBuffer) Run(ctx context.Context, localExecutionState store.Loca // doRun triggers the execution by the delegate backend.Executor and frees up the capacity when the execution is done. func (s *ExecutorBuffer) doRun(ctx context.Context, task *bufferTask) { job := task.localExecutionState.Execution.Job - ctx = system.AddJobIDToBaggage(ctx, job.ID) - ctx = system.AddNodeIDToBaggage(ctx, s.ID) - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.ExecutorBuffer.Run") + ctx = telemetry.AddJobIDToBaggage(ctx, job.ID) + ctx = telemetry.AddNodeIDToBaggage(ctx, s.ID) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.ExecutorBuffer.Run") defer span.End() innerCtx := ctx @@ -200,9 +200,9 @@ func (s *ExecutorBuffer) Cancel(_ context.Context, localExecutionState store.Loc execution := localExecutionState.Execution go func() { ctx := logger.ContextWithNodeIDLogger(context.Background(), s.ID) - ctx = system.AddJobIDToBaggage(ctx, execution.Job.ID) - ctx = system.AddNodeIDToBaggage(ctx, s.ID) - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.ExecutorBuffer.Cancel") + ctx = telemetry.AddJobIDToBaggage(ctx, execution.Job.ID) + ctx = telemetry.AddNodeIDToBaggage(ctx, s.ID) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.ExecutorBuffer.Cancel") defer span.End() err := s.delegateService.Cancel(ctx, localExecutionState) diff --git a/pkg/docker/tracing/traced.go b/pkg/docker/tracing/traced.go index d19aaca101..8375ed7894 100644 --- a/pkg/docker/tracing/traced.go +++ b/pkg/docker/tracing/traced.go @@ -17,7 +17,6 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) @@ -203,9 +202,9 @@ func (c TracedClient) Close() error { } func (c TracedClient) span(ctx context.Context, name string) (context.Context, trace.Span) { - return system.NewSpan( + return telemetry.NewSpan( ctx, - system.GetTracer(), + telemetry.GetTracer(), fmt.Sprintf("docker.%s", name), trace.WithAttributes(semconv.HostName(c.hostname), semconv.PeerService("docker")), trace.WithSpanKind(trace.SpanKindClient), diff --git a/pkg/eventhandler/context_provider.go b/pkg/eventhandler/context_provider.go index fdfc8ba293..774878edcd 100644 --- a/pkg/eventhandler/context_provider.go +++ b/pkg/eventhandler/context_provider.go @@ -8,7 +8,6 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) @@ -37,7 +36,7 @@ func (t *TracerContextProvider) GetContext(ctx context.Context, jobID string) co t.contextMutex.Lock() defer t.contextMutex.Unlock() - jobCtx, _ := system.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent", + jobCtx, _ := telemetry.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent", oteltrace.WithSpanKind(oteltrace.SpanKindInternal), oteltrace.WithAttributes( attribute.String(telemetry.TracerAttributeNameNodeID, t.nodeID), diff --git a/pkg/executor/wasm/executor.go b/pkg/executor/wasm/executor.go index 4243ff8623..d53c9ad8d6 100644 --- a/pkg/executor/wasm/executor.go +++ b/pkg/executor/wasm/executor.go @@ -12,10 +12,9 @@ import ( "github.com/tetratelabs/wazero" "go.uber.org/atomic" - "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/system" - "github.com/bacalhau-project/bacalhau/pkg/lib/math" + "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" "github.com/bacalhau-project/bacalhau/pkg/bidstrategy" "github.com/bacalhau-project/bacalhau/pkg/executor" @@ -66,7 +65,7 @@ const WasmMaxPagesLimit = 1 << (WasmArch / 2) // Start initiates an execution based on the provided RunCommandRequest. func (e *Executor) Start(ctx context.Context, request *executor.RunCommandRequest) error { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.Executor.Start") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.Executor.Start") defer span.End() if handler, found := e.handlers.Get(request.ExecutionID); found { diff --git a/pkg/executor/wasm/loader.go b/pkg/executor/wasm/loader.go index a82df396ef..9f9e205855 100644 --- a/pkg/executor/wasm/loader.go +++ b/pkg/executor/wasm/loader.go @@ -15,7 +15,7 @@ import ( "go.ptx.dk/multierrgroup" "github.com/bacalhau-project/bacalhau/pkg/storage" - "github.com/bacalhau-project/bacalhau/pkg/system" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) // ModuleLoader handles the loading of WebAssembly modules from @@ -46,7 +46,7 @@ func NewModuleLoader(runtime wazero.Runtime, config wazero.ModuleConfig, storage // Load compiles and returns a module located at the passed path. func (loader *ModuleLoader) Load(ctx context.Context, path string) (wazero.CompiledModule, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.Load") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.Load") span.SetAttributes(attribute.String("Path", path)) defer span.End() @@ -66,7 +66,7 @@ func (loader *ModuleLoader) Load(ctx context.Context, path string) (wazero.Compi // loadModule loads and compiles all of the modules located by the passed storage specs. func (loader *ModuleLoader) loadModule(ctx context.Context, m storage.PreparedStorage) (wazero.CompiledModule, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModule") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModule") defer span.End() programPath := m.Volume.Source @@ -106,7 +106,7 @@ func (loader *ModuleLoader) loadModule(ctx context.Context, m storage.PreparedSt // loaded modules, so that the returned module has all of its dependencies fully // instantiated and is ready to use. func (loader *ModuleLoader) InstantiateRemoteModule(ctx context.Context, m storage.PreparedStorage) (api.Module, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.InstantiateRemoteModule") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.InstantiateRemoteModule") span.SetAttributes(attribute.String("ModuleName", m.InputSource.Alias)) defer span.End() @@ -150,7 +150,7 @@ const unknownModuleErrStr = ("could not find WASM module with name %q. " + "see also: https://docs.bacalhau.org/getting-started/wasm-workload-onboarding") func (loader *ModuleLoader) loadModuleByName(ctx context.Context, moduleName string) (api.Module, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModuleByName") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.ModuleLoader.loadModuleByName") span.SetAttributes(attribute.String("ModuleName", moduleName)) defer span.End() diff --git a/pkg/executor/wasm/trace.go b/pkg/executor/wasm/trace.go index 8dfe625773..9243bfe2e7 100644 --- a/pkg/executor/wasm/trace.go +++ b/pkg/executor/wasm/trace.go @@ -3,7 +3,6 @@ package wasm import ( "context" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" observe "github.com/dylibso/observe-sdk/go" @@ -59,7 +58,7 @@ func (t tracedRuntime) Instantiate(ctx context.Context, source []byte) (api.Modu return nil, err } } - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.tracedRuntime.Instantiate") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.tracedRuntime.Instantiate") defer span.End() module, err := telemetry.RecordErrorOnSpanTwo[api.Module](span)(t.Runtime.Instantiate(ctx, source)) if module != nil { @@ -78,7 +77,7 @@ func (t tracedRuntime) InstantiateWithConfig(ctx context.Context, source []byte, return nil, err } } - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.tracedRuntime.InstantiateWithConfig") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.tracedRuntime.InstantiateWithConfig") defer span.End() module, err := telemetry.RecordErrorOnSpanTwo[api.Module](span)(t.Runtime.InstantiateWithConfig(ctx, source, config)) if module != nil { @@ -97,7 +96,7 @@ func (t tracedRuntime) CompileModule(ctx context.Context, binary []byte) (wazero return nil, err } } - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.tracedRuntime.CompileModule") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.tracedRuntime.CompileModule") defer span.End() module, err := telemetry.RecordErrorOnSpanTwo[wazero.CompiledModule](span)(t.Runtime.CompileModule(ctx, binary)) if module != nil { @@ -114,7 +113,7 @@ func (t tracedRuntime) InstantiateModule( compiled wazero.CompiledModule, config wazero.ModuleConfig, ) (api.Module, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.tracedRuntime.InstantiateModule") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/executor/wasm.tracedRuntime.InstantiateModule") defer span.End() m := compiled.(tracedCompiledModule) module, err := telemetry.RecordErrorOnSpanTwo[api.Module](span)(t.Runtime.InstantiateModule(ctx, m.CompiledModule, config)) @@ -135,9 +134,9 @@ func (t tracedFunction) Call(ctx context.Context, params ...uint64) ([]uint64, e if t.traceCtx != nil { defer t.traceCtx.Finish() } - ctx, span := system.NewSpan( + ctx, span := telemetry.NewSpan( ctx, - system.GetTracer(), + telemetry.GetTracer(), "pkg/executor/wasm.tracedFunction.Call", trace.WithAttributes(semconv.CodeFunction(t.Function.Definition().Name())), ) @@ -155,9 +154,9 @@ func (t tracedFunction) CallWithStack(ctx context.Context, stack []uint64) error if t.traceCtx != nil { defer t.traceCtx.Finish() } - ctx, span := system.NewSpan( + ctx, span := telemetry.NewSpan( ctx, - system.GetTracer(), + telemetry.GetTracer(), "pkg/executor/wasm.tracedFunction.CallWithStack", trace.WithAttributes(semconv.CodeFunction(t.Function.Definition().Name())), ) diff --git a/pkg/models/constants.go b/pkg/models/constants.go index f56c952090..1e3d0631b0 100644 --- a/pkg/models/constants.go +++ b/pkg/models/constants.go @@ -92,4 +92,9 @@ const ( // it may have been translated from another job. MetaDerivedFrom = "bacalhau.org/derivedFrom" MetaTranslatedBy = "bacalhau.org/translatedBy" + + MetaServerInstallationID = "bacalhau.org/server.installation.id" + MetaServerInstanceID = "bacalhau.org/server.instance.id" + MetaClientInstallationID = "bacalhau.org/client.installation.id" + MetaClientInstanceID = "bacalhau.org/client.instance.id" ) diff --git a/pkg/nats/pubsub/pubsub.go b/pkg/nats/pubsub/pubsub.go index 2ef7952072..1126158209 100644 --- a/pkg/nats/pubsub/pubsub.go +++ b/pkg/nats/pubsub/pubsub.go @@ -11,7 +11,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" "github.com/bacalhau-project/bacalhau/pkg/pubsub" - "github.com/bacalhau-project/bacalhau/pkg/system" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) type PubSubParams struct { @@ -48,7 +48,7 @@ func NewPubSub[T any](params PubSubParams) (*PubSub[T], error) { } func (p *PubSub[T]) Publish(ctx context.Context, message T) error { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/pubsub/nats.publish") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/pubsub/nats.publish") defer span.End() payload, err := marshaller.JSONMarshalWithMax(message) diff --git a/pkg/node/node.go b/pkg/node/node.go index 1742109427..19750e7395 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -180,6 +180,7 @@ func NewNode( transportLayer, transportLayer.ComputeProxy(), messageSerDeRegistry, + fsr, ) if err != nil { return nil, err diff --git a/pkg/node/requester.go b/pkg/node/requester.go index 06703dc705..c565957482 100644 --- a/pkg/node/requester.go +++ b/pkg/node/requester.go @@ -27,10 +27,12 @@ import ( auth_endpoint "github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/auth" orchestrator_endpoint "github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/orchestrator" requester_endpoint "github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/requester" + "github.com/bacalhau-project/bacalhau/pkg/repo" "github.com/bacalhau-project/bacalhau/pkg/routing" "github.com/bacalhau-project/bacalhau/pkg/routing/kvstore" "github.com/bacalhau-project/bacalhau/pkg/routing/tracing" s3helper "github.com/bacalhau-project/bacalhau/pkg/s3" + "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/translation" "github.com/bacalhau-project/bacalhau/pkg/util" @@ -64,6 +66,7 @@ func NewRequesterNode( transportLayer *nats_transport.NATSTransport, computeProxy compute.Endpoint, messageSerDeRegistry *ncl.MessageSerDeRegistry, + fsr *repo.FsRepo, ) (*Requester, error) { nodeManager, heartbeatServer, err := createNodeManager(ctx, transportLayer, requesterConfig) if err != nil { @@ -209,11 +212,19 @@ func NewRequesterNode( translationProvider = translation.NewStandardTranslatorsProvider() } + installationID := system.InstallationID() + var instanceID string + if sysmeta, err := fsr.SystemMetadata(); err == nil { + instanceID = sysmeta.InstanceID + } + jobTransformers := transformer.ChainedTransformer[*models.Job]{ transformer.JobFn(transformer.IDGenerator), transformer.NameOptional(), - transformer.DefaultsApplier(requesterConfig.JobDefaults), transformer.RequesterInfo(nodeID), + transformer.OrchestratorInstallationID(installationID), + transformer.OrchestratorInstanceID(instanceID), + transformer.DefaultsApplier(requesterConfig.JobDefaults), } endpointV2 := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{ diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index 09d67b2298..99ba101f67 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -62,6 +62,13 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest) analytics.EmitEvent(ctx, analytics.NewEvent(analytics.SubmitJobEventType, submitEvent)) }() + if request.ClientInstallationID != "" { + job.Meta[models.MetaClientInstallationID] = request.ClientInstallationID + } + if request.ClientInstanceID != "" { + job.Meta[models.MetaClientInstanceID] = request.ClientInstanceID + } + if err := e.jobTransformer.Transform(ctx, job); err != nil { submitEvent.Error = err.Error() return nil, err diff --git a/pkg/orchestrator/transformer/job.go b/pkg/orchestrator/transformer/job.go index 2f1616b0f9..626dd3d276 100644 --- a/pkg/orchestrator/transformer/job.go +++ b/pkg/orchestrator/transformer/job.go @@ -115,6 +115,26 @@ func RequesterInfo(requesterNodeID string) JobTransformer { return JobFn(f) } +func OrchestratorInstanceID(instanceID string) JobTransformer { + f := func(ctx context.Context, job *models.Job) error { + if instanceID != "" { + job.Meta[models.MetaServerInstanceID] = instanceID + } + return nil + } + return JobFn(f) +} + +func OrchestratorInstallationID(installationID string) JobTransformer { + f := func(ctx context.Context, job *models.Job) error { + if installationID != "" { + job.Meta[models.MetaServerInstallationID] = installationID + } + return nil + } + return JobFn(f) +} + // NameOptional is a transformer that sets the job name to the job ID if it is empty. func NameOptional() JobTransformer { f := func(ctx context.Context, job *models.Job) error { diff --git a/pkg/orchestrator/types.go b/pkg/orchestrator/types.go index 814e6d8e1a..fcd1a4b340 100644 --- a/pkg/orchestrator/types.go +++ b/pkg/orchestrator/types.go @@ -1,12 +1,15 @@ package orchestrator import ( - "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/rs/zerolog" + + "github.com/bacalhau-project/bacalhau/pkg/models" ) type SubmitJobRequest struct { - Job *models.Job + Job *models.Job + ClientInstanceID string + ClientInstallationID string } type SubmitJobResponse struct { diff --git a/pkg/publicapi/apimodels/constants.go b/pkg/publicapi/apimodels/constants.go index 90ac11f720..5a68da6b72 100644 --- a/pkg/publicapi/apimodels/constants.go +++ b/pkg/publicapi/apimodels/constants.go @@ -24,4 +24,7 @@ const ( HTTPHeaderBacalhauBuildOS = "X-Bacalhau-Build-OS" // HTTPHeaderBacalhauArch is the header used to pass the agent architecture HTTPHeaderBacalhauArch = "X-Bacalhau-Arch" + + HTTPHeaderBacalhauInstallationID = "X-Bacalhau-InstallationID" + HTTPHeaderBacalhauInstanceID = "X-Bacalhau-InstanceID" ) diff --git a/pkg/publicapi/endpoint/orchestrator/job.go b/pkg/publicapi/endpoint/orchestrator/job.go index e6d0598d11..d73d9b62ad 100644 --- a/pkg/publicapi/endpoint/orchestrator/job.go +++ b/pkg/publicapi/endpoint/orchestrator/job.go @@ -41,8 +41,12 @@ func (e *Endpoint) putJob(c echo.Context) error { if err := c.Validate(&args); err != nil { return err } + instanceID := c.Request().Header.Get(apimodels.HTTPHeaderBacalhauInstanceID) + installationID := c.Request().Header.Get(apimodels.HTTPHeaderBacalhauInstallationID) resp, err := e.orchestrator.SubmitJob(ctx, &orchestrator.SubmitJobRequest{ - Job: args.Job, + Job: args.Job, + ClientInstallationID: installationID, + ClientInstanceID: instanceID, }) if err != nil { return err diff --git a/pkg/publisher/tracing/tracing.go b/pkg/publisher/tracing/tracing.go index a3d8441458..81baa08f98 100644 --- a/pkg/publisher/tracing/tracing.go +++ b/pkg/publisher/tracing/tracing.go @@ -9,7 +9,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/publisher" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" "github.com/bacalhau-project/bacalhau/pkg/util/reflection" ) @@ -27,7 +26,7 @@ func Wrap(delegate publisher.Publisher) publisher.Publisher { } func (t *tracingPublisher) IsInstalled(ctx context.Context) (bool, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.IsInstalled", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.IsInstalled", t.name)) defer span.End() return t.delegate.IsInstalled(ctx) @@ -42,7 +41,7 @@ func (t *tracingPublisher) PublishResult( ) (spec models.SpecConfig, err error) { attributes := execution.Job.MetricAttributes() - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.PublishResult", t.name), + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.PublishResult", t.name), trace.WithAttributes(attributes...)) defer span.End() diff --git a/pkg/pubsub/buffering_pubsub.go b/pkg/pubsub/buffering_pubsub.go index 560c7b42ac..e8eacf3d3e 100644 --- a/pkg/pubsub/buffering_pubsub.go +++ b/pkg/pubsub/buffering_pubsub.go @@ -8,8 +8,8 @@ import ( "time" "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" + "github.com/bacalhau-project/bacalhau/pkg/telemetry" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/rs/zerolog/log" ) @@ -58,7 +58,7 @@ func NewBufferingPubSub[T any](params BufferingPubSubParams) *BufferingPubSub[T] } func (p *BufferingPubSub[T]) Publish(ctx context.Context, message T) error { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/pubsub.BufferingPubSub.publish") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/pubsub.BufferingPubSub.publish") defer span.End() payload, err := marshaller.JSONMarshalWithMax(message) @@ -155,7 +155,7 @@ func (p *BufferingPubSub[T]) Close(ctx context.Context) (err error) { // flush the buffer to the delegate pubsub func (p *BufferingPubSub[T]) flushBuffer(ctx context.Context, envelope BufferingEnvelope, oldestMessageTime time.Time) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/pubsub.BufferingPubSub.flushBuffer") + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/pubsub.BufferingPubSub.flushBuffer") defer span.End() log.Ctx(ctx).Trace().Msgf("flushing pubsub buffer after %s with %d messages, %d bytes", @@ -175,7 +175,7 @@ func (p *BufferingPubSub[T]) antiStarvationTask() { case <-p.antiStarvationTicker.C: if p.currentBuffer.Size() > 0 && time.Since(p.oldestMessageTime) > p.maxBufferAge { func() { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/pubsub.BufferingPubSub.antiStarvationTask") //nolint:govet + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/pubsub.BufferingPubSub.antiStarvationTask") //nolint:govet defer span.End() p.flushMutex.Lock() defer p.flushMutex.Unlock() diff --git a/pkg/repo/fs.go b/pkg/repo/fs.go index 88b7401f82..9add688ab6 100644 --- a/pkg/repo/fs.go +++ b/pkg/repo/fs.go @@ -99,7 +99,7 @@ func (fsr *FsRepo) Init() error { telemetry.SetupFromEnvs() // never fail here as this isn't critical to node start up. - if err := fsr.WriteInstanceID(GenerateInstanceID()); err != nil { + if err := fsr.writeInstanceID(GenerateInstanceID()); err != nil { log.Trace().Err(err).Msgf("failed to write instanceID") } @@ -127,10 +127,11 @@ func (fsr *FsRepo) Open() error { // check if an instanceID exists persisting one if not found. // never fail here as this isn't critical to node start up. - if instanceID, err := fsr.ReadInstanceID(); err != nil { + if instanceID, err := fsr.readInstanceID(); err != nil { log.Trace().Err(err).Msgf("failed to read instanceID") } else if instanceID == "" { - if err := fsr.WriteInstanceID(GenerateInstanceID()); err != nil { + // this case will happen when a user migrated from a repo prior to instanceID existing. + if err := fsr.writeInstanceID(GenerateInstanceID()); err != nil { log.Trace().Err(err).Msgf("failed to write instanceID") } } diff --git a/pkg/repo/migrations/helpers.go b/pkg/repo/migrations/helpers.go index 22915b4265..6f9be903e0 100644 --- a/pkg/repo/migrations/helpers.go +++ b/pkg/repo/migrations/helpers.go @@ -14,6 +14,8 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config_legacy" legacy_types "github.com/bacalhau-project/bacalhau/pkg/config_legacy/types" "github.com/bacalhau-project/bacalhau/pkg/repo" + "github.com/bacalhau-project/bacalhau/pkg/storage/util" + "github.com/bacalhau-project/bacalhau/pkg/system" ) const libp2pPrivateKey = "libp2p_private_key" @@ -141,3 +143,18 @@ func copyFile(srcPath, dstPath string) error { return nil } + +// writeInstallationID writes the installation ID to system wide config path +func writeInstallationID(cfg system.GlobalConfig, installationID string) error { + // Create config dir if it doesn't exist + if err := os.MkdirAll(cfg.ConfigDir(), util.OS_USER_RW); err != nil { + return fmt.Errorf("creating config dir: %w", err) + } + + // Write installation ID to file + installationIDFile := filepath.Join(cfg.ConfigDir(), system.InstallationIDFile) + if err := os.WriteFile(installationIDFile, []byte(installationID), util.OS_USER_RW); err != nil { + return fmt.Errorf("writing installation ID file: %w", err) + } + return nil +} diff --git a/pkg/repo/migrations/v3_4.go b/pkg/repo/migrations/v3_4.go index 31aee0e988..939499a914 100644 --- a/pkg/repo/migrations/v3_4.go +++ b/pkg/repo/migrations/v3_4.go @@ -14,6 +14,7 @@ import ( legacy_types "github.com/bacalhau-project/bacalhau/pkg/config_legacy/types" "github.com/bacalhau-project/bacalhau/pkg/repo" "github.com/bacalhau-project/bacalhau/pkg/storage/util" + "github.com/bacalhau-project/bacalhau/pkg/system" ) // V3Migration updates the repo, replacing repo.version and update.json with system_metadata.yaml. @@ -33,105 +34,106 @@ import ( // - Removes ./bacalhau/execution_store. // - If a user has configured a custom user key path, the configured value is copied to .bacalhau/user_id.pem. // - If a user has configured a custom auth tokens path, the configured value is copied to .bacalhau/tokens.json. -var V3Migration = repo.NewMigration( - repo.Version3, - repo.Version4, - func(r repo.FsRepo) error { - repoPath, err := r.Path() - if err != nil { - return err - } - _, fileCfg, err := readConfig(r) - if err != nil { - return err - } - // migrate from the repo.version file to the system_metadata.yaml file. - { - // Initialize the SystemMetadataFile in the staging directory - if err := r.WriteVersion(repo.Version4); err != nil { +var V3Migration = V3MigrationWithConfig(system.DefaultGlobalConfig) + +func V3MigrationWithConfig(globalCfg system.GlobalConfig) repo.Migration { + return repo.NewMigration( + repo.Version3, + repo.Version4, + func(r repo.FsRepo) error { + repoPath, err := r.Path() + if err != nil { return err } - if err := r.WriteLastUpdateCheck(time.UnixMilli(0)); err != nil { + _, fileCfg, err := readConfig(r) + if err != nil { return err } - if fileCfg.User.InstallationID != "" { - if err := r.WriteInstallationID(fileCfg.User.InstallationID); err != nil { + // migrate from the repo.version file to the system_metadata.yaml file. + { + // Initialize the SystemMetadataFile in the staging directory + if err := r.WriteVersion(repo.Version4); err != nil { + return err + } + // update the legacy version file so older versions fail gracefully. + if err := r.WriteLegacyVersion(repo.Version4); err != nil { + return fmt.Errorf("updating repo.verion: %w", err) + } + if err := r.WriteLastUpdateCheck(time.UnixMilli(0)); err != nil { return err } - } - - // ignore this error as the file may not exist - _ = os.Remove(filepath.Join(repoPath, "update.json")) - // update the legacy version file so older versions fail gracefully. - if err := r.WriteLegacyVersion(repo.Version4); err != nil { - return fmt.Errorf("updating repo.verion: %w", err) + // ignore this error as the file may not exist + _ = os.Remove(filepath.Join(repoPath, "update.json")) } - } - // migrate to the new repo structure - { - // if the user provided a non-standard path we will move it to the migrated repo - // if the user didn't provide a path, no copy required as the location of the file in the repo - // is unchanged. - if fileCfg.User.KeyPath != "" { - if err := copyFile(fileCfg.User.KeyPath, filepath.Join(repoPath, types.UserKeyFileName)); err != nil { - return fmt.Errorf("copying user key file: %w", err) + // migrate to the new repo structure + { + // if the user provided a non-standard path we will move it to the migrated repo + // if the user didn't provide a path, no copy required as the location of the file in the repo + // is unchanged. + if fileCfg.User.KeyPath != "" { + if err := copyFile(fileCfg.User.KeyPath, filepath.Join(repoPath, types.UserKeyFileName)); err != nil { + return fmt.Errorf("copying user key file: %w", err) + } } - } - // if the user provided a non-standard path we will move it to the migrated repo - // if the user didn't provide a path, no copy required as the location of the file in the repo - // is unchanged. - if fileCfg.Auth.TokensPath != "" { - if err := copyFile(fileCfg.Auth.TokensPath, filepath.Join(repoPath, types.AuthTokensFileName)); err != nil { - return fmt.Errorf("copying auth tokens file: %w", err) + // if the user provided a non-standard path we will move it to the migrated repo + // if the user didn't provide a path, no copy required as the location of the file in the repo + // is unchanged. + if fileCfg.Auth.TokensPath != "" { + if err := copyFile(fileCfg.Auth.TokensPath, filepath.Join(repoPath, types.AuthTokensFileName)); err != nil { + return fmt.Errorf("copying auth tokens file: %w", err) + } } - } - if err := migrateOrchestratorStore(repoPath, fileCfg.Node.Requester.JobStore); err != nil { - return err - } + if err := migrateOrchestratorStore(repoPath, fileCfg.Node.Requester.JobStore); err != nil { + return err + } - if err := migrateComputeStore(repoPath, fileCfg.Node.Compute.ExecutionStore); err != nil { - return err + if err := migrateComputeStore(repoPath, fileCfg.Node.Compute.ExecutionStore); err != nil { + return err + } } - } - // iff there is a config file in the repo, try and move it to $XDG_CONFIG_HOME/bacalhau - { - oldConfigFilePath := filepath.Join(repoPath, config_legacy.FileName) - if _, err := os.Stat(oldConfigFilePath); err == nil { - if err := r.WriteInstallationID(fileCfg.User.InstallationID); err != nil { - return fmt.Errorf("migrating installation id: %w", err) - } - if err := r.WriteNodeName(fileCfg.Node.Name); err != nil { - return fmt.Errorf("migrating node name: %w", err) + // iff there is a config file in the repo, try and move it to $XDG_CONFIG_HOME/bacalhau + { + oldConfigFilePath := filepath.Join(repoPath, config_legacy.FileName) + if _, err := os.Stat(oldConfigFilePath); err == nil { + // migrate installationID if none is present in the system wide config + if fileCfg.User.InstallationID != "" && globalCfg.InstallationID() == "" { + if err := writeInstallationID(globalCfg, fileCfg.User.InstallationID); err != nil { + return err + } + } + if err := r.WriteNodeName(fileCfg.Node.Name); err != nil { + return fmt.Errorf("migrating node name: %w", err) + } + newConfigType, err := config.MigrateV1(fileCfg) + if err != nil { + return fmt.Errorf("migrating to new config: %w", err) + } + // ensure the repo path of the config points to the repo + newConfigType.DataDir = repoPath + + // Write the updated config back to the same file + newConfigBytes, err := yaml.Marshal(&newConfigType) + if err != nil { + return fmt.Errorf("marshaling new config: %w", err) + } + if err := os.WriteFile(oldConfigFilePath, newConfigBytes, util.OS_USER_RWX); err != nil { + return fmt.Errorf("writing updated config file: %w", err) + } + } else if !os.IsNotExist(err) { + // if there was an error other than the file not existing, abort. + return fmt.Errorf("failed to read config file %s while migrating: %w", oldConfigFilePath, err) } - newConfigType, err := config.MigrateV1(fileCfg) - if err != nil { - return fmt.Errorf("migrating to new config: %w", err) - } - // ensure the repo path of the config points to the repo - newConfigType.DataDir = repoPath - // Write the updated config back to the same file - newConfigBytes, err := yaml.Marshal(&newConfigType) - if err != nil { - return fmt.Errorf("marshaling new config: %w", err) - } - if err := os.WriteFile(oldConfigFilePath, newConfigBytes, util.OS_USER_RWX); err != nil { - return fmt.Errorf("writing updated config file: %w", err) - } - } else if !os.IsNotExist(err) { - // if there was an error other than the file not existing, abort. - return fmt.Errorf("failed to read config file %s while migrating: %w", oldConfigFilePath, err) } - - } - return nil - }, -) + return nil + }, + ) +} func migrateComputeStore(repoPath string, config legacy_types.JobStoreConfig) error { oldComputeDir := filepath.Join(repoPath, "compute_store") diff --git a/pkg/repo/migrations/v3_4_test.go b/pkg/repo/migrations/v3_4_test.go index 8b56b229ad..803226fef7 100644 --- a/pkg/repo/migrations/v3_4_test.go +++ b/pkg/repo/migrations/v3_4_test.go @@ -15,17 +15,39 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/config_legacy" "github.com/bacalhau-project/bacalhau/pkg/repo" + "github.com/bacalhau-project/bacalhau/pkg/system" ) +type mockGlobalConfig struct { + configDir string +} + +func (m *mockGlobalConfig) InstallationID() string { + idFile := filepath.Join(m.ConfigDir(), system.InstallationIDFile) + idBytes, err := os.ReadFile(idFile) + if err != nil { + return "" + } + return string(idBytes) +} + +func (m *mockGlobalConfig) ConfigDir() string { + return m.configDir +} + type V3MigrationsTestSuite struct { BaseMigrationTestSuite // Embed the base suite + mockConfig *mockGlobalConfig repo *repo.FsRepo } func (suite *V3MigrationsTestSuite) SetupTest() { suite.BaseMigrationTestSuite.SetupTest() + suite.mockConfig = &mockGlobalConfig{ + configDir: suite.TempDir, + } migrations, err := repo.NewMigrationManager( - V3Migration, + V3MigrationWithConfig(suite.mockConfig), ) suite.Require().NoError(err) @@ -139,20 +161,17 @@ Auth: suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName)) suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName, types.ExecutionDirName)) - // verify we can read the expected installationID from it. - actualInstallationID, err := suite.repo.ReadInstallationID() + // verify we can read the expected installationID + suite.Require().Equal(expectedInstallationID, suite.mockConfig.InstallationID()) + + sysmeta, err := suite.repo.SystemMetadata() suite.Require().NoError(err) - suite.Require().Equal(expectedInstallationID, actualInstallationID) // verify we can read the expected last update time from it. - actualLastUpdateCheck, err := suite.repo.ReadLastUpdateCheck() - suite.Require().NoError(err) - suite.Require().Equal(time.UnixMilli(0).UTC(), actualLastUpdateCheck.UTC()) + suite.Require().Equal(time.UnixMilli(0).UTC(), sysmeta.LastUpdateCheck.UTC()) // the node name was migrated from the old config to system_metadata.yaml - actualNodeName, err := suite.repo.ReadNodeName() - suite.Require().NoError(err) - suite.Require().Equal("n-321fd9bf-3a7c-45f5-9b6b-fb9725ac646d", actualNodeName) + suite.Require().Equal("n-321fd9bf-3a7c-45f5-9b6b-fb9725ac646d", sysmeta.NodeName) } // repo resulting from `bacalhau version` @@ -172,8 +191,6 @@ func (suite *V3MigrationsTestSuite) TestV3MigrationWithMinimalRepo() { suite.FileExists(filepath.Join(suite.TempDir, "update.json")) suite.FileExists(filepath.Join(suite.TempDir, "user_id.pem")) - expectedInstallationID := "" - // verify the repo's current version is 3 repoVersion3, err := suite.repo.Version() suite.Require().NoError(err) @@ -210,18 +227,13 @@ func (suite *V3MigrationsTestSuite) TestV3MigrationWithMinimalRepo() { suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName)) suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName, types.ExecutionDirName)) - actualInstallationID, err := suite.repo.ReadInstallationID() + sysmeta, err := suite.repo.SystemMetadata() suite.Require().NoError(err) - suite.Require().Equal(expectedInstallationID, actualInstallationID) - actualLastUpdateCheck, err := suite.repo.ReadLastUpdateCheck() - suite.Require().NoError(err) - suite.Require().Equal(time.UnixMilli(0).UTC(), actualLastUpdateCheck.UTC()) + suite.Require().Equal(time.UnixMilli(0).UTC(), sysmeta.LastUpdateCheck.UTC()) // check that the migration doesn't create a node name if a config file wasn't present. - actualNodeName, err := suite.repo.ReadNodeName() - suite.Require().NoError(err) - suite.Require().Empty(actualNodeName) + suite.Require().Empty(sysmeta.NodeName) } // repo resulting from `bacalhau serve --node-type=requester` @@ -323,20 +335,17 @@ Auth: suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName)) suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName, types.ExecutionDirName)) - // verify we can read the expected installationID from it. - actualInstallationID, err := suite.repo.ReadInstallationID() + // verify we can read the expected installationID + suite.Require().Equal(expectedInstallationID, suite.mockConfig.InstallationID()) + + sysmeta, err := suite.repo.SystemMetadata() suite.Require().NoError(err) - suite.Require().Equal(expectedInstallationID, actualInstallationID) // verify we can read the expected last update time from it. - actualLastUpdateCheck, err := suite.repo.ReadLastUpdateCheck() - suite.Require().NoError(err) - suite.Require().Equal(time.UnixMilli(0).UTC(), actualLastUpdateCheck.UTC()) + suite.Require().Equal(time.UnixMilli(0).UTC(), sysmeta.LastUpdateCheck.UTC()) // the node name was migrated from the old config to system_metadata.yaml - actualNodeName, err := suite.repo.ReadNodeName() - suite.Require().NoError(err) - suite.Require().Equal("n-321fd9bf-3a7c-45f5-9b6b-fb9725ac646d", actualNodeName) + suite.Require().Equal("n-321fd9bf-3a7c-45f5-9b6b-fb9725ac646d", sysmeta.NodeName) } // repo resulting from `bacalhau serve --node-type=compute --orchestrators=bootstrap.production.bacalhau.org` @@ -434,25 +443,99 @@ Auth: suite.NoDirExists(filepath.Join(suite.TempDir, "orchestrator", "nats-store")) suite.NoFileExists(filepath.Join(suite.TempDir, "orchestrator", "state_boltdb.db")) + // verify we can read the expected installationID + suite.Require().Equal(expectedInstallationID, suite.mockConfig.InstallationID()) + + sysmeta, err := suite.repo.SystemMetadata() + suite.Require().NoError(err) // old compute directories were replaced with new ones suite.NoDirExists(filepath.Join(suite.TempDir, "executor_storages")) suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName)) suite.DirExists(filepath.Join(suite.TempDir, types.ComputeDirName, types.ExecutionDirName)) - // verify we can read the expected installationID from it. - actualInstallationID, err := suite.repo.ReadInstallationID() - suite.Require().NoError(err) - suite.Require().Equal(expectedInstallationID, actualInstallationID) - // verify we can read the expected last update time from it. - actualLastUpdateCheck, err := suite.repo.ReadLastUpdateCheck() - suite.Require().NoError(err) - suite.Require().Equal(time.UnixMilli(0).UTC(), actualLastUpdateCheck.UTC()) + suite.Require().Equal(time.UnixMilli(0).UTC(), sysmeta.LastUpdateCheck.UTC()) // the node name was migrated from the old config to system_metadata.yaml - actualNodeName, err := suite.repo.ReadNodeName() - suite.Require().NoError(err) - suite.Require().Equal("n-321fd9bf-3a7c-45f5-9b6b-fb9725ac646d", actualNodeName) + suite.Require().Equal("n-321fd9bf-3a7c-45f5-9b6b-fb9725ac646d", sysmeta.NodeName) +} + +func (suite *V3MigrationsTestSuite) TestV3MigrationInstallationID() { + testCases := []struct { + name string + initialInstallationID string + configInstallationID string + expectedInstallationID string + noConfigFile bool + }{ + { + name: "No existing global installation ID", + initialInstallationID: "", + configInstallationID: "config-id", + expectedInstallationID: "config-id", + }, + { + name: "Existing global installation ID", + initialInstallationID: "existing-id", + configInstallationID: "config-id", + expectedInstallationID: "existing-id", + }, + { + name: "No config installation ID", + initialInstallationID: "", + configInstallationID: "", + expectedInstallationID: "", + }, + { + name: "No config file", + noConfigFile: true, + expectedInstallationID: "", + }, + } + + for _, tc := range testCases { + suite.Run(tc.name, func() { + // call setup to reset the suite's temporary directory + suite.SetupTest() + + // set mock config installation ID if it's not empty + if tc.initialInstallationID != "" { + suite.Require().NoError(writeInstallationID(suite.mockConfig, tc.initialInstallationID)) + } + + // Copy test data to the suite's temporary directory + testDataPath := filepath.Join("testdata", "v3_minimal_repo") + suite.copyRepo(testDataPath) + + if !tc.noConfigFile { + // Create a minimal config file + configPath := filepath.Join(suite.TempDir, config.DefaultFileName) + _, err := createConfig(configPath, fmt.Sprintf(` +User: + InstallationID: %s +`, tc.configInstallationID)) + suite.Require().NoError(err) + suite.FileExists(configPath) + } + + // verify the repo's current version is 3 + repoVersion3, err := suite.repo.Version() + suite.Require().NoError(err) + suite.Equal(repo.Version3, repoVersion3) + + // open the repo to trigger the migration to version 4 + suite.Require().NoError(err) + suite.Require().NoError(suite.repo.Open()) + + // verify the repo's new current version is 4 + repoVersion4, err := suite.repo.Version() + suite.Require().NoError(err) + suite.Equal(repo.Version4, repoVersion4) + + // verify installation ID is as expected + suite.Equal(tc.expectedInstallationID, suite.mockConfig.InstallationID()) + }) + } } // createConfig creates a config file with the given content diff --git a/pkg/repo/sysmeta.go b/pkg/repo/sysmeta.go index 42ca2721ac..ee4730992c 100644 --- a/pkg/repo/sysmeta.go +++ b/pkg/repo/sysmeta.go @@ -3,6 +3,7 @@ package repo import ( "fmt" "os" + "path/filepath" "time" "gopkg.in/yaml.v3" @@ -33,12 +34,38 @@ const SystemMetadataFile = "system_metadata.yaml" type SystemMetadata struct { RepoVersion int `yaml:"RepoVersion"` - InstallationID string `yaml:"InstallationID"` InstanceID string `yaml:"InstanceID"` LastUpdateCheck time.Time `yaml:"LastUpdateCheck"` NodeName string `yaml:"NodeName"` } +func LoadSystemMetadata(path string) (*SystemMetadata, error) { + metaPath := filepath.Join(path, SystemMetadataFile) + if _, err := os.Stat(metaPath); os.IsNotExist(err) { + return nil, fmt.Errorf("system metadata doesn't exist at path %s: %w", metaPath, err) + } else if err != nil { + return nil, fmt.Errorf("failed to read system metadata at path %s: %w", metaPath, err) + } + metaBytes, err := os.ReadFile(metaPath) + if err != nil { + return nil, err + } + sysmeta := new(SystemMetadata) + if err := yaml.Unmarshal(metaBytes, sysmeta); err != nil { + return nil, fmt.Errorf("unmarshalling repo system metadata: %w", err) + } + return sysmeta, nil +} + +func (fsr *FsRepo) SystemMetadata() (*SystemMetadata, error) { + if exists, err := fsr.Exists(); !exists { + return nil, fmt.Errorf("cannot read system metadata. repo uninitalized") + } else if err != nil { + return nil, fmt.Errorf("opening repo system metadata: %w", err) + } + return fsr.readMetadata() +} + // WriteVersion updates the RepoVersion in the SystemMetadataFile. // If the metadata file doesn't exist, it creates a new one. func (fsr *FsRepo) WriteVersion(version int) error { @@ -83,25 +110,9 @@ func (fsr *FsRepo) WriteLastUpdateCheck(lastUpdateCheck time.Time) error { }) } -func (fsr *FsRepo) ReadInstallationID() (string, error) { - sysmeta, err := fsr.readMetadata() - if err != nil { - return "", err - } - return sysmeta.InstallationID, nil -} - -// WriteInstallationID updates the InstallationID in the metadata. -// It fails if the metadata file doesn't exist. -func (fsr *FsRepo) WriteInstallationID(id string) error { - return fsr.updateExistingMetadata(func(sysmeta *SystemMetadata) { - sysmeta.InstallationID = id - }) -} - -// ReadInstanceID reads the InstanceID in the metadata. +// readInstanceID reads the InstanceID in the metadata. // It fails if the metadata file doesn't exist. -func (fsr *FsRepo) ReadInstanceID() (string, error) { +func (fsr *FsRepo) readInstanceID() (string, error) { sysmeta, err := fsr.readMetadata() if err != nil { return "", err @@ -109,9 +120,9 @@ func (fsr *FsRepo) ReadInstanceID() (string, error) { return sysmeta.InstanceID, nil } -// WriteInstanceID updates the InstanceID in the metadata. +// writeInstanceID updates the InstanceID in the metadata. // It fails if the metadata file doesn't exist. -func (fsr *FsRepo) WriteInstanceID(id string) error { +func (fsr *FsRepo) writeInstanceID(id string) error { return fsr.updateExistingMetadata(func(sysmeta *SystemMetadata) { sysmeta.InstanceID = id }) diff --git a/pkg/routing/tracing/tracing.go b/pkg/routing/tracing/tracing.go index d00c629fc1..1efadffe62 100644 --- a/pkg/routing/tracing/tracing.go +++ b/pkg/routing/tracing/tracing.go @@ -7,7 +7,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/routing" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" ) @@ -22,7 +21,7 @@ func NewNodeStore(delegate routing.NodeInfoStore) *NodeStore { } func (r *NodeStore) Add(ctx context.Context, state models.NodeState) error { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoStore.Add") //nolint:govet + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/routing.NodeInfoStore.Add") //nolint:govet defer span.End() stopwatch := telemetry.Timer(ctx, addNodeDurationMilliseconds) @@ -38,7 +37,7 @@ func (r *NodeStore) Add(ctx context.Context, state models.NodeState) error { } func (r *NodeStore) Get(ctx context.Context, nodeID string) (models.NodeState, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoStore.Get") //nolint:govet + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/routing.NodeInfoStore.Get") //nolint:govet defer span.End() stopwatch := telemetry.Timer(ctx, getNodeDurationMilliseconds) @@ -54,7 +53,7 @@ func (r *NodeStore) Get(ctx context.Context, nodeID string) (models.NodeState, e } func (r *NodeStore) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoStore.GetByPrefix") //nolint:govet + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/routing.NodeInfoStore.GetByPrefix") //nolint:govet defer span.End() stopwatch := telemetry.Timer(ctx, getPrefixNodeDurationMilliseconds) @@ -70,7 +69,7 @@ func (r *NodeStore) GetByPrefix(ctx context.Context, prefix string) (models.Node } func (r *NodeStore) List(ctx context.Context, filters ...routing.NodeStateFilter) ([]models.NodeState, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoStore.List") //nolint:govet + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/routing.NodeInfoStore.List") //nolint:govet defer span.End() stopwatch := telemetry.Timer(ctx, listNodesDurationMilliseconds) @@ -85,7 +84,7 @@ func (r *NodeStore) List(ctx context.Context, filters ...routing.NodeStateFilter } func (r *NodeStore) Delete(ctx context.Context, nodeID string) error { - ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoStore.Delete") //nolint:govet + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/routing.NodeInfoStore.Delete") //nolint:govet defer span.End() stopwatch := telemetry.Timer(ctx, deleteNodeDurationMilliseconds) diff --git a/pkg/storage/tracing/tracing.go b/pkg/storage/tracing/tracing.go index 1080706355..23a2b0e4ea 100644 --- a/pkg/storage/tracing/tracing.go +++ b/pkg/storage/tracing/tracing.go @@ -8,7 +8,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/storage" - "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" "github.com/bacalhau-project/bacalhau/pkg/util/reflection" ) @@ -26,21 +25,21 @@ func Wrap(delegate storage.Storage) storage.Storage { } func (t *tracingStorage) IsInstalled(ctx context.Context) (bool, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.IsInstalled", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.IsInstalled", t.name)) defer span.End() return t.delegate.IsInstalled(ctx) } func (t *tracingStorage) HasStorageLocally(ctx context.Context, spec models.InputSource) (bool, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.HasStorageLocally", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.HasStorageLocally", t.name)) defer span.End() return t.delegate.HasStorageLocally(ctx, spec) } func (t *tracingStorage) GetVolumeSize(ctx context.Context, spec models.InputSource) (uint64, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.GetVolumeSize", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.GetVolumeSize", t.name)) defer span.End() return t.delegate.GetVolumeSize(ctx, spec) @@ -50,7 +49,7 @@ func (t *tracingStorage) PrepareStorage( ctx context.Context, storageDirectory string, spec models.InputSource) (storage.StorageVolume, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.PrepareStorage", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.PrepareStorage", t.name)) defer span.End() stopwatch := telemetry.Timer(ctx, jobStoragePrepareDurationMilliseconds, spec.Source.MetricAttributes()...) @@ -67,7 +66,7 @@ func (t *tracingStorage) PrepareStorage( } func (t *tracingStorage) CleanupStorage(ctx context.Context, spec models.InputSource, volume storage.StorageVolume) error { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.CleanupStorage", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.CleanupStorage", t.name)) defer span.End() stopwatch := telemetry.Timer(ctx, jobStorageCleanupDurationMilliseconds, spec.Source.MetricAttributes()...) @@ -83,7 +82,7 @@ func (t *tracingStorage) CleanupStorage(ctx context.Context, spec models.InputSo } func (t *tracingStorage) Upload(ctx context.Context, path string) (models.SpecConfig, error) { - ctx, span := system.NewSpan(ctx, system.GetTracer(), fmt.Sprintf("%s.Upload", t.name)) + ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.Upload", t.name)) defer span.End() stopwatch := telemetry.Timer(ctx, jobStorageUploadDurationMilliseconds) diff --git a/pkg/system/config.go b/pkg/system/config.go new file mode 100644 index 0000000000..5ae213e526 --- /dev/null +++ b/pkg/system/config.go @@ -0,0 +1,68 @@ +package system + +import ( + "os" + "path/filepath" + "runtime" + "strings" + + "github.com/rs/zerolog/log" +) + +// InstallationIDFile is the name of the file storing the installation ID. +const InstallationIDFile = "installation_id" + +// GlobalConfig defines the interface for accessing global configuration settings. +// The interface is used to abstract the configuration directory and installation ID +// and allows for easy testing, such as testing migration logic. +type GlobalConfig interface { + // InstallationID returns the unique identifier for this installation, if available. + InstallationID() string + // ConfigDir returns the path to the configuration directory. + ConfigDir() string +} + +// DefaultGlobalConfig is the default implementation of GlobalConfig. +var DefaultGlobalConfig GlobalConfig = &realGlobalConfig{} + +// realGlobalConfig is the concrete implementation of GlobalConfig. +type realGlobalConfig struct{} + +// ConfigDir returns the path to the Bacalhau configuration directory. +// It respects the XDG Base Directory Specification on Unix-like systems +// and uses the appropriate directory on Windows. +func (r *realGlobalConfig) ConfigDir() string { + var baseDir string + switch runtime.GOOS { + case "linux", "darwin": + baseDir = os.Getenv("XDG_CONFIG_HOME") + if baseDir == "" { + baseDir = filepath.Join(os.Getenv("HOME"), ".config") + } + case "windows": + baseDir = os.Getenv("APPDATA") + default: + baseDir = filepath.Join(os.Getenv("HOME"), ".config") + } + return filepath.Join(baseDir, "bacalhau") +} + +// InstallationID reads and returns the installation ID from the config file. +// If the file doesn't exist or can't be read, it returns an empty string. +func (r *realGlobalConfig) InstallationID() string { + idFile := filepath.Join(r.ConfigDir(), InstallationIDFile) + idBytes, err := os.ReadFile(idFile) + if err != nil { + if !os.IsNotExist(err) { + log.Debug().Err(err).Msg("Failed to read installation ID file") + } + return "" + } + return strings.TrimSpace(string(idBytes)) +} + +// InstallationID is a convenience function that returns the installation ID +// using the DefaultGlobalConfig. +func InstallationID() string { + return DefaultGlobalConfig.InstallationID() +} diff --git a/pkg/system/tracer.go b/pkg/telemetry/tracer.go similarity index 85% rename from pkg/system/tracer.go rename to pkg/telemetry/tracer.go index cdd3a71915..bc2aab1e3a 100644 --- a/pkg/system/tracer.go +++ b/pkg/telemetry/tracer.go @@ -1,16 +1,16 @@ -package system +package telemetry import ( "context" - _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/telemetry" - "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/baggage" oteltrace "go.opentelemetry.io/otel/trace" + + _ "github.com/bacalhau-project/bacalhau/pkg/logger" + "github.com/bacalhau-project/bacalhau/pkg/system" ) // ---------------------------------------- @@ -26,7 +26,7 @@ func GetTracer() oteltrace.Tracer { // ---------------------------------------- func NewSpan(ctx context.Context, t oteltrace.Tracer, name string, opts ...oteltrace.SpanStartOption) (context.Context, oteltrace.Span) { - for _, attributeName := range []string{telemetry.TracerAttributeNameJobID, telemetry.TracerAttributeNameNodeID} { + for _, attributeName := range []string{TracerAttributeNameJobID, TracerAttributeNameNodeID} { if v := baggage.FromContext(ctx).Member(attributeName).Value(); v != "" { opts = append(opts, oteltrace.WithAttributes( attribute.String(attributeName, v), @@ -34,7 +34,7 @@ func NewSpan(ctx context.Context, t oteltrace.Tracer, name string, opts ...otelt } } opts = append(opts, oteltrace.WithAttributes( - attribute.String("environment", GetEnvironment().String()), + attribute.String("environment", system.GetEnvironment().String()), )) return t.Start(ctx, name, opts...) @@ -42,7 +42,7 @@ func NewSpan(ctx context.Context, t oteltrace.Tracer, name string, opts ...otelt func NewRootSpan(ctx context.Context, t oteltrace.Tracer, name string) (context.Context, oteltrace.Span) { // Always include environment info in spans: - environment := GetEnvironment().String() + environment := system.GetEnvironment().String() m0, _ := baggage.NewMember("environment", environment) b, _ := baggage.New(m0) ctx = baggage.ContextWithBaggage(ctx, b) @@ -63,7 +63,7 @@ func NewRootSpan(ctx context.Context, t oteltrace.Tracer, name string) (context. func Span(ctx context.Context, spanName string, opts ...oteltrace.SpanStartOption) (context.Context, oteltrace.Span) { // Always include environment info in spans: opts = append(opts, oteltrace.WithAttributes( - attribute.String("environment", GetEnvironment().String()), + attribute.String("environment", system.GetEnvironment().String()), )) return GetTracer().Start(ctx, spanName, opts...) @@ -74,11 +74,11 @@ func Span(ctx context.Context, spanName string, opts ...oteltrace.SpanStartOptio // ---------------------------------------- func AddNodeIDToBaggage(ctx context.Context, nodeID string) context.Context { - return addFieldToBaggage(ctx, telemetry.TracerAttributeNameNodeID, nodeID) + return addFieldToBaggage(ctx, TracerAttributeNameNodeID, nodeID) } func AddJobIDToBaggage(ctx context.Context, jobID string) context.Context { - return addFieldToBaggage(ctx, telemetry.TracerAttributeNameJobID, jobID) + return addFieldToBaggage(ctx, TracerAttributeNameJobID, jobID) } func addFieldToBaggage(ctx context.Context, key, value string) context.Context { diff --git a/pkg/system/tracer_test.go b/pkg/telemetry/tracer_test.go similarity index 91% rename from pkg/system/tracer_test.go rename to pkg/telemetry/tracer_test.go index 82b2e1af8c..4826ae1a82 100644 --- a/pkg/system/tracer_test.go +++ b/pkg/telemetry/tracer_test.go @@ -1,12 +1,11 @@ //go:build unit || !integration -package system +package telemetry import ( "context" "testing" - "github.com/bacalhau-project/bacalhau/pkg/telemetry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -16,7 +15,7 @@ import ( func TestTracer(t *testing.T) { t.Cleanup(func() { - assert.NoError(t, telemetry.Cleanup()) + assert.NoError(t, Cleanup()) }) var sr SpanRecorder diff --git a/pkg/util/targzip/targzip.go b/pkg/util/targzip/targzip.go index 8732f1912e..3526073088 100644 --- a/pkg/util/targzip/targzip.go +++ b/pkg/util/targzip/targzip.go @@ -11,9 +11,10 @@ import ( "path/filepath" "strings" - "github.com/bacalhau-project/bacalhau/pkg/system" - "github.com/bacalhau-project/bacalhau/pkg/util/closer" "github.com/c2h5oh/datasize" + + "github.com/bacalhau-project/bacalhau/pkg/telemetry" + "github.com/bacalhau-project/bacalhau/pkg/util/closer" ) const ( @@ -55,7 +56,7 @@ func UncompressedSize(src io.Reader) (datasize.ByteSize, error) { // //nolint:gocyclo,funlen func compress(ctx context.Context, src string, buf io.Writer, max datasize.ByteSize, stripPath bool) error { - _, span := system.NewSpan(ctx, system.GetTracer(), "pkg/util/targzip.compress") + _, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/util/targzip.compress") defer span.End() // tar > gzip > buf diff --git a/pkg/version/update.go b/pkg/version/update.go index f6b164b79d..0d5adbb6d1 100644 --- a/pkg/version/update.go +++ b/pkg/version/update.go @@ -17,6 +17,7 @@ import ( baccrypto "github.com/bacalhau-project/bacalhau/pkg/lib/crypto" "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/system" ) const ( @@ -43,7 +44,6 @@ func CheckForUpdate( ctx context.Context, currentClientVersion, currentServerVersion *models.BuildVersionInfo, clientID string, - InstallationID string, ) (*UpdateCheckResponse, error) { u, err := url.Parse(getUpdateCheckerEndpoint()) if err != nil { @@ -60,7 +60,9 @@ func CheckForUpdate( q.Set("serverVersion", currentServerVersion.GitVersion) } q.Set("clientID", clientID) - q.Set("InstallationID", InstallationID) + if installationID := system.InstallationID(); installationID != "" { + q.Set("InstallationID", installationID) + } // The BACALHAU_UPDATE_CHECKER_TEST is an env variable a user can set so that we can track // when the binary is being run by a non-user, to enable easier filtering of queries @@ -105,7 +107,6 @@ func LogUpdateResponse(ctx context.Context, ucr *UpdateCheckResponse) { type UpdateStore interface { ReadLastUpdateCheck() (time.Time, error) WriteLastUpdateCheck(time.Time) error - ReadInstallationID() (string, error) } // RunUpdateChecker starts a goroutine that will periodically make an update @@ -150,14 +151,7 @@ func RunUpdateChecker( serverVersion = nil } - installationID, err := store.ReadInstallationID() - if err != nil { - log.Ctx(ctx).Error().Err(err).Msg("failed to read user installationID") - // we can continue here and still use an empty string - // a failure here indicates the system metadata file is un-readable - } - - updateResponse, err := CheckForUpdate(ctx, clientVersion, serverVersion, userKey.ClientID(), installationID) + updateResponse, err := CheckForUpdate(ctx, clientVersion, serverVersion, userKey.ClientID()) if err != nil { log.Ctx(ctx).Error().Err(err).Msg("Failed to perform update check") }