diff --git a/pkg/jobstore/boltdb/errors.go b/pkg/jobstore/boltdb/errors.go new file mode 100644 index 0000000000..21f2dddc37 --- /dev/null +++ b/pkg/jobstore/boltdb/errors.go @@ -0,0 +1,31 @@ +package boltjobstore + +import ( + "errors" + + "github.com/bacalhau-project/bacalhau/pkg/models" + "go.etcd.io/bbolt" +) + +const BOLTDB_COMPONENT = "BDB" + +func NewBoltDbError(err error) *models.BaseError { + switch { + case errors.Is(err, bbolt.ErrBucketNotFound): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 404)) + case errors.Is(err, bbolt.ErrBucketExists): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 409)) + case errors.Is(err, bbolt.ErrTxNotWritable): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) + case errors.Is(err, bbolt.ErrIncompatibleValue): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) + case errors.Is(err, bbolt.ErrKeyRequired): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) + case errors.Is(err, bbolt.ErrKeyTooLarge): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) + case errors.Is(err, bbolt.ErrValueTooLarge): + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) + default: + return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) + } +} diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 7a9836a585..e330969516 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -3,6 +3,7 @@ package boltjobstore import ( "bytes" "context" + "errors" "fmt" "slices" "sort" @@ -191,7 +192,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) { if idgen.ShortUUID(jobID) == jobID { bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false) if err != nil { - return "", jobstore.NewBoltDbError(err.Error()) + return "", NewBoltDbError(err) } found := make([]string, 0, 1) @@ -225,7 +226,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e } if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil { - return exec, jobstore.NewBoltDbError(err.Error()) + return exec, NewBoltDbError(err) } else { data := bkt.Get([]byte(id)) if data == nil { @@ -244,7 +245,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) { keys, err := b.executionsIndex.List(tx, []byte(id)) if err != nil { - return "", jobstore.NewBoltDbError(err.Error()) + return "", NewBoltDbError(err) } if len(keys) != 1 { @@ -299,7 +300,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } var execs []models.Execution @@ -423,7 +424,7 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) ( if query.ReturnAll || query.Namespace == "" { bkt, err := NewBucketPath(BucketJobs).Get(tx, false) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } err = bkt.ForEachBucket(func(k []byte) error { @@ -431,12 +432,12 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) ( return nil }) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } } else { ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace)) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } for _, k := range ids { @@ -457,7 +458,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{ tagLabel := []byte(strings.ToLower(tag)) ids, err := b.tagsIndex.List(tx, tagLabel) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } for _, k := range ids { @@ -485,7 +486,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{ tagLabel := []byte(strings.ToLower(tag)) ids, err := b.tagsIndex.List(tx, tagLabel) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } for _, k := range ids { @@ -586,7 +587,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models. keys, err := b.inProgressIndex.List(tx) if err != nil { - return nil, jobstore.NewBoltDbError(err.Error()) + return nil, NewBoltDbError(err) } for _, jobIDKey := range keys { @@ -779,7 +780,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro tx, externalTx = txFromContext(ctx) if externalTx { if !tx.Writable() { - return jobstore.NewBoltDbError("readonly transaction provided in context for update operation") + return NewBoltDbError(errors.New("readonly transaction provided in context for update operation")) } } else { tx, err = b.database.Begin(true) @@ -819,7 +820,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e if !externalTx { tx, err = b.database.Begin(false) if err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } } @@ -846,21 +847,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { jobIDKey := []byte(job.ID) if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } else { // Create the evaluations and executions buckets and so forth if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil { return err } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } } @@ -871,7 +872,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { } if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } else { if err = bkt.Put(SpecKey, jobData); err != nil { return err @@ -881,11 +882,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { // Create a composite key for the in progress index jobkey := createInProgressIndexKey(&job) if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } // Write sentinels keys for specific tags @@ -920,7 +921,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error { // Delete the Job bucket (and everything within it) if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil { - return jobstore.NewBoltDbError(err.Error()) + return NewBoltDbError(err) } else { if err = bkt.DeleteBucket([]byte(jobID)); err != nil { return err diff --git a/pkg/jobstore/errors.go b/pkg/jobstore/errors.go index 70aea00cd7..f0b2d52db3 100644 --- a/pkg/jobstore/errors.go +++ b/pkg/jobstore/errors.go @@ -8,7 +8,6 @@ import ( const ( JOB_STORE_COMPONENT = "JBS" - BOLTDB_COMPONENT = "BDB" ) func NewErrJobNotFound(id string) *models.BaseError { @@ -71,10 +70,6 @@ func NewErrExecutionAlreadyTerminal(id string, actual models.ExecutionStateType, return models.NewBaseError("execution %s is in terminal state %s and cannot transition to %s", id, actual, newState) } -func NewBoltDbError(message string) *models.BaseError { - return models.NewBaseError(message).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) -} - func NewJobStoreError(message string) *models.BaseError { return models.NewBaseError(message) } diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index 1d9893d7ea..9a2bb9b2cb 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -155,7 +155,7 @@ func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (St txContext, err := e.store.BeginTx(ctx) if err != nil { - return StopJobResponse{}, jobstore.NewBoltDbError(err.Error()) + return StopJobResponse{}, jobstore.NewJobStoreError(err.Error()) } defer func() { diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 5a3262e21c..236523a006 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -30,14 +30,14 @@ import ( type APIError struct { // httpstatuscode is the http status code associated with this error. // it should correspond to standard http status codes (e.g., 400, 404, 500). - HTTPStatusCode int `json:"code"` + HTTPStatusCode int `json:"-"` // message is a short, human-readable description of the error. // it should be concise and provide a clear indication of what went wrong. Message string `json:"message"` // RequestID is the request ID of the request that caused the error. - RequestID string `json:"request_id"` + RequestID string `json:"-"` } // NewAPIError creates a new APIError with the given HTTP status code and message. diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index e9c160fbf4..6631f54312 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -2,7 +2,6 @@ package middleware import ( "net/http" - "os" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" @@ -37,7 +36,7 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { message = "internal server error" code = c.Response().Status - if isDebugMode() { + if c.Echo().Debug { message = err.Error() } } @@ -62,7 +61,3 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { } } - -func isDebugMode() bool { - return os.Getenv("BACALHAU_DEBUG") == "true" -} diff --git a/pkg/publicapi/server.go b/pkg/publicapi/server.go index 9178f590fe..f6fa6a5433 100644 --- a/pkg/publicapi/server.go +++ b/pkg/publicapi/server.go @@ -90,12 +90,13 @@ func NewAPIServer(params ServerParams) (*Server, error) { server.Router.Binder = NewNormalizeBinder() server.Router.Validator = NewCustomValidator() - // enable debug mode to get clearer error messages - // TODO: disable debug mode after we implement our own error handler - server.Router.Debug = true - // set middleware logLevel, err := zerolog.ParseLevel(params.Config.LogLevel) + if logLevel == zerolog.DebugLevel { + // enable debug mode to get clearer error messages + server.Router.Debug = true + } + if err != nil { return nil, err }