Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce APIError for Sync Calls from CLI to Orchestrator Node #4366

Merged
merged 18 commits into from
Sep 11, 2024
Merged
3 changes: 1 addition & 2 deletions cmd/cli/job/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package job

import (
"cmp"
"errors"
"fmt"
"slices"
"time"
Expand Down Expand Up @@ -90,7 +89,7 @@ func (o *DescribeOptions) run(cmd *cobra.Command, args []string, api client.API)
})

if err != nil {
return errors.New(err.Error())
return err
}

if o.OutputOpts.Format != "" {
Expand Down
3 changes: 1 addition & 2 deletions cmd/cli/job/get.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package job

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -109,7 +108,7 @@ func get(cmd *cobra.Command, cmdArgs []string, api client.API, cfg types.Bacalha
jobID,
OG.DownloadSettings,
); err != nil {
return errors.New(err.Error())
return err
}

return nil
Expand Down
49 changes: 25 additions & 24 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error
}

func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

var job models.Job

jobID, err := b.reifyJobID(tx, jobID)
Expand All @@ -190,7 +191,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {
if idgen.ShortUUID(jobID) == jobID {
bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return "", err
return "", jobstore.NewBoltDbError(err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would suggest to pass the actual error instead of just the string. This will allow NewBoltDbError to intercept the error and return different types or codes based on the actual error.

My not be needed here or now, but I am just thinking ahead when we capture and throw errors for docker executor and s3 publisher for example. You might find this pattern useful to centralize intercepting errors per component

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

found := make([]string, 0, 1)
Expand All @@ -203,11 +204,11 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {

switch len(found) {
case 0:
return "", apimodels.NewJobNotFound(jobID)
return "", jobstore.NewErrJobNotFound(jobID)
case 1:
return found[0], nil
default:
return "", bacerrors.NewMultipleJobsFound(jobID, found)
return "", jobstore.NewErrMultipleJobsFound(jobID)
}
}

Expand All @@ -224,7 +225,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
}

if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil {
return exec, err
return exec, jobstore.NewBoltDbError(err.Error())
} else {
data := bkt.Get([]byte(id))
if data == nil {
Expand All @@ -243,11 +244,11 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) {
keys, err := b.executionsIndex.List(tx, []byte(id))
if err != nil {
return "", err
return "", jobstore.NewBoltDbError(err.Error())
}

if len(keys) != 1 {
return "", fmt.Errorf("too many leaf nodes in execution index")
return "", jobstore.NewJobStoreError("too many leaf nodes in execution index")
}

return string(keys[0]), nil
Expand Down Expand Up @@ -298,7 +299,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions

bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

var execs []models.Execution
Expand Down Expand Up @@ -422,20 +423,20 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) (
if query.ReturnAll || query.Namespace == "" {
bkt, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

err = bkt.ForEachBucket(func(k []byte) error {
jobSet[string(k)] = struct{}{}
return nil
})
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}
} else {
ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace))
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand All @@ -456,7 +457,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -484,7 +485,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -585,7 +586,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models.

keys, err := b.inProgressIndex.List(tx)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, jobIDKey := range keys {
Expand Down Expand Up @@ -759,7 +760,7 @@ func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error {
job.Normalize()
err := job.Validate()
if err != nil {
return err
return jobstore.NewJobStoreError(err.Error())
}
return b.update(ctx, func(tx *bolt.Tx) (err error) {
return b.createJob(tx, job)
Expand All @@ -778,7 +779,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro
tx, externalTx = txFromContext(ctx)
if externalTx {
if !tx.Writable() {
return fmt.Errorf("readonly transaction provided in context for update operation")
return jobstore.NewBoltDbError("readonly transaction provided in context for update operation")
}
} else {
tx, err = b.database.Begin(true)
Expand Down Expand Up @@ -818,7 +819,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e
if !externalTx {
tx, err = b.database.Begin(false)
if err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -845,21 +846,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {

jobIDKey := []byte(job.ID)
if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
// Create the evaluations and executions buckets and so forth
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil {
return err
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -870,7 +871,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
}

if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.Put(SpecKey, jobData); err != nil {
return err
Expand All @@ -880,11 +881,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
// Create a composite key for the in progress index
jobkey := createInProgressIndexKey(&job)
if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

// Write sentinels keys for specific tags
Expand Down Expand Up @@ -919,7 +920,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

// Delete the Job bucket (and everything within it)
if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.DeleteBucket([]byte(jobID)); err != nil {
return err
Expand Down
11 changes: 5 additions & 6 deletions pkg/jobstore/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/suite"
"k8s.io/apimachinery/pkg/labels"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/test/mock"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (s *BoltJobstoreTestSuite) TestUnfilteredJobHistory() {

jobHistoryQueryResponse, err = s.store.GetJobHistory(s.ctx, "1", jobstore.JobHistoryQuery{})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(jobHistoryQueryResponse)
}

Expand Down Expand Up @@ -543,7 +542,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() {
JobID: "100",
})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.JobNotFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(state)

state, err = s.store.GetExecutions(s.ctx, jobstore.GetExecutionsOptions{
Expand All @@ -557,7 +556,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() {
JobID: "1",
})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(state)

// Created At Ascending Order Sort
Expand Down Expand Up @@ -684,7 +683,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {
// No matches
_, err := s.store.GetJob(s.ctx, shortString)
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.JobNotFound{})
s.Require().IsType(err, &models.BaseError{})

// Create and fetch the single entry
err = s.store.CreateJob(s.ctx, *job)
Expand All @@ -701,7 +700,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {

_, err = s.store.GetJob(s.ctx, shortString)
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
}

func (s *BoltJobstoreTestSuite) TestEvents() {
Expand Down
Loading
Loading