diff --git a/cmd/cli/config/set.go b/cmd/cli/config/set.go index 1e885824a3..e2dddaaecd 100644 --- a/cmd/cli/config/set.go +++ b/cmd/cli/config/set.go @@ -79,7 +79,12 @@ func newSetCmd() *cobra.Command { ValidArgsFunction: setAutoComplete, } - setCmd.PersistentFlags().VarP(cliflags.NewWriteConfigFlag(), "config", "c", "Path to the config file (default is $BACALHAU_DIR/config.yaml)") + setCmd.PersistentFlags().VarP( + cliflags.NewWriteConfigFlag(), + "config", + "c", + "Path to the config file (default is $BACALHAU_DIR/config.yaml)", + ) return setCmd } diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index 33d679c8fd..971dfe7f6d 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -149,7 +149,6 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { if err != nil { return fmt.Errorf("reloading system metadata after persisting name: %w", err) } - } else { // Warn if the flag was provided but node name already exists if flagNodeName := cmd.PersistentFlags().Lookup(NameFlagName).Value.String(); flagNodeName != "" && flagNodeName != sysmeta.NodeName { @@ -248,8 +247,8 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { if len(cfg.Compute.AllowListedLocalPaths) > 0 { startupLog.Strs("volumes", cfg.Compute.AllowListedLocalPaths) } - } + if cfg.Orchestrator.Enabled { startupLog.Str("orchestrator_address", fmt.Sprintf("%s:%d", cfg.Orchestrator.Host, cfg.Orchestrator.Port)) diff --git a/cmd/util/fatal.go b/cmd/util/fatal.go index 45f592b357..8d4cc94ac3 100644 --- a/cmd/util/fatal.go +++ b/cmd/util/fatal.go @@ -42,7 +42,6 @@ func fatalError(cmd *cobra.Command, err error, code int) { cmd.PrintErrln(stackTrace) } } - } else { cmd.PrintErrln(output.RedStr("Error: ") + err.Error()) } diff --git a/cmd/util/flags/configflags/deprecation.go b/cmd/util/flags/configflags/deprecation.go index 4b583cc7c2..f9930fb30c 100644 --- a/cmd/util/flags/configflags/deprecation.go +++ b/cmd/util/flags/configflags/deprecation.go @@ -4,7 +4,8 @@ import ( "fmt" ) -const FeatureDeprecatedMessage = "This feature has been deprecated and is no longer functional. The flag has no effect and can be safely removed." +const FeatureDeprecatedMessage = "This feature has been deprecated and is no longer " + + "functional. The flag has no effect and can be safely removed." func makeDeprecationMessage(key string) string { return fmt.Sprintf("Use %s to set this configuration", makeConfigFlagDeprecationCommand(key)) diff --git a/cmd/util/flags/configflags/ipfs.go b/cmd/util/flags/configflags/ipfs.go index 8289692a55..442e38508a 100644 --- a/cmd/util/flags/configflags/ipfs.go +++ b/cmd/util/flags/configflags/ipfs.go @@ -20,28 +20,31 @@ var IPFSFlags = []Definition{ ), }, { - FlagName: "ipfs-connect-storage", - ConfigPath: types.InputSourcesTypesIPFSEndpointKey, - DefaultValue: config.Default.InputSources.Types.IPFS.Endpoint, - Description: "The ipfs host multiaddress to connect to for inputs, otherwise an in-process IPFS node will be created if not set.", + FlagName: "ipfs-connect-storage", + ConfigPath: types.InputSourcesTypesIPFSEndpointKey, + DefaultValue: config.Default.InputSources.Types.IPFS.Endpoint, + Description: "The ipfs host multiaddress to connect to for inputs, " + + "otherwise an in-process IPFS node will be created if not set.", EnvironmentVariables: []string{"BACALHAU_NODE_IPFS_CONNECT"}, Deprecated: true, DeprecatedMessage: makeDeprecationMessage(types.InputSourcesTypesIPFSEndpointKey), }, { - FlagName: "ipfs-connect-publisher", - ConfigPath: types.PublishersTypesIPFSEndpointKey, - DefaultValue: config.Default.Publishers.Types.IPFS.Endpoint, - Description: "The ipfs host multiaddress to connect to for publishing, otherwise an in-process IPFS node will be created if not set.", + FlagName: "ipfs-connect-publisher", + ConfigPath: types.PublishersTypesIPFSEndpointKey, + DefaultValue: config.Default.Publishers.Types.IPFS.Endpoint, + Description: "The ipfs host multiaddress to connect to for publishing, " + + "otherwise an in-process IPFS node will be created if not set.", EnvironmentVariables: []string{"BACALHAU_NODE_IPFS_CONNECT"}, Deprecated: true, DeprecatedMessage: makeDeprecationMessage(types.PublishersTypesIPFSEndpointKey), }, { - FlagName: "ipfs-connect-downloader", - ConfigPath: types.ResultDownloadersTypesIPFSEndpointKey, - DefaultValue: config.Default.ResultDownloaders.Types.IPFS.Endpoint, - Description: "The ipfs host multiaddress to connect to for downloading, otherwise an in-process IPFS node will be created if not set.", + FlagName: "ipfs-connect-downloader", + ConfigPath: types.ResultDownloadersTypesIPFSEndpointKey, + DefaultValue: config.Default.ResultDownloaders.Types.IPFS.Endpoint, + Description: "The ipfs host multiaddress to connect to for downloading, " + + "otherwise an in-process IPFS node will be created if not set.", EnvironmentVariables: []string{"BACALHAU_NODE_IPFS_CONNECT"}, Deprecated: true, DeprecatedMessage: makeDeprecationMessage(types.ResultDownloadersTypesIPFSEndpointKey), diff --git a/cmd/util/printer/fish_spinner.go b/cmd/util/printer/fish_spinner.go index c5de731e27..4cd8c9f27c 100644 --- a/cmd/util/printer/fish_spinner.go +++ b/cmd/util/printer/fish_spinner.go @@ -12,8 +12,9 @@ import ( // ANSI escape codes for cursor control const ( - hideCursor = "\033[?25l" - showCursor = "\033[?25h" + hideCursor = "\033[?25l" + showCursor = "\033[?25h" + tickerInterval = 200 * time.Millisecond ) // FishSpinner represents a simple fish emoji spinner. @@ -96,7 +97,7 @@ func (s *FishSpinner) Resume() { // run continuously updates the spinner animation func (s *FishSpinner) run() { - ticker := time.NewTicker(200 * time.Millisecond) + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() for { diff --git a/cmd/util/printer/progress_printer.go b/cmd/util/printer/progress_printer.go index ea1ad98c5a..b14ec1b48f 100644 --- a/cmd/util/printer/progress_printer.go +++ b/cmd/util/printer/progress_printer.go @@ -100,14 +100,14 @@ func (j *JobProgressPrinter) followProgress(ctx context.Context, job *models.Job cmd.Print("Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running):\n\n") } - eventPrinter := j.createEventPrinter(cmd) + currentEventPrinter := j.createEventPrinter(cmd) eventChan := make(chan *models.JobHistory) errChan := make(chan error, 1) go j.fetchEvents(ctx, job, eventChan, errChan) // process events until the context is canceled or the job is done - if err := j.handleEvents(ctx, eventPrinter, eventChan, errChan); err != nil { + if err := j.handleEvents(ctx, currentEventPrinter, eventChan, errChan); err != nil { if errors.Is(err, context.DeadlineExceeded) { j.printTimeoutMessage(cmd) return nil @@ -125,6 +125,7 @@ func (j *JobProgressPrinter) fetchEvents(ctx context.Context, job *models.Job, e defer close(eventChan) // Create a ticker that ticks every 500 milliseconds + //nolint:mnd // Time interval easier to read this way ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() @@ -163,7 +164,12 @@ func (j *JobProgressPrinter) fetchEvents(ctx context.Context, job *models.Job, e } } -func (j *JobProgressPrinter) handleEvents(ctx context.Context, printer eventPrinter, eventChan <-chan *models.JobHistory, errChan <-chan error) error { +func (j *JobProgressPrinter) handleEvents( + ctx context.Context, + printer eventPrinter, + eventChan <-chan *models.JobHistory, + errChan <-chan error, +) error { defer printer.close() // close the printer to clear any spinner before exiting for { diff --git a/cmd/util/printer/spinner.go b/cmd/util/printer/spinner.go index 13c1d2073a..7317f0a540 100644 --- a/cmd/util/printer/spinner.go +++ b/cmd/util/printer/spinner.go @@ -238,6 +238,11 @@ const ( WidthTimer = 0 ) +const ( + maxUint8Value = 255 + minUint8Value = 0 +) + type LineMessage struct { Message string Detail string @@ -248,10 +253,18 @@ type LineMessage struct { } func NewLineMessage(msg string, maxWidth int) LineMessage { + safeWidth := maxWidth + if maxWidth > maxUint8Value { + safeWidth = maxUint8Value // Cap at maximum uint8 value + } else if maxWidth < minUint8Value { + safeWidth = minUint8Value // Handle negative values + } + + //nolint:gosec // Safe uint8 conversion - value is bounded by maxUint8Value check above return LineMessage{ Message: msg, TimerString: spinnerFmtDuration(SpinnerFormatDurationDefault), - ColumnWidths: []uint8{uint8(maxWidth), WidthDots, WidthStatus, WidthTimer}, + ColumnWidths: []uint8{uint8(safeWidth), WidthDots, WidthStatus, WidthTimer}, } } diff --git a/cmd/util/printer/utils.go b/cmd/util/printer/utils.go index f9ddbb3c01..fdfb472987 100644 --- a/cmd/util/printer/utils.go +++ b/cmd/util/printer/utils.go @@ -33,6 +33,7 @@ const ( var terminalWidth int +//nolint:gosec // terminalWidth is used for spacing and won't exceed reasonable values func getTerminalWidth(cmd *cobra.Command) uint { if terminalWidth == 0 { var err error @@ -102,6 +103,7 @@ func SummariseHistoryEvents(history []*models.JobHistory) []models.Event { return maps.Values(events) } +//nolint:gosec // indent is used for spacing and won't exceed reasonable values func printIndentedString(cmd *cobra.Command, prefix, msg string, prefixColor *color.Color, startIndent uint) { maxWidth := getTerminalWidth(cmd) blockIndent := int(startIndent) + len(prefix) diff --git a/pkg/bacerrors/error.go b/pkg/bacerrors/error.go index 80dee3a220..7f264052bd 100644 --- a/pkg/bacerrors/error.go +++ b/pkg/bacerrors/error.go @@ -116,7 +116,7 @@ func Wrap(err error, format string, a ...any) Error { newErr.wrappingMsg = message return &newErr } - nErr := New(fmt.Sprintf("%s: %s", message, err.Error())) + nErr := New("%s: %s", message, err.Error()) nErr.(*errorImpl).wrappedErr = err nErr.(*errorImpl).wrappingMsg = message return nErr diff --git a/pkg/bacerrors/stack.go b/pkg/bacerrors/stack.go index b86739ef13..951ab29ee8 100644 --- a/pkg/bacerrors/stack.go +++ b/pkg/bacerrors/stack.go @@ -6,6 +6,12 @@ import ( "strings" ) +const ( + // maxStackDepth defines the maximum number of stack frames to capture + // in the error stack trace. + maxStackDepth = 32 +) + // stack represents a stack of program counters. type stack []uintptr @@ -25,9 +31,9 @@ func (s *stack) String() string { } func callers() *stack { - const depth = 32 - var pcs [depth]uintptr - n := runtime.Callers(4, pcs[:]) + const skipCallers = 4 + var pcs [maxStackDepth]uintptr + n := runtime.Callers(skipCallers, pcs[:]) var st stack = pcs[0:n] return &st } diff --git a/pkg/compute/capacity/system/gpu/intel.go b/pkg/compute/capacity/system/gpu/intel.go index fd206ac23b..cad47e0404 100644 --- a/pkg/compute/capacity/system/gpu/intel.go +++ b/pkg/compute/capacity/system/gpu/intel.go @@ -107,6 +107,7 @@ func (intel *intelGPUProvider) GetAvailableCapacity(ctx context.Context) (models // Start with an empty Resources and just fold over it var allGPUs models.Resources for _, gpu := range gpuList.GPUs { + //nolint:gosec // G115: GPU indices are always within int range provider := intel.getInfoProvider(int(gpu.Index)) gpuInfo, err := provider.GetAvailableCapacity(ctx) if err != nil { diff --git a/pkg/compute/endpoint.go b/pkg/compute/endpoint.go index a5385ccf3a..937509b277 100644 --- a/pkg/compute/endpoint.go +++ b/pkg/compute/endpoint.go @@ -52,7 +52,12 @@ func (s BaseEndpoint) GetNodeID() string { } func (s BaseEndpoint) AskForBid(ctx context.Context, request AskForBidRequest) (AskForBidResponse, error) { - ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/compute.BaseEndpoint.AskForBid", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span := telemetry.NewSpan( + ctx, + telemetry.GetTracer(), + "pkg/compute.BaseEndpoint.AskForBid", + trace.WithSpanKind(trace.SpanKindInternal), + ) defer span.End() log.Ctx(ctx).Debug().Msgf("asked to bid on: %+v", request) jobsReceived.Add(ctx, 1) diff --git a/pkg/compute/executor.go b/pkg/compute/executor.go index f27a5fac7e..f561546b07 100644 --- a/pkg/compute/executor.go +++ b/pkg/compute/executor.go @@ -29,7 +29,7 @@ type BaseExecutorParams struct { Store store.ExecutionStore Storages storage.StorageProvider StorageDirectory string - Executors executor.ExecutorProvider + Executors executor.ExecProvider ResultsPath ResultsPath Publishers publisher.PublisherProvider FailureInjectionConfig models.FailureInjectionConfig @@ -43,7 +43,7 @@ type BaseExecutor struct { store store.ExecutionStore Storages storage.StorageProvider storageDirectory string - executors executor.ExecutorProvider + executors executor.ExecProvider publishers publisher.PublisherProvider resultsPath ResultsPath failureInjection models.FailureInjectionConfig diff --git a/pkg/compute/logstream/server.go b/pkg/compute/logstream/server.go index 4525fc51d4..4684b4372e 100644 --- a/pkg/compute/logstream/server.go +++ b/pkg/compute/logstream/server.go @@ -19,7 +19,7 @@ const defaultBuffer = 100 type ServerParams struct { ExecutionStore store.ExecutionStore - Executors executor.ExecutorProvider + Executors executor.ExecProvider // Buffer is the size of the channel buffer for each individual log stream. // If not set (0), defaultBuffer will be used. Buffer int @@ -27,7 +27,7 @@ type ServerParams struct { type Server struct { executionStore store.ExecutionStore - executors executor.ExecutorProvider + executors executor.ExecProvider // buffer is the size of the channel buffer for each individual log stream. buffer int } diff --git a/pkg/compute/node_info_decorator.go b/pkg/compute/node_info_decorator.go index 8b4c2ce1a4..eaebe448d4 100644 --- a/pkg/compute/node_info_decorator.go +++ b/pkg/compute/node_info_decorator.go @@ -11,7 +11,7 @@ import ( ) type NodeInfoDecoratorParams struct { - Executors executor.ExecutorProvider + Executors executor.ExecProvider Publisher publisher.PublisherProvider Storages storage.StorageProvider RunningCapacityTracker capacity.Tracker @@ -21,7 +21,7 @@ type NodeInfoDecoratorParams struct { } type NodeInfoDecorator struct { - executors executor.ExecutorProvider + executors executor.ExecProvider publishers publisher.PublisherProvider storages storage.StorageProvider runningCapacityTracker capacity.Tracker diff --git a/pkg/compute/store/boltdb/store.go b/pkg/compute/store/boltdb/store.go index 2dbfe1f20d..fb33361fda 100644 --- a/pkg/compute/store/boltdb/store.go +++ b/pkg/compute/store/boltdb/store.go @@ -461,7 +461,12 @@ func (s *Store) GetExecutionCount(ctx context.Context, state store.LocalExecutio return nil } - count = uint64(b.Stats().KeyN) + keyCount := b.Stats().KeyN + if keyCount < 0 { + return fmt.Errorf("invalid negative key count from bucket: %d", keyCount) + } + + count = uint64(keyCount) return nil }) diff --git a/pkg/compute/store/boltdb/utils.go b/pkg/compute/store/boltdb/utils.go index 0bcee76b5b..3bd0b92f2d 100644 --- a/pkg/compute/store/boltdb/utils.go +++ b/pkg/compute/store/boltdb/utils.go @@ -6,6 +6,11 @@ import ( bolt "go.etcd.io/bbolt" ) +const ( + // uint64ByteSize is the number of bytes needed to represent a uint64 + uint64ByteSize = 8 +) + // strToBytes converts a string to a byte slice func strToBytes(s string) []byte { return []byte(s) @@ -13,7 +18,7 @@ func strToBytes(s string) []byte { // uint64ToBytes converts an uint64 to a byte slice func uint64ToBytes(i uint64) []byte { - buf := make([]byte, 8) //nolint:gomnd + buf := make([]byte, uint64ByteSize) binary.BigEndian.PutUint64(buf, i) return buf } diff --git a/pkg/config/config.go b/pkg/config/config.go index ba3649f40a..8547c38d09 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -90,6 +90,8 @@ func WithValues(values map[string]any) Option { // New returns a configuration with the provided options applied. If no options are provided, the returned config // contains only the default values. +// +//nolint:funlen,gocyclo // TODO: This function is very long and complex. Need to improve it. func New(opts ...Option) (*Config, error) { base := viper.New() base.SetEnvPrefix(environmentVariablePrefix) diff --git a/pkg/config/types/downloaders.go b/pkg/config/types/downloaders.go index e7ed9962cd..7bc6d6e673 100644 --- a/pkg/config/types/downloaders.go +++ b/pkg/config/types/downloaders.go @@ -21,7 +21,7 @@ type ResultDownloadersTypes struct { func (r ResultDownloaders) IsNotDisabled(kind string) bool { return !slices.ContainsFunc(r.Disabled, func(s string) bool { - return strings.ToLower(s) == strings.ToLower(kind) + return strings.EqualFold(s, kind) }) } diff --git a/pkg/config/types/engines.go b/pkg/config/types/engines.go index b885afe0b2..4c8fa636a0 100644 --- a/pkg/config/types/engines.go +++ b/pkg/config/types/engines.go @@ -20,7 +20,7 @@ type EngineConfigTypes struct { func (e EngineConfig) IsNotDisabled(kind string) bool { return !slices.ContainsFunc(e.Disabled, func(s string) bool { - return strings.ToLower(s) == strings.ToLower(kind) + return strings.EqualFold(s, kind) }) } diff --git a/pkg/config/types/gen/generate.go b/pkg/config/types/gen/generate.go index af644e9f22..b8db1f7153 100644 --- a/pkg/config/types/gen/generate.go +++ b/pkg/config/types/gen/generate.go @@ -165,7 +165,14 @@ type FieldInfo struct { CapitalizedPath string } -func processStruct(prefix string, capPrefix string, structType *ast.StructType, fieldInfos map[string]FieldInfo, typeMap map[string]*ast.StructType) { +//nolint:funlen,gocyclo // TODO: Function is very long and complex +func processStruct( + prefix string, + capPrefix string, + structType *ast.StructType, + fieldInfos map[string]FieldInfo, + typeMap map[string]*ast.StructType, +) { for _, field := range structType.Fields.List { // Get field names var fieldNames []string diff --git a/pkg/config/types/publishers.go b/pkg/config/types/publishers.go index 14c68d2a19..92638539fa 100644 --- a/pkg/config/types/publishers.go +++ b/pkg/config/types/publishers.go @@ -21,7 +21,7 @@ type PublisherTypes struct { func (p PublishersConfig) IsNotDisabled(kind string) bool { return !slices.ContainsFunc(p.Disabled, func(s string) bool { - return strings.ToLower(s) == strings.ToLower(kind) + return strings.EqualFold(s, kind) }) } diff --git a/pkg/config/types/resource.go b/pkg/config/types/resource.go index c2dc9358cf..b55b529194 100644 --- a/pkg/config/types/resource.go +++ b/pkg/config/types/resource.go @@ -43,6 +43,9 @@ func (s ResourceScaler) IsZero() bool { return s.CPU == "" && s.Memory == "" && s.Disk == "" && s.GPU == "" } +// ToResource TODO: This is a very complex function that should be simplified and split +// +//nolint:gocyclo func (s ResourceScaler) ToResource(in models.Resources) (*models.Resources, error) { out := new(models.Resources) if s.CPU.IsScaler() { diff --git a/pkg/config/types/storage.go b/pkg/config/types/storage.go index 9f2c48b752..b2ed467967 100644 --- a/pkg/config/types/storage.go +++ b/pkg/config/types/storage.go @@ -23,7 +23,7 @@ type InputSourcesTypes struct { func (i InputSourcesConfig) IsNotDisabled(kind string) bool { return !slices.ContainsFunc(i.Disabled, func(s string) bool { - return strings.ToLower(s) == strings.ToLower(kind) + return strings.EqualFold(s, kind) }) } diff --git a/pkg/config_legacy/config.go b/pkg/config_legacy/config.go index 2c4b065501..2e2f3d7b67 100644 --- a/pkg/config_legacy/config.go +++ b/pkg/config_legacy/config.go @@ -1,3 +1,4 @@ +//nolint:stylecheck // ST1003: legacy package naming maintained for compatibility package config_legacy import ( diff --git a/pkg/config_legacy/configenv/production.go b/pkg/config_legacy/configenv/production.go index 97689e884c..5e519b20a5 100644 --- a/pkg/config_legacy/configenv/production.go +++ b/pkg/config_legacy/configenv/production.go @@ -1,4 +1,4 @@ -//nolint:gomnd +//nolint:mnd package configenv import ( diff --git a/pkg/config_legacy/helpers.go b/pkg/config_legacy/helpers.go index 9dbc01f44b..7fb196cc0c 100644 --- a/pkg/config_legacy/helpers.go +++ b/pkg/config_legacy/helpers.go @@ -1,3 +1,4 @@ +//nolint:stylecheck // ST1003: legacy package naming maintained for compatibility package config_legacy import ( diff --git a/pkg/config_legacy/types/testing.go b/pkg/config_legacy/types/testing.go index 35b1fdcf5a..18b97612b4 100644 --- a/pkg/config_legacy/types/testing.go +++ b/pkg/config_legacy/types/testing.go @@ -1,4 +1,4 @@ -//nolint:gomnd +//nolint:mnd // Ignoring magic numbers in this configuration file, since it is easier to read that way package types import ( @@ -39,7 +39,7 @@ var Testing = BacalhauConfig{ Network: NetworkConfig{ Port: 4222, }, - DownloadURLRequestTimeout: Duration(300 * time.Second), + DownloadURLRequestTimeout: Duration(5 * time.Minute), VolumeSizeRequestTimeout: Duration(2 * time.Minute), DownloadURLRequestRetries: 3, LoggingMode: logger.LogModeDefault, diff --git a/pkg/devstack/factories.go b/pkg/devstack/factories.go index 2be408bc22..e1905eda3f 100644 --- a/pkg/devstack/factories.go +++ b/pkg/devstack/factories.go @@ -40,7 +40,7 @@ func NewNoopExecutorsFactory() node.ExecutorsFactory { func NewNoopExecutorsFactoryWithConfig(config noop_executor.ExecutorConfig) node.ExecutorsFactory { return node.ExecutorsFactoryFunc( - func(ctx context.Context, nodeConfig node.NodeConfig) (executor.ExecutorProvider, error) { + func(ctx context.Context, nodeConfig node.NodeConfig) (executor.ExecProvider, error) { return executor_util.NewNoopExecutors(config), nil }) } diff --git a/pkg/docker/errors.go b/pkg/docker/errors.go index 3b5e286bfc..625a77e44a 100644 --- a/pkg/docker/errors.go +++ b/pkg/docker/errors.go @@ -48,62 +48,62 @@ func NewDockerError(err error) (bacErr bacerrors.Error) { case errdefs.IsNotFound(err): return handleNotFoundError(err) case errdefs.IsConflict(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(Conflict). WithHTTPStatusCode(http.StatusConflict) case errdefs.IsUnauthorized(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(Unauthorized). WithHTTPStatusCode(http.StatusUnauthorized). WithHint("Ensure you have the necessary permissions and that your credentials are correct. " + "You may need to log in to Docker again.") case errdefs.IsForbidden(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(Forbidden). WithHTTPStatusCode(http.StatusForbidden). WithHint(fmt.Sprintf("You don't have permission to perform this action. "+ "Supply the node with valid Docker login credentials using the %s and %s environment variables", config_legacy.DockerUsernameEnvVar, config_legacy.DockerPasswordEnvVar)) case errdefs.IsDataLoss(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(DataLoss). WithHTTPStatusCode(http.StatusInternalServerError). WithFailsExecution() case errdefs.IsDeadline(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(Deadline). WithHTTPStatusCode(http.StatusGatewayTimeout). WithHint("The operation timed out. This could be due to network issues or high system load. " + "Try again later or check your network connection."). WithRetryable() case errdefs.IsCancelled(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(Cancelled). WithHTTPStatusCode(http.StatusRequestTimeout). WithHint("The operation was cancelled. " + "This is often due to user intervention or a competing operation.") case errdefs.IsUnavailable(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(Unavailable). WithHTTPStatusCode(http.StatusServiceUnavailable). WithHint("The Docker daemon or a required service is unavailable. " + "Check if the Docker daemon is running and healthy."). WithRetryable() case errdefs.IsSystem(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(SystemError). WithHTTPStatusCode(http.StatusInternalServerError). WithHint("An internal system error occurred. This could be due to resource constraints. " + "Check system resources and Docker logs for more information."). WithFailsExecution() case errdefs.IsNotImplemented(err): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(NotImplemented). WithHTTPStatusCode(http.StatusNotImplemented). WithHint("This feature is not implemented in your version of Docker. " + "Check Docker documentation for feature availability and consider upgrading if necessary.") default: - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(UnknownError). WithHTTPStatusCode(http.StatusInternalServerError) } @@ -136,7 +136,7 @@ func NewDockerImageError(err error, image string) (bacErr bacerrors.Error) { } func NewCustomDockerError(code bacerrors.ErrorCode, message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(code). WithComponent(Component) } @@ -144,11 +144,11 @@ func NewCustomDockerError(code bacerrors.ErrorCode, message string) bacerrors.Er func handleNotFoundError(err error) bacerrors.Error { errorLower := strings.ToLower(err.Error()) if strings.Contains(errorLower, "no such container") { - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(ContainerNotFound). WithHTTPStatusCode(http.StatusNotFound) } - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(NotFound). WithHTTPStatusCode(http.StatusNotFound) } diff --git a/pkg/executor/docker/bidstrategy/semantic/image_platform.go b/pkg/executor/docker/bidstrategy/semantic/image_platform.go index 1cb120801b..950d26aeef 100644 --- a/pkg/executor/docker/bidstrategy/semantic/image_platform.go +++ b/pkg/executor/docker/bidstrategy/semantic/image_platform.go @@ -81,7 +81,7 @@ func (s *ImagePlatformBidStrategy) ShouldBid( defer func() { err = ManifestCache.Set( dockerEngine.Image, manifest, 1, oneDayInSeconds, - ) //nolint:gomnd + ) //nolint:mnd if err != nil { // Log the error but continue as it is not serious enough to stop // processing diff --git a/pkg/executor/docker/executor.go b/pkg/executor/docker/executor.go index adb77ac97d..cfd56f5e18 100644 --- a/pkg/executor/docker/executor.go +++ b/pkg/executor/docker/executor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "os" "path/filepath" "strings" @@ -352,6 +353,13 @@ func (e *Executor) newDockerJobContainer(ctx context.Context, params *dockerJobC } log.Ctx(ctx).Trace().Msgf("Adding %d GPUs to request", params.Resources.GPU) + if params.Resources.Memory > math.MaxInt64 { + return container.CreateResponse{}, fmt.Errorf( + "memory value %d exceeds maximum allowed value %d", params.Resources.Memory, math.MaxInt64, + ) + } + + //nolint:gosec // G115: negative memory values already checked above hostConfig := &container.HostConfig{ Mounts: mounts, Resources: container.Resources{ diff --git a/pkg/executor/docker/handler.go b/pkg/executor/docker/handler.go index e1a1e54c9d..bf6f02999c 100644 --- a/pkg/executor/docker/handler.go +++ b/pkg/executor/docker/handler.go @@ -203,7 +203,7 @@ func (h *executionHandler) destroy(timeout time.Duration) error { func (h *executionHandler) outputStream(ctx context.Context, request executor.LogStreamRequest) (io.ReadCloser, error) { since := "1" if request.Tail { - since = strconv.FormatInt(time.Now().Unix(), 10) //nolint:gomnd + since = strconv.FormatInt(time.Now().Unix(), 10) //nolint:mnd } select { case <-ctx.Done(): diff --git a/pkg/executor/noop/errors.go b/pkg/executor/noop/errors.go index 98e93564e9..3902114913 100644 --- a/pkg/executor/noop/errors.go +++ b/pkg/executor/noop/errors.go @@ -7,7 +7,7 @@ import ( const NoopExecutorComponent = "Executor/Noop" func NewNoopExecutorError(code bacerrors.ErrorCode, message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(code). WithComponent(NoopExecutorComponent) } diff --git a/pkg/executor/results.go b/pkg/executor/results.go index 5b8de11ff2..981407c503 100644 --- a/pkg/executor/results.go +++ b/pkg/executor/results.go @@ -28,6 +28,7 @@ type outputResult struct { truncated *bool } +//nolint:gosec // G115: limits within reasonable bounds func writeOutputResult(resultsDir string, output outputResult) error { if output.contents == nil { log.Warn().Msg("writing result output contents null") @@ -69,7 +70,14 @@ func writeOutputResult(resultsDir string, output outputResult) error { return err } - _, err = io.CopyN(file, output.contents, int64(int(output.fileLimit)-fileWritten)) + // Calculate remaining bytes + remainingLimit := int64(output.fileLimit) + if remainingLimit < int64(fileWritten) { + return fmt.Errorf("file written size %d exceeds limit %d", fileWritten, output.fileLimit) + } + remaining := remainingLimit - int64(fileWritten) + + _, err = io.CopyN(file, output.contents, remaining) if err != nil && err != io.EOF { return err } diff --git a/pkg/executor/types.go b/pkg/executor/types.go index 8200c66836..86a8a3f55e 100644 --- a/pkg/executor/types.go +++ b/pkg/executor/types.go @@ -11,10 +11,10 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/storage" ) -const EXECUTOR_COMPONENT = "Executor" +const ExecComponentName = "Executor" -// ExecutorProvider returns a executor for the given engine type -type ExecutorProvider = provider.Provider[Executor] +// ExecProvider returns a executor for the given engine type +type ExecProvider = provider.Provider[Executor] // Executor serves as an execution manager for running jobs on a specific backend, such as a Docker daemon. // It provides a comprehensive set of methods to initiate, monitor, terminate, and retrieve output streams for executions. @@ -86,5 +86,5 @@ const ( ) func NewExecutorError(code bacerrors.ErrorCode, message string) bacerrors.Error { - return bacerrors.New(message).WithCode(code).WithComponent(EXECUTOR_COMPONENT) + return bacerrors.New("%s", message).WithCode(code).WithComponent(ExecComponentName) } diff --git a/pkg/executor/util/executors_bid_strategy.go b/pkg/executor/util/executors_bid_strategy.go index f105d34a98..fc69b2ca40 100644 --- a/pkg/executor/util/executors_bid_strategy.go +++ b/pkg/executor/util/executors_bid_strategy.go @@ -10,10 +10,10 @@ import ( ) type bidStrategyFromExecutor struct { - provider executor.ExecutorProvider + provider executor.ExecProvider } -func NewExecutorSpecificBidStrategy(provider executor.ExecutorProvider) bidstrategy.BidStrategy { +func NewExecutorSpecificBidStrategy(provider executor.ExecProvider) bidstrategy.BidStrategy { return bidstrategy.NewChainedBidStrategy( bidstrategy.WithSemantics( semantic.NewProviderInstalledStrategy[executor.Executor]( diff --git a/pkg/executor/util/utils.go b/pkg/executor/util/utils.go index b40b02b5e2..a209e6f3f8 100644 --- a/pkg/executor/util/utils.go +++ b/pkg/executor/util/utils.go @@ -84,7 +84,6 @@ func NewStandardStorageProvider(cfg types.Bacalhau) (storage.StorageProvider, er return nil, err } providers[models.StorageSourceIPFS] = tracing.Wrap(ipfsStorage) - } } @@ -103,7 +102,7 @@ func NewNoopStorageProvider( func NewStandardExecutorProvider( cfg types.EngineConfig, executorOptions StandardExecutorOptions, -) (executor.ExecutorProvider, error) { +) (executor.ExecProvider, error) { providers := make(map[string]executor.Executor) if cfg.IsNotDisabled(models.EngineDocker) { @@ -126,7 +125,7 @@ func NewStandardExecutorProvider( } // return noop executors for all engines -func NewNoopExecutors(config noop_executor.ExecutorConfig) executor.ExecutorProvider { +func NewNoopExecutors(config noop_executor.ExecutorConfig) executor.ExecProvider { noopExecutor := noop_executor.NewNoopExecutorWithConfig(config) return provider.NewNoopProvider[executor.Executor](noopExecutor) } diff --git a/pkg/executor/wasm/util/logger/logmanager.go b/pkg/executor/wasm/util/logger/logmanager.go index 23a0929a9b..341e66fb90 100644 --- a/pkg/executor/wasm/util/logger/logmanager.go +++ b/pkg/executor/wasm/util/logger/logmanager.go @@ -227,7 +227,7 @@ func (lm *LogManager) Close() { lm.wg.Wait() go func(ctx context.Context, executionID string, filename string) { - tensecs := time.After(time.Duration(10) * time.Second) //nolint:gomnd + tensecs := time.After(time.Duration(10) * time.Second) //nolint:mnd <-tensecs log.Ctx(ctx).Debug().Msgf("logmanager removing logfile for %s: %s", executionID, filename) diff --git a/pkg/executor/wasm/util/logger/logmessage.go b/pkg/executor/wasm/util/logger/logmessage.go index c638049a26..1791a3218e 100644 --- a/pkg/executor/wasm/util/logger/logmessage.go +++ b/pkg/executor/wasm/util/logger/logmessage.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/binary" "fmt" + "math" ) type LogStreamType int8 @@ -34,6 +35,9 @@ type LogMessage struct { // The next 8bytes are the timestamp // The next 4 bytes (n) are the size of the data // Then the remaining n bytes are the data +// TODO: Ignoring lint for now since fixing it is a task by itself. +// +//nolint:gosec // G115: intentionally converting int to uint32 for message size func (m *LogMessage) ToBytes() []byte { size := uint32(LogMessageHeaderLength + len(m.Data)) b := make([]byte, size) @@ -68,7 +72,12 @@ func (m *LogMessage) FromReader(reader bufio.Reader) error { m.Stream = LogStreamType(buffer[LogMessageStreamOffset]) - m.Timestamp = int64(binary.BigEndian.Uint64(buffer[LogMessageTimestampOffset:])) + timestamp := binary.BigEndian.Uint64(buffer[LogMessageTimestampOffset:]) + if timestamp > math.MaxInt64 { + return fmt.Errorf("timestamp value %d exceeds maximum int64 value", timestamp) + } + m.Timestamp = int64(timestamp) + m.Data = append([]byte(nil), buffer[LogMessageHeaderLength:]...) return nil diff --git a/pkg/ipfs/tree.go b/pkg/ipfs/tree.go index 49e5811675..222f3471ae 100644 --- a/pkg/ipfs/tree.go +++ b/pkg/ipfs/tree.go @@ -16,6 +16,7 @@ type IPLDTreeNode struct { func getTreeNode(ctx context.Context, navNode ipld.NavigableNode, path []string) (IPLDTreeNode, error) { var children []IPLDTreeNode for i, link := range navNode.GetIPLDNode().Links() { + //nolint:gosec // G115: Node links is always within reasonable bounds childNavNode, err := navNode.FetchChild(ctx, uint(i)) if err != nil { return IPLDTreeNode{}, err diff --git a/pkg/jobstore/boltdb/database.go b/pkg/jobstore/boltdb/database.go index 51b254b775..990f174b03 100644 --- a/pkg/jobstore/boltdb/database.go +++ b/pkg/jobstore/boltdb/database.go @@ -120,7 +120,8 @@ func BucketSequenceString(_ *bolt.Tx, bucket *bolt.Bucket) string { // uint64ToBytes converts an uint64 to a byte slice func uint64ToBytes(i uint64) []byte { - buf := make([]byte, 8) //nolint:gomnd + //nolint:mnd + buf := make([]byte, 8) binary.BigEndian.PutUint64(buf, i) return buf } diff --git a/pkg/jobstore/boltdb/errors.go b/pkg/jobstore/boltdb/errors.go index 9e53eb1537..f81246e38e 100644 --- a/pkg/jobstore/boltdb/errors.go +++ b/pkg/jobstore/boltdb/errors.go @@ -21,41 +21,41 @@ const ( BoltDBValueTooLarge bacerrors.ErrorCode = "BoltDBValueTooLarge" ) -func NewBoltDbError(err error) bacerrors.Error { +func NewBoltDBError(err error) bacerrors.Error { switch { case errors.Is(err, bbolt.ErrBucketNotFound): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBBucketNotFound). WithHTTPStatusCode(http.StatusNotFound). WithComponent(BoltDBComponent) case errors.Is(err, bbolt.ErrBucketExists): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBBucketExists). WithHTTPStatusCode(http.StatusConflict). WithComponent(BoltDBComponent) case errors.Is(err, bbolt.ErrTxNotWritable): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBTxNotWritable). WithHTTPStatusCode(http.StatusInternalServerError). WithComponent(BoltDBComponent) case errors.Is(err, bbolt.ErrIncompatibleValue): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBIncompatibleValue). WithComponent(BoltDBComponent) case errors.Is(err, bbolt.ErrKeyRequired): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBKeyRequired). WithComponent(BoltDBComponent) case errors.Is(err, bbolt.ErrKeyTooLarge): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBKeyTooLarge). WithComponent(BoltDBComponent) case errors.Is(err, bbolt.ErrValueTooLarge): - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(BoltDBValueTooLarge). WithComponent(BoltDBComponent) default: - return bacerrors.New(err.Error()). + return bacerrors.New("%s", err). WithCode(bacerrors.BadRequestError). WithComponent(BoltDBComponent) } diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 4d5865327e..2ea8e9bdb6 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -167,7 +167,6 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error } func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) { - var job models.Job jobID, err := b.reifyJobID(tx, jobID) @@ -190,7 +189,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) { if idgen.ShortUUID(jobID) == jobID { bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false) if err != nil { - return "", NewBoltDbError(err) + return "", NewBoltDBError(err) } found := make([]string, 0, 1) @@ -224,7 +223,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e } if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil { - return exec, NewBoltDbError(err) + return exec, NewBoltDBError(err) } else { data := bkt.Get([]byte(id)) if data == nil { @@ -243,7 +242,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) { keys, err := b.executionsIndex.List(tx, []byte(id)) if err != nil { - return "", NewBoltDbError(err) + return "", NewBoltDBError(err) } if len(keys) != 1 { @@ -298,7 +297,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } var execs []models.Execution @@ -422,7 +421,7 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) ( if query.ReturnAll || query.Namespace == "" { bkt, err := NewBucketPath(BucketJobs).Get(tx, false) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } err = bkt.ForEachBucket(func(k []byte) error { @@ -430,12 +429,12 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) ( return nil }) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } } else { ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace)) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } for _, k := range ids { @@ -456,7 +455,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{ tagLabel := []byte(strings.ToLower(tag)) ids, err := b.tagsIndex.List(tx, tagLabel) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } for _, k := range ids { @@ -484,7 +483,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{ tagLabel := []byte(strings.ToLower(tag)) ids, err := b.tagsIndex.List(tx, tagLabel) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } for _, k := range ids { @@ -516,6 +515,9 @@ func (b *BoltJobStore) getJobsBuildList(tx *bolt.Tx, jobSet map[string]struct{}, return result, nil } +// TODO: Ignoring linting for now. Fixing this is a task by itself +// +//nolint:gosec // G115: slice length is always non-negative in Go func (b *BoltJobStore) getJobsWithinLimit(jobs []models.Job, query jobstore.JobQuery) ([]models.Job, bool) { if query.Offset >= uint64(len(jobs)) { return []models.Job{}, false @@ -585,7 +587,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models. keys, err := b.inProgressIndex.List(tx) if err != nil { - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } for _, jobIDKey := range keys { @@ -669,7 +671,7 @@ func (b *BoltJobStore) getJobHistory(tx *bolt.Tx, jobID string, query jobstore.J if errors.Is(err, bolt.ErrBucketNotFound) { return &jobstore.JobHistoryQueryResponse{}, nil } - return nil, NewBoltDbError(err) + return nil, NewBoltDBError(err) } var history []models.JobHistory @@ -687,6 +689,7 @@ func (b *BoltJobStore) getJobHistory(tx *bolt.Tx, jobID string, query jobstore.J lastSeq = bytesToUint64(k) } + //nolint:gosec // G115: history within reasonable bounds if uint32(len(history)) == limit { break } @@ -718,8 +721,9 @@ func (b *BoltJobStore) getJobHistory(tx *bolt.Tx, jobID string, query jobstore.J } func (b *BoltJobStore) parseHistoryPaginationParams(query jobstore.JobHistoryQuery) (uint64, uint32, error) { + const defaultTokenLimit = 100 offset := uint64(0) - limit := uint32(100) + limit := uint32(defaultTokenLimit) if query.NextToken != "" { token, err := models.NewPagingTokenFromString(query.NextToken) @@ -757,7 +761,12 @@ func (b *BoltJobStore) filterHistoryItem(item models.JobHistory, query jobstore. return true } -func (b *BoltJobStore) shouldContinueHistoryPagination(tx *bolt.Tx, jobID string, cursor *bolt.Cursor, query jobstore.JobHistoryQuery) (bool, error) { +func (b *BoltJobStore) shouldContinueHistoryPagination( + tx *bolt.Tx, + jobID string, + cursor *bolt.Cursor, + query jobstore.JobHistoryQuery, +) (bool, error) { // If there are more items in the bucket, then we should continue if k, _ := cursor.Next(); k != nil { return true, nil @@ -809,7 +818,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro tx, externalTx = txFromContext(ctx) if externalTx { if !tx.Writable() { - return NewBoltDbError(errors.New("readonly transaction provided in context for update operation")) + return NewBoltDBError(errors.New("readonly transaction provided in context for update operation")) } } else { tx, err = b.database.Begin(true) @@ -849,7 +858,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e if !externalTx { tx, err = b.database.Begin(false) if err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } } @@ -876,17 +885,17 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { jobIDKey := []byte(job.ID) if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } else { // Create the evaluations and executions buckets and so forth if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil { return err } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } } @@ -897,7 +906,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { } if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } else { if err = bkt.Put(SpecKey, jobData); err != nil { return err @@ -907,11 +916,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { // Create a composite key for the in progress index jobkey := createInProgressIndexKey(&job) if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } // Write sentinels keys for specific tags @@ -940,7 +949,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error { if bacerrors.IsError(err) { return err } - return NewBoltDbError(err) + return NewBoltDBError(err) } tx.OnCommit(func() { @@ -949,7 +958,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error { // Delete the Job bucket (and everything within it) if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil { - return NewBoltDbError(err) + return NewBoltDBError(err) } else { if err = bkt.DeleteBucket([]byte(jobID)); err != nil { return err diff --git a/pkg/jobstore/errors.go b/pkg/jobstore/errors.go index e747c385c9..e29d55ea1c 100644 --- a/pkg/jobstore/errors.go +++ b/pkg/jobstore/errors.go @@ -50,14 +50,14 @@ func NewErrInvalidJobState(id string, actual models.JobStateType, expected model func NewErrInvalidJobVersion(id string, actual, expected uint64) bacerrors.Error { errorMessage := fmt.Sprintf("job %s has version %d but expected %d", id, actual, expected) - return bacerrors.New(errorMessage). + return bacerrors.New("%s", errorMessage). WithCode(ConflictJobVersion). WithComponent(JobStoreComponent) } func NewErrJobAlreadyTerminal(id string, actual models.JobStateType, newState models.JobStateType) bacerrors.Error { errorMessage := fmt.Sprintf("job %s is in terminal state %s and cannot transition to %s", id, actual, newState) - return bacerrors.New(errorMessage). + return bacerrors.New("%s", errorMessage). WithCode(ConflictJobState). WithComponent(JobStoreComponent) } @@ -88,7 +88,7 @@ func NewErrInvalidExecutionState(id string, actual models.ExecutionStateType, ex } else { errorMessage = fmt.Sprintf("execution %s is in state %s, but expected %s", id, actual, expected) } - return bacerrors.New(errorMessage). + return bacerrors.New("%s", errorMessage). WithCode(ConflictJobState). WithComponent(JobStoreComponent) } @@ -125,13 +125,13 @@ func NewErrMultipleEvaluationsFound(id string) bacerrors.Error { } func NewJobStoreError(message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(bacerrors.BadRequestError). WithComponent(JobStoreComponent) } func NewBadRequestError(message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(bacerrors.BadRequestError). WithComponent(JobStoreComponent) } diff --git a/pkg/lib/gzip/gzip.go b/pkg/lib/gzip/gzip.go index 63c335ae07..d19b905ca5 100644 --- a/pkg/lib/gzip/gzip.go +++ b/pkg/lib/gzip/gzip.go @@ -97,6 +97,9 @@ func Decompress(tarGzPath, destDir string) error { // DecompressWithMaxBytes takes the path to a .tar.gz file and decompresses it into the specified directory. // It enforces a maximum decompressed file size (per file) to prevent decompression bombs. +// TODO: Ignore linting issue for now. Fixing this is a task by itself +// +//nolint:gosec // G115: tar header mode is always within valid os.FileMode range (12 bits) func DecompressWithMaxBytes(tarGzPath, destDir string, maxDecompressSize int64) error { // Open the tar.gz file for reading. file, err := os.Open(tarGzPath) diff --git a/pkg/lib/watcher/boltdb/types.go b/pkg/lib/watcher/boltdb/types.go index 27d1ce5cb6..6c78d30799 100644 --- a/pkg/lib/watcher/boltdb/types.go +++ b/pkg/lib/watcher/boltdb/types.go @@ -23,6 +23,7 @@ func newEventKey(seqNum uint64, timestamp int64) *eventKey { } } +//nolint:gosec // G115: limits within reasonable bounds func (k *eventKey) MarshalBinary() ([]byte, error) { buf := make([]byte, seqNumBytes+timestampBytes) binary.BigEndian.PutUint64(buf[:seqNumBytes], k.SeqNum) @@ -30,6 +31,7 @@ func (k *eventKey) MarshalBinary() ([]byte, error) { return buf, nil } +//nolint:gosec // G115: limits within reasonable bounds func (k *eventKey) UnmarshalBinary(data []byte) error { if len(data) != seqNumBytes+timestampBytes { return errors.New("invalid event key length") diff --git a/pkg/logger/dataframe.go b/pkg/logger/dataframe.go index 29a8509b12..7e9d00ab5c 100644 --- a/pkg/logger/dataframe.go +++ b/pkg/logger/dataframe.go @@ -76,6 +76,8 @@ func NewDataFrameFromData(tag StreamTag, data []byte) *DataFrame { // ToBytes converts the data frame into a format suitable for // transmission across a Writer. +// +//nolint:gosec func (df DataFrame) ToBytes() []byte { output := make([]byte, HeaderLength+df.Size) binary.LittleEndian.PutUint32(output, uint32(df.Tag)) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index dc34a033c0..6d8aabf32e 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -42,7 +42,7 @@ var ( func ParseLogMode(s string) (LogMode, error) { lm := []LogMode{LogModeDefault, LogModeJSON, LogModeCmd} for _, logMode := range lm { - if strings.ToLower(s) == strings.ToLower(string(logMode)) { + if strings.EqualFold(s, string(logMode)) { return logMode, nil } } @@ -344,7 +344,7 @@ func marshalZapCoreLogLevel(level zapcore.Level) zerolog.Level { return zerolog.PanicLevel } -//nolint:gocyclo +//nolint:gocyclo,gosec func marshalZapCoreFields[T zerologFields[T]](fields []zapcore.Field, handler T) T { keyPrefix := "" diff --git a/pkg/models/resource.go b/pkg/models/resource.go index 793417c973..a4e5c313cd 100644 --- a/pkg/models/resource.go +++ b/pkg/models/resource.go @@ -158,6 +158,7 @@ func (r *Resources) Validate() error { if r.CPU < 0 { mErr = errors.Join(mErr, fmt.Errorf("invalid CPU value: %f", r.CPU)) } + //nolint:gosec // G115: GPU count should be always within reasonable bounds if len(r.GPUs) > int(r.GPU) { // It's not an error for the GPUs specified to be less than the number // given by the GPU field – that just signifies that either: diff --git a/pkg/nats/errors.go b/pkg/nats/errors.go index cabc130863..ee75ea8430 100644 --- a/pkg/nats/errors.go +++ b/pkg/nats/errors.go @@ -56,5 +56,4 @@ func interceptConnectionError(err error, servers string) error { WithComponent(transportClientComponent). WithCode(bacerrors.ConfigurationError) } - } diff --git a/pkg/nats/proxy/compute_proxy.go b/pkg/nats/proxy/compute_proxy.go index 97d2bebb32..0f715851e7 100644 --- a/pkg/nats/proxy/compute_proxy.go +++ b/pkg/nats/proxy/compute_proxy.go @@ -36,7 +36,7 @@ func NewComputeProxy(params ComputeProxyParams) (*ComputeProxy, error) { sc, err := stream.NewConsumerClient(stream.ConsumerClientParams{ Conn: params.Conn, Config: stream.StreamConsumerClientConfig{ - StreamCancellationBufferDuration: 5 * time.Second, //nolint:gomnd + StreamCancellationBufferDuration: 5 * time.Second, //nolint:mnd }, }) if err != nil { diff --git a/pkg/nats/util.go b/pkg/nats/util.go index a7d361fe14..b836eb955a 100644 --- a/pkg/nats/util.go +++ b/pkg/nats/util.go @@ -42,7 +42,6 @@ func RoutesFromStr(routesStr string, allowLocal bool) ([]*url.URL, error) { if !allowLocal { routeUrls, err = removeLocalAddresses(routeUrls) if err != nil { - return nil, errors.Wrap(err, "failed to remove local addresses from NATS routes. please ensure settings do not contain a local address.") //nolint:lll } } diff --git a/pkg/node/compute.go b/pkg/node/compute.go index fc1028cb86..a08194ef12 100644 --- a/pkg/node/compute.go +++ b/pkg/node/compute.go @@ -38,7 +38,7 @@ type Compute struct { LocalEndpoint compute.Endpoint Capacity capacity.Tracker ExecutionStore store.ExecutionStore - Executors executor.ExecutorProvider + Executors executor.ExecProvider Storages storage.StorageProvider Publishers publisher.PublisherProvider Bidder compute.Bidder @@ -305,7 +305,7 @@ func NewBidder( allocatedResources models.Resources, publishers publisher.PublisherProvider, storages storage.StorageProvider, - executors executor.ExecutorProvider, + executors executor.ExecProvider, executionStore store.ExecutionStore, computeCallback compute.Callback, bufferRunner *compute.ExecutorBuffer, diff --git a/pkg/node/factories.go b/pkg/node/factories.go index d10fa144b1..e64f0f168e 100644 --- a/pkg/node/factories.go +++ b/pkg/node/factories.go @@ -61,7 +61,7 @@ func NewStandardStorageProvidersFactory(cfg types.Bacalhau) StorageProvidersFact func NewStandardExecutorsFactory(cfg types.EngineConfig) ExecutorsFactory { return ExecutorsFactoryFunc( - func(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error) { + func(ctx context.Context, nodeConfig NodeConfig) (executor.ExecProvider, error) { pr, err := executor_util.NewStandardExecutorProvider( cfg, executor_util.StandardExecutorOptions{ diff --git a/pkg/node/heartbeat/client.go b/pkg/node/heartbeat/client.go index 8ea039a708..64abdbaa5c 100644 --- a/pkg/node/heartbeat/client.go +++ b/pkg/node/heartbeat/client.go @@ -32,10 +32,9 @@ func (h *HeartbeatClient) SendHeartbeat(ctx context.Context, sequence uint64) er heartbeat := Heartbeat{NodeID: h.nodeID, Sequence: sequence} // Send the heartbeat to current and legacy topics - var err error message := ncl.NewMessage(heartbeat) - err = errors.Join(err, h.publisher.Publish(ctx, message)) - err = errors.Join(h.legacyPublisher.Publish(ctx, heartbeat)) + err := h.publisher.Publish(ctx, message) + err = errors.Join(err, h.legacyPublisher.Publish(ctx, heartbeat)) return err } diff --git a/pkg/node/ncl.go b/pkg/node/ncl.go index 03635d812f..eebd70c601 100644 --- a/pkg/node/ncl.go +++ b/pkg/node/ncl.go @@ -16,6 +16,7 @@ func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error) { ) return reg, err } + const HeartbeatTopicFormat = "bacalhau.global.compute.%s.out.heartbeat" // computeHeartbeatTopic returns the subject to publish heartbeat messages to. diff --git a/pkg/node/node.go b/pkg/node/node.go index 663c09e350..4cedc0d294 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -28,6 +28,11 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/version" ) +const ( + maxPortNumber = 65535 + minPortNumber = 0 +) + type FeatureConfig struct { Engines []string Publishers []string @@ -278,12 +283,22 @@ func createAPIServer(cfg NodeConfig, userKey *baccrypto.UserKey) (*publicapi.Ser return nil, err } + givenPortNumber := cfg.BacalhauConfig.API.Port + if givenPortNumber < minPortNumber { + return nil, fmt.Errorf("invalid negative port number: %d", cfg.BacalhauConfig.API.Port) + } + if givenPortNumber > maxPortNumber { + return nil, fmt.Errorf("port number %d exceeds maximum allowed value (65535)", cfg.BacalhauConfig.API.Port) + } + + safePortNumber := uint16(givenPortNumber) + serverVersion := version.Get() // public http api server serverParams := publicapi.ServerParams{ Router: echo.New(), Address: cfg.BacalhauConfig.API.Host, - Port: uint16(cfg.BacalhauConfig.API.Port), + Port: safePortNumber, HostID: cfg.NodeID, Config: publicapi.DefaultConfig(), // using default as we don't expose this config to the user Authorizer: authz.NewPolicyAuthorizer(authzPolicy, userKey.PublicKey(), cfg.NodeID), diff --git a/pkg/publicapi/api.go b/pkg/publicapi/api.go index de922cf1b6..12acac62d3 100644 --- a/pkg/publicapi/api.go +++ b/pkg/publicapi/api.go @@ -1,5 +1,6 @@ package publicapi +//nolint:lll // Documentation // @title Bacalhau API // @description This page is the reference of the Bacalhau REST API. Project docs are available at https://docs.bacalhau.org/. Find more information about Bacalhau at https://github.com/bacalhau-project/bacalhau. // @contact.name Bacalhau Team diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 07d62e0222..7b5e4b024b 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -125,7 +125,7 @@ func (e *APIError) ToBacError() bacerrors.Error { details = make(map[string]string) } details["request_id"] = e.RequestID - return bacerrors.New(e.Error()). + return bacerrors.New("%s", e.Error()). WithHTTPStatusCode(e.HTTPStatusCode). WithCode(bacerrors.Code(e.Code)). WithComponent(e.Component). diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go index c4872876b8..7d14e0489d 100644 --- a/pkg/publicapi/client/v2/client.go +++ b/pkg/publicapi/client/v2/client.go @@ -308,7 +308,15 @@ func (c *httpClient) toHTTP(ctx context.Context, method, endpoint string, r *api return req, nil } -func (c *httpClient) interceptError(ctx context.Context, err error, resp *http.Response, method, endpoint string, r *apimodels.HTTPRequest) (bacErr bacerrors.Error) { +//nolint:funlen,gocyclo // TODO: This functions is complex and should be simplified +func (c *httpClient) interceptError( + ctx context.Context, + err error, + resp *http.Response, + method, + endpoint string, + r *apimodels.HTTPRequest, +) (bacErr bacerrors.Error) { // Avoid adding common attributes if the error is an API error var isAPIError bool @@ -405,11 +413,6 @@ func (c *httpClient) interceptError(ctx context.Context, err error, resp *http.R WithCode(bacerrors.TimeOutError). WithRetryable() } - if netErr.Temporary() { - return bacerrors.New("temporary network error"). - WithCode(bacerrors.NetworkFailure). - WithRetryable() - } // Check specifically for "connection refused" error if opErr, ok := netErr.(*net.OpError); ok && opErr.Op == "dial" { diff --git a/pkg/publicapi/client/v2/errors.go b/pkg/publicapi/client/v2/errors.go index 54d1dee929..433cc0982b 100644 --- a/pkg/publicapi/client/v2/errors.go +++ b/pkg/publicapi/client/v2/errors.go @@ -41,6 +41,8 @@ func (e UnexpectedResponseError) HasError() bool { return e.err != nil func (e UnexpectedResponseError) Unwrap() error { return e.err } func (e UnexpectedResponseError) HasAdditional() bool { return e.additional != nil } func (e UnexpectedResponseError) Additional() error { return e.additional } + +//nolint:unused // This function is kept for future use func newUnexpectedResponseError(src unexpectedResponseErrorSource, opts ...unexpectedResponseErrorOption) UnexpectedResponseError { nErr := src() for _, opt := range opts { @@ -82,27 +84,29 @@ func (e UnexpectedResponseError) Error() string { // UnexpectedResponseErrorOptions are functions passed to NewUnexpectedResponseError // to customize the created error. +// +//nolint:unused // This function is kept for future use type unexpectedResponseErrorOption func(*UnexpectedResponseError) // withError allows the addition of a Go error that may have been encountered // while processing the response. For example, if there is an error constructing // the gzip reader to process a gzip-encoded response body. // -//nolint:unused +//nolint:unused // This function is kept for future use func withError(e error) unexpectedResponseErrorOption { return func(u *UnexpectedResponseError) { u.err = e } } // withBody overwrites the Body value with the provided custom value // -//nolint:unused +//nolint:unused // This function is kept for future use func withBody(b string) unexpectedResponseErrorOption { return func(u *UnexpectedResponseError) { u.body = b } } // withStatusText overwrites the StatusText value the provided custom value // -//nolint:unused +//nolint:unused // This function is kept for future use func withStatusText(st string) unexpectedResponseErrorOption { return func(u *UnexpectedResponseError) { u.statusText = st } } @@ -110,15 +114,21 @@ func withStatusText(st string) unexpectedResponseErrorOption { // withExpectedStatuses provides a list of statuses that the receiving function // expected to receive. This can be used by API callers to provide more feedback // to end-users. +// +//nolint:unused // This function is kept for future use func withExpectedStatuses(s []int) unexpectedResponseErrorOption { return func(u *UnexpectedResponseError) { u.expected = slices.Clone(s) } } // unexpectedResponseErrorSource provides the basis for a NewUnexpectedResponseError. +// +//nolint:unused // This function is kept for future use type unexpectedResponseErrorSource func() *UnexpectedResponseError // fromHTTPResponse read an open HTTP response, drains and closes its body as // the data for the UnexpectedResponseError. +// +//nolint:unused // This function is kept for future use func fromHTTPResponse(resp *http.Response) unexpectedResponseErrorSource { return func() *UnexpectedResponseError { u := new(UnexpectedResponseError) @@ -147,22 +157,28 @@ func fromHTTPResponse(resp *http.Response) unexpectedResponseErrorSource { // the resolving function provided inside of the NewUnexpectedResponseError // implementation. // -//nolint:unused +//nolint:unused // This function is kept for future use func fromStatusCode(sc int) unexpectedResponseErrorSource { return func() *UnexpectedResponseError { return &UnexpectedResponseError{statusCode: sc} } } // doRequestWrapper is a function that wraps the client's doRequest method // and can be used to provide error and response handling +// +//nolint:unused // This function is kept for future use type doRequestWrapper = func(time.Duration, *http.Response, error) (time.Duration, *http.Response, error) // requireOK is used to wrap doRequest and check for a 200 +// +//nolint:unused // This function is kept for future use func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { return requireStatusIn(http.StatusOK)(d, resp, e) } // requireStatusIn is a doRequestWrapper generator that takes expected HTTP // response codes and validates that the received response code is among them +// +//nolint:unused // This function is kept for future use func requireStatusIn(statuses ...int) doRequestWrapper { return func(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { for _, status := range statuses { diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index 7cc45a2266..c8349ccc32 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -19,7 +19,6 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { var apiError *apimodels.APIError switch e := err.(type) { - case bacerrors.Error: apiError = apimodels.FromBacError(e) diff --git a/pkg/publicapi/server.go b/pkg/publicapi/server.go index a87290bb71..713f87116c 100644 --- a/pkg/publicapi/server.go +++ b/pkg/publicapi/server.go @@ -208,6 +208,7 @@ func (apiServer *Server) ListenAndServe(ctx context.Context) error { if apiServer.Port == 0 { switch addr := listener.Addr().(type) { case *net.TCPAddr: + //nolint:gosec // G115: addr.Port should be within limit apiServer.Port = uint16(addr.Port) default: return fmt.Errorf("unknown address %v", addr) diff --git a/pkg/repo/migrations/v3_4.go b/pkg/repo/migrations/v3_4.go index 23bd35065b..cfe3b731d8 100644 --- a/pkg/repo/migrations/v3_4.go +++ b/pkg/repo/migrations/v3_4.go @@ -36,6 +36,9 @@ import ( // - If a user has configured a custom auth tokens path, the configured value is copied to .bacalhau/tokens.json. var V3Migration = V3MigrationWithConfig(system.DefaultGlobalConfig) +// V3MigrationWithConfig TODO: This is a very complex function that should be simplified and split +// +//nolint:gocyclo func V3MigrationWithConfig(globalCfg system.GlobalConfig) repo.Migration { return repo.NewMigration( repo.Version3, @@ -128,7 +131,6 @@ func V3MigrationWithConfig(globalCfg system.GlobalConfig) repo.Migration { // if there was an error other than the file not existing, abort. return fmt.Errorf("failed to read config file %s while migrating: %w", oldConfigFilePath, err) } - } return nil }, diff --git a/pkg/s3/errors.go b/pkg/s3/errors.go index cbeaee1b04..fa9493571e 100644 --- a/pkg/s3/errors.go +++ b/pkg/s3/errors.go @@ -23,19 +23,19 @@ const ( ) func NewS3PublisherError(code bacerrors.ErrorCode, message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(code). WithComponent(PublisherComponent) } func NewS3InputSourceError(code bacerrors.ErrorCode, message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(code). WithComponent(InputSourceComponent) } func NewS3DownloaderError(code bacerrors.ErrorCode, message string) bacerrors.Error { - return bacerrors.New(message). + return bacerrors.New("%s", message). WithCode(code). WithComponent(DownloadComponent) } @@ -54,7 +54,7 @@ func NewS3ResultSignerServiceError(err error) bacerrors.Error { func newS3ServiceError(err error, component string) bacerrors.Error { errMetadata := extractErrorMetadata(err) - return bacerrors.New(errMetadata.message). + return bacerrors.New("%s", errMetadata.message). WithComponent(component). WithCode(errMetadata.errorCode). WithHTTPStatusCode(errMetadata.statusCode). diff --git a/pkg/s3/types.go b/pkg/s3/types.go index 5c38896d5a..26b49e9d67 100644 --- a/pkg/s3/types.go +++ b/pkg/s3/types.go @@ -48,7 +48,10 @@ func (c PreSignedResultSpec) ToMap() map[string]interface{} { func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { - return SourceSpec{}, NewS3InputSourceError(BadRequestErrorCode, "invalid storage source type. expected "+models.StorageSourceS3+" but received: "+spec.Type) + return SourceSpec{}, NewS3InputSourceError( + BadRequestErrorCode, + fmt.Sprintf("invalid storage source type. expected %s but received: %s", models.StorageSourceS3, spec.Type), + ) } inputParams := spec.Params if inputParams == nil { diff --git a/pkg/storage/inline/storage.go b/pkg/storage/inline/storage.go index 75ca770ff5..add5df7d07 100644 --- a/pkg/storage/inline/storage.go +++ b/pkg/storage/inline/storage.go @@ -149,6 +149,7 @@ func (*InlineStorage) Upload(ctx context.Context, path string) (models.SpecConfi } var url string + //nolint:gosec // G115: maximumPlaintextSize is always within reasonable bounds if info.IsDir() || info.Size() > int64(maximumPlaintextSize.Bytes()) { cwd, err := os.Getwd() if err != nil { diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index 4733f311cb..6aa79e3fc4 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -3,6 +3,7 @@ package s3 import ( "context" "fmt" + "math" "os" "path/filepath" "regexp" @@ -83,8 +84,21 @@ func (s *StorageProvider) GetVolumeSize(ctx context.Context, volume models.Input if err != nil { return 0, err } - size := uint64(0) + var size uint64 for _, object := range objects { + // Check for negative size + if object.size < 0 { + return 0, fmt.Errorf("invalid negative size for object: %d", object.size) + } + + // Check for overflow + // MaxUint64 - size = remaining space before overflow + //nolint:gosec // G115: negative values already checked + if object.size > 0 && uint64(object.size) > math.MaxUint64-size { + return 0, fmt.Errorf("total size exceeds uint64 maximum") + } + + //nolint:gosec // G115: Already checked above size += uint64(object.size) } return size, nil diff --git a/pkg/storage/s3/types.go b/pkg/storage/s3/types.go index 9a5eef2086..5c6b52b4c7 100644 --- a/pkg/storage/s3/types.go +++ b/pkg/storage/s3/types.go @@ -2,6 +2,7 @@ package s3 import ( "errors" + "fmt" "github.com/fatih/structs" "github.com/mitchellh/mapstructure" @@ -33,7 +34,13 @@ func (c SourceSpec) ToMap() map[string]interface{} { func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { - return SourceSpec{}, s3.NewS3InputSourceError(s3.BadRequestErrorCode, "invalid storage source type. expected "+models.StorageSourceS3+", but received: "+spec.Type) + return SourceSpec{}, + s3.NewS3InputSourceError( + s3.BadRequestErrorCode, + fmt.Sprintf("invalid storage source type. expected %s, but received: %s", + models.StorageSourceS3, spec.Type, + ), + ) } inputParams := spec.Params if inputParams == nil { @@ -66,7 +73,13 @@ func (c PreSignedResultSpec) ToMap() map[string]interface{} { func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, error) { if !spec.IsType(models.StorageSourceS3PreSigned) { - return PreSignedResultSpec{}, s3.NewS3InputSourceError(s3.BadRequestErrorCode, "invalid storage source type. expected "+models.StorageSourceS3PreSigned+", but received: "+spec.Type) + return PreSignedResultSpec{}, + s3.NewS3InputSourceError( + s3.BadRequestErrorCode, + fmt.Sprintf("invalid storage source type. expected %s, but received: %s", + models.StorageSourceS3PreSigned, spec.Type, + ), + ) } inputParams := spec.Params diff --git a/pkg/storage/url/urldownload/storage.go b/pkg/storage/url/urldownload/storage.go index df58f3efb3..bf6941671c 100644 --- a/pkg/storage/url/urldownload/storage.go +++ b/pkg/storage/url/urldownload/storage.go @@ -112,6 +112,7 @@ func (sp *StorageProvider) GetVolumeSize(ctx context.Context, storageSpec models return 0, ErrNoContentLengthFound } + //nolint:gosec // G115: negative memory values already checked above return uint64(res.ContentLength), nil } diff --git a/pkg/test/compute/setup_test.go b/pkg/test/compute/setup_test.go index 141bc9a496..f57903ec1d 100644 --- a/pkg/test/compute/setup_test.go +++ b/pkg/test/compute/setup_test.go @@ -83,7 +83,7 @@ func (s *ComputeSuite) setupConfig() { return provider.NewNoopProvider[storage.Storage](noop_storage.NewNoopStorage()), nil }), ExecutorsFactory: node.ExecutorsFactoryFunc( - func(ctx context.Context, nodeConfig node.NodeConfig) (executor_common.ExecutorProvider, error) { + func(ctx context.Context, nodeConfig node.NodeConfig) (executor_common.ExecProvider, error) { return provider.NewMappedProvider(map[string]executor_common.Executor{ models.EngineNoop: s.executor, models.EngineDocker: dockerExecutor, diff --git a/pkg/test/scenario/test_scenarios.go b/pkg/test/scenario/test_scenarios.go index b0af89ea05..c24fe1e0ba 100644 --- a/pkg/test/scenario/test_scenarios.go +++ b/pkg/test/scenario/test_scenarios.go @@ -262,7 +262,7 @@ func WasmEnvVars(t testing.TB) Scenario { ResultsChecker: FileContains( "stdout", []string{"AWESOME=definitely", "TEST=yes"}, - 3, //nolint:gomnd // magic number appropriate for test + 3, //nolint:mnd // magic number appropriate for test ), Job: &models.Job{ Name: t.Name(), @@ -385,7 +385,7 @@ func WasmLogTest(t testing.TB) Scenario { ResultsChecker: FileContains( "stdout", []string{"https://www.gutenberg.org"}, // end of the file - -1, //nolint:gomnd // magic number appropriate for test + -1, //nolint:mnd // magic number appropriate for test ), Job: &models.Job{ Name: t.Name(), diff --git a/pkg/test/teststack/stack.go b/pkg/test/teststack/stack.go index a1495740c6..b21f897d9c 100644 --- a/pkg/test/teststack/stack.go +++ b/pkg/test/teststack/stack.go @@ -46,7 +46,7 @@ func Setup( } // Wait for nodes to have announced their presence. - //nolint:gomnd + //nolint:mnd require.Eventually(t, func() bool { return allNodesDiscovered(t, stack) @@ -102,7 +102,7 @@ type mixedExecutorFactory struct { func (m *mixedExecutorFactory) Get( ctx context.Context, nodeConfig node.NodeConfig, -) (executor.ExecutorProvider, error) { +) (executor.ExecProvider, error) { stdProvider, err := m.standardFactory.Get(ctx, nodeConfig) if err != nil { return nil, err diff --git a/pkg/test/wait/wait.go b/pkg/test/wait/wait.go index 018256f2ff..a2cfa57245 100644 --- a/pkg/test/wait/wait.go +++ b/pkg/test/wait/wait.go @@ -40,7 +40,9 @@ func For(params Params) error { time.Sleep(params.Backoff) } if lastErr == nil { - return fmt.Errorf("timed out after %v", time.Duration(params.Retries)*params.Backoff) + //nolint:gosec // G115: retry count is always within reasonable bounds + totalTime := params.Backoff * time.Duration(params.Retries) + return fmt.Errorf("timed out after %v", totalTime) } return lastErr } diff --git a/pkg/util/targzip/targzip.go b/pkg/util/targzip/targzip.go index 3526073088..608529c317 100644 --- a/pkg/util/targzip/targzip.go +++ b/pkg/util/targzip/targzip.go @@ -44,7 +44,18 @@ func UncompressedSize(src io.Reader) (datasize.ByteSize, error) { var header *tar.Header for header, err = tr.Next(); err == nil; header, err = tr.Next() { - size += datasize.ByteSize(header.Size) + // Check for negative size + if header.Size < 0 { + return 0, fmt.Errorf("invalid negative size in tar header: %d", header.Size) + } + + // Check for overflow before adding + //nolint:gosec // G115: negative values already checked + newSize := size + datasize.ByteSize(header.Size) + if newSize < size { // If newSize wrapped around + return 0, fmt.Errorf("total uncompressed size exceeds maximum value") + } + size = newSize } if err == io.EOF { err = nil @@ -54,7 +65,7 @@ func UncompressedSize(src io.Reader) (datasize.ByteSize, error) { // from https://github.com/mimoo/eureka/blob/master/folders.go under Apache 2 // -//nolint:gocyclo,funlen +//nolint:gocyclo,funlen,gosec func compress(ctx context.Context, src string, buf io.Writer, max datasize.ByteSize, stripPath bool) error { _, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), "pkg/util/targzip.compress") defer span.End() @@ -160,6 +171,7 @@ func compress(ctx context.Context, src string, buf io.Writer, max datasize.ByteS return nil } +//nolint:gosec // G115: limits within reasonable bounds func decompress(src io.Reader, dst string, max datasize.ByteSize) error { // ensure destination directory exists err := os.Mkdir(dst, worldReadOwnerWritePermission) @@ -218,7 +230,7 @@ func decompress(src io.Reader, dst string, max datasize.ByteSize) error { return err } // copy over contents (max 10MB per file!) - if _, err := io.CopyN(fileToWrite, tr, int64(max)); err != nil { //nolint:gomnd + if _, err := io.CopyN(fileToWrite, tr, int64(max)); err != nil { //nolint:mnd // io.EOF is expected if err != io.EOF { return err diff --git a/pkg/version/version.go b/pkg/version/version.go index 04a1d45f4a..3d551cea88 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -57,8 +57,8 @@ func Get() *models.BuildVersionInfo { } versionInfo := &models.BuildVersionInfo{ - Major: strconv.FormatInt(s.Major(), 10), //nolint:gomnd // base10, magic number appropriate - Minor: strconv.FormatInt(s.Minor(), 10), //nolint:gomnd // base10, magic number appropriate + Major: strconv.FormatInt(s.Major(), 10), //nolint:mnd // base10, magic number appropriate + Minor: strconv.FormatInt(s.Minor(), 10), //nolint:mnd // base10, magic number appropriate GitVersion: GITVERSION, GitCommit: revision, BuildDate: revisionTime,