Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
udsamani committed Sep 11, 2024
1 parent 65ffc35 commit af6557a
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 38 deletions.
31 changes: 31 additions & 0 deletions pkg/jobstore/boltdb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package boltjobstore

import (
"errors"

"github.com/bacalhau-project/bacalhau/pkg/models"
"go.etcd.io/bbolt"
)

const BOLTDB_COMPONENT = "BDB"

func NewBoltDbError(err error) *models.BaseError {
switch {
case errors.Is(err, bbolt.ErrBucketNotFound):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 404))
case errors.Is(err, bbolt.ErrBucketExists):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 409))
case errors.Is(err, bbolt.ErrTxNotWritable):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
case errors.Is(err, bbolt.ErrIncompatibleValue):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
case errors.Is(err, bbolt.ErrKeyRequired):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
case errors.Is(err, bbolt.ErrKeyTooLarge):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
case errors.Is(err, bbolt.ErrValueTooLarge):
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
default:
return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
}
}
41 changes: 21 additions & 20 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package boltjobstore
import (
"bytes"
"context"
"errors"
"fmt"
"slices"
"sort"
Expand Down Expand Up @@ -191,7 +192,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {
if idgen.ShortUUID(jobID) == jobID {
bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return "", jobstore.NewBoltDbError(err.Error())
return "", NewBoltDbError(err)
}

found := make([]string, 0, 1)
Expand Down Expand Up @@ -225,7 +226,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
}

if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil {
return exec, jobstore.NewBoltDbError(err.Error())
return exec, NewBoltDbError(err)
} else {
data := bkt.Get([]byte(id))
if data == nil {
Expand All @@ -244,7 +245,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) {
keys, err := b.executionsIndex.List(tx, []byte(id))
if err != nil {
return "", jobstore.NewBoltDbError(err.Error())
return "", NewBoltDbError(err)
}

if len(keys) != 1 {
Expand Down Expand Up @@ -299,7 +300,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions

bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false)
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}

var execs []models.Execution
Expand Down Expand Up @@ -423,20 +424,20 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) (
if query.ReturnAll || query.Namespace == "" {
bkt, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}

err = bkt.ForEachBucket(func(k []byte) error {
jobSet[string(k)] = struct{}{}
return nil
})
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}
} else {
ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace))
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}

for _, k := range ids {
Expand All @@ -457,7 +458,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}

for _, k := range ids {
Expand Down Expand Up @@ -485,7 +486,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}

for _, k := range ids {
Expand Down Expand Up @@ -586,7 +587,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models.

keys, err := b.inProgressIndex.List(tx)
if err != nil {
return nil, jobstore.NewBoltDbError(err.Error())
return nil, NewBoltDbError(err)
}

for _, jobIDKey := range keys {
Expand Down Expand Up @@ -779,7 +780,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro
tx, externalTx = txFromContext(ctx)
if externalTx {
if !tx.Writable() {
return jobstore.NewBoltDbError("readonly transaction provided in context for update operation")
return NewBoltDbError(errors.New("readonly transaction provided in context for update operation"))
}
} else {
tx, err = b.database.Begin(true)
Expand Down Expand Up @@ -819,7 +820,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e
if !externalTx {
tx, err = b.database.Begin(false)
if err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
}
}

Expand All @@ -846,21 +847,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {

jobIDKey := []byte(job.ID)
if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
} else {
// Create the evaluations and executions buckets and so forth
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil {
return err
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
}

if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
}
}

Expand All @@ -871,7 +872,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
}

if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
} else {
if err = bkt.Put(SpecKey, jobData); err != nil {
return err
Expand All @@ -881,11 +882,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
// Create a composite key for the in progress index
jobkey := createInProgressIndexKey(&job)
if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
}

if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
}

// Write sentinels keys for specific tags
Expand Down Expand Up @@ -920,7 +921,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

// Delete the Job bucket (and everything within it)
if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil {
return jobstore.NewBoltDbError(err.Error())
return NewBoltDbError(err)
} else {
if err = bkt.DeleteBucket([]byte(jobID)); err != nil {
return err
Expand Down
5 changes: 0 additions & 5 deletions pkg/jobstore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

const (
JOB_STORE_COMPONENT = "JBS"
BOLTDB_COMPONENT = "BDB"
)

func NewErrJobNotFound(id string) *models.BaseError {
Expand Down Expand Up @@ -71,10 +70,6 @@ func NewErrExecutionAlreadyTerminal(id string, actual models.ExecutionStateType,
return models.NewBaseError("execution %s is in terminal state %s and cannot transition to %s", id, actual, newState)
}

func NewBoltDbError(message string) *models.BaseError {
return models.NewBaseError(message).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
}

func NewJobStoreError(message string) *models.BaseError {
return models.NewBaseError(message)
}
2 changes: 1 addition & 1 deletion pkg/orchestrator/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (St

txContext, err := e.store.BeginTx(ctx)
if err != nil {
return StopJobResponse{}, jobstore.NewBoltDbError(err.Error())
return StopJobResponse{}, jobstore.NewJobStoreError(err.Error())
}

defer func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/publicapi/apimodels/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
type APIError struct {
// httpstatuscode is the http status code associated with this error.
// it should correspond to standard http status codes (e.g., 400, 404, 500).
HTTPStatusCode int `json:"code"`
HTTPStatusCode int `json:"-"`

// message is a short, human-readable description of the error.
// it should be concise and provide a clear indication of what went wrong.
Message string `json:"message"`

// RequestID is the request ID of the request that caused the error.
RequestID string `json:"request_id"`
RequestID string `json:"-"`
}

// NewAPIError creates a new APIError with the given HTTP status code and message.
Expand Down
7 changes: 1 addition & 6 deletions pkg/publicapi/middleware/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package middleware

import (
"net/http"
"os"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
Expand Down Expand Up @@ -37,7 +36,7 @@ func CustomHTTPErrorHandler(err error, c echo.Context) {
message = "internal server error"
code = c.Response().Status

if isDebugMode() {
if c.Echo().Debug {
message = err.Error()
}
}
Expand All @@ -62,7 +61,3 @@ func CustomHTTPErrorHandler(err error, c echo.Context) {
}

}

func isDebugMode() bool {
return os.Getenv("BACALHAU_DEBUG") == "true"
}
9 changes: 5 additions & 4 deletions pkg/publicapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ func NewAPIServer(params ServerParams) (*Server, error) {
server.Router.Binder = NewNormalizeBinder()
server.Router.Validator = NewCustomValidator()

// enable debug mode to get clearer error messages
// TODO: disable debug mode after we implement our own error handler
server.Router.Debug = true

// set middleware
logLevel, err := zerolog.ParseLevel(params.Config.LogLevel)
if logLevel == zerolog.DebugLevel {
// enable debug mode to get clearer error messages
server.Router.Debug = true
}

if err != nil {
return nil, err
}
Expand Down

0 comments on commit af6557a

Please sign in to comment.