Skip to content

Commit

Permalink
fix jobstore further
Browse files Browse the repository at this point in the history
  • Loading branch information
udsamani committed Sep 11, 2024
1 parent 3548a73 commit 3336679
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
33 changes: 17 additions & 16 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error
}

func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

var job models.Job

jobID, err := b.reifyJobID(tx, jobID)
Expand Down Expand Up @@ -207,7 +208,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {
case 1:
return found[0], nil
default:
return "", bacerrors.NewMultipleJobsFound(jobID, found)
return "", jobstore.NewErrMultipleJobsFound(jobID)
}
}

Expand Down Expand Up @@ -430,12 +431,12 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) (
return nil
})
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}
} else {
ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace))
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand All @@ -456,7 +457,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -484,7 +485,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -585,7 +586,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models.

keys, err := b.inProgressIndex.List(tx)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, jobIDKey := range keys {
Expand Down Expand Up @@ -778,7 +779,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro
tx, externalTx = txFromContext(ctx)
if externalTx {
if !tx.Writable() {
return fmt.Errorf("readonly transaction provided in context for update operation")
return jobstore.NewBoltDbError("readonly transaction provided in context for update operation")
}
} else {
tx, err = b.database.Begin(true)
Expand Down Expand Up @@ -818,7 +819,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e
if !externalTx {
tx, err = b.database.Begin(false)
if err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -845,21 +846,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {

jobIDKey := []byte(job.ID)
if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
// Create the evaluations and executions buckets and so forth
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil {
return err
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -870,7 +871,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
}

if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.Put(SpecKey, jobData); err != nil {
return err
Expand All @@ -880,11 +881,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
// Create a composite key for the in progress index
jobkey := createInProgressIndexKey(&job)
if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

// Write sentinels keys for specific tags
Expand Down Expand Up @@ -919,7 +920,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

// Delete the Job bucket (and everything within it)
if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.DeleteBucket([]byte(jobID)); err != nil {
return err
Expand Down
16 changes: 9 additions & 7 deletions pkg/jobstore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/models"
)

const JOB_STORE_COMPONENT = "JBS"

// ErrJobNotFound is returned when the job is not found
type ErrJobNotFound struct {
JobID string
}
const (
JOB_STORE_COMPONENT = "JBS"
BOLTDB_COMPONENT = "BDB"
)

func NewErrJobNotFound(id string) *models.BaseError {
return models.NewBaseError("job not found: %s", id).WithCode(models.NewErrorCode(JOB_STORE_COMPONENT, 404))
}

func NewErrMultipleJobsFound(id string) *models.BaseError {
return models.NewBaseError("multiple jobs found for id %s", id).WithCode(models.NewErrorCode(JOB_STORE_COMPONENT, 400))
}

func NewErrJobAlreadyExists(id string) *models.BaseError {
return models.NewBaseError("job already exists: %s", id)
}
Expand Down Expand Up @@ -70,7 +72,7 @@ func NewErrExecutionAlreadyTerminal(id string, actual models.ExecutionStateType,
}

func NewBoltDbError(message string) *models.BaseError {
return models.NewBaseError(message)
return models.NewBaseError(message).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
}

func NewJobStoreError(message string) *models.BaseError {
Expand Down

0 comments on commit 3336679

Please sign in to comment.