Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce APIError for Sync Calls from CLI to Orchestrator Node #4366

Merged
merged 18 commits into from
Sep 11, 2024
Merged
3 changes: 2 additions & 1 deletion cmd/cli/job/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ func NewDescribeCmd() *cobra.Command {
func (o *DescribeOptions) run(cmd *cobra.Command, args []string, api client.API) error {
ctx := cmd.Context()
jobID := args[0]

response, err := api.Jobs().Get(ctx, &apimodels.GetJobRequest{
JobID: jobID,
Include: "executions,history",
})

if err != nil {
return fmt.Errorf("could not get job %s: %w", jobID, err)
return err
}

if o.OutputOpts.Format != "" {
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/job/executions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -147,7 +148,7 @@ func (o *ExecutionOptions) run(cmd *cobra.Command, args []string, api client.API
},
})
if err != nil {
return err
return errors.New(err.Error())
}

if err = output.Output(cmd, executionColumns, o.OutputOptions, response.Items); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/job/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func get(cmd *cobra.Command, cmdArgs []string, api client.API, cfg types.Bacalha
jobID,
OG.DownloadSettings,
); err != nil {
return fmt.Errorf("downloading job: %w", err)
return err
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/job/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *GetSuite) TestGetSingleFileFromOutputBadChoice() {
)

require.Error(s.T(), err, "expected error but it wasn't returned")
require.Contains(s.T(), getoutput, "Error: downloading job")
require.Contains(s.T(), getoutput, "Error: failed to find cid for missing")
}

func (s *GetSuite) TestGetSingleFileFromOutput() {
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/job/history.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -160,7 +161,7 @@ func (o *HistoryOptions) run(cmd *cobra.Command, args []string, api client.API)
},
})
if err != nil {
return err
return errors.New(err.Error())
}

if err = output.Output(cmd, historyColumns, o.OutputOptions, response.Items); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/util/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -33,7 +34,7 @@ func DownloadResultsHandler(
JobID: jobID,
})
if err != nil {
Fatal(cmd, fmt.Errorf("could not get results for job %s: %w", jobID, err), 1)
return errors.New(err.Error())
}

if len(response.Items) == 0 {
Expand Down
31 changes: 31 additions & 0 deletions pkg/jobstore/boltdb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package boltjobstore

import (
"errors"

"github.com/bacalhau-project/bacalhau/pkg/models"
"go.etcd.io/bbolt"
)

// BOLTDB_COMPONENT is the component identifier this package passes to
// models.NewErrorCode when building error codes for BoltDB-originated errors.
const BOLTDB_COMPONENT = "BDB"

// NewBoltDbError converts an error coming out of bbolt into a
// *models.BaseError tagged with a BOLTDB_COMPONENT error code.
//
// The numeric part of the code is chosen as follows:
//   - 404 when err matches bbolt.ErrBucketNotFound
//   - 409 when err matches bbolt.ErrBucketExists
//   - 500 for every other error, including bbolt.ErrTxNotWritable,
//     bbolt.ErrIncompatibleValue, bbolt.ErrKeyRequired, bbolt.ErrKeyTooLarge
//     and bbolt.ErrValueTooLarge
//
// The base error is built from err.Error(), so err must be non-nil.
func NewBoltDbError(err error) *models.BaseError {
	// A missing bucket is reported as "not found".
	if errors.Is(err, bbolt.ErrBucketNotFound) {
		return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 404))
	}

	// A bucket that already exists is reported as a conflict.
	if errors.Is(err, bbolt.ErrBucketExists) {
		return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 409))
	}

	// Everything else, recognised bbolt sentinel or not, is an internal error.
	return models.NewBaseError(err.Error()).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500))
}
55 changes: 29 additions & 26 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package boltjobstore
import (
"bytes"
"context"
"errors"
"fmt"
"slices"
"sort"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/util"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
)
Expand Down Expand Up @@ -167,6 +169,7 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error
}

func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

var job models.Job

jobID, err := b.reifyJobID(tx, jobID)
Expand All @@ -176,7 +179,7 @@ func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

data := GetBucketData(tx, NewBucketPath(BucketJobs, jobID), SpecKey)
if data == nil {
return job, bacerrors.NewJobNotFound(jobID)
return job, apimodels.NewJobNotFound(jobID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't sound right that internal and backend components depend on apimodels, which is meant for the HTTP API between the frontend (API server) and clients.

Some suggestions:

  1. We should be able to define internal errors separate from the APIs, and then implement interceptors that can translate internal errors to HTTP errors.
  2. At the same time, we should avoid having to translate each individual error to HTTP format. So maybe having an interface that each of these errors implements, or a common underlying error type, can simplify the translation.
  3. We should avoid having a central pkg that defines all internal errors in the system, like what we have today with bacerrors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, please let me know your thoughts about it.

}

err = b.marshaller.Unmarshal(data, &job)
Expand All @@ -189,7 +192,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {
if idgen.ShortUUID(jobID) == jobID {
bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return "", err
return "", NewBoltDbError(err)
}

found := make([]string, 0, 1)
Expand All @@ -202,11 +205,11 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {

switch len(found) {
case 0:
return "", bacerrors.NewJobNotFound(jobID)
return "", jobstore.NewErrJobNotFound(jobID)
case 1:
return found[0], nil
default:
return "", bacerrors.NewMultipleJobsFound(jobID, found)
return "", jobstore.NewErrMultipleJobsFound(jobID)
}
}

Expand All @@ -223,7 +226,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
}

if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil {
return exec, err
return exec, NewBoltDbError(err)
} else {
data := bkt.Get([]byte(id))
if data == nil {
Expand All @@ -242,11 +245,11 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) {
keys, err := b.executionsIndex.List(tx, []byte(id))
if err != nil {
return "", err
return "", NewBoltDbError(err)
}

if len(keys) != 1 {
return "", fmt.Errorf("too many leaf nodes in execution index")
return "", jobstore.NewJobStoreError("too many leaf nodes in execution index")
}

return string(keys[0]), nil
Expand Down Expand Up @@ -297,7 +300,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions

bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false)
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}

var execs []models.Execution
Expand Down Expand Up @@ -421,20 +424,20 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) (
if query.ReturnAll || query.Namespace == "" {
bkt, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}

err = bkt.ForEachBucket(func(k []byte) error {
jobSet[string(k)] = struct{}{}
return nil
})
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}
} else {
ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace))
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}

for _, k := range ids {
Expand All @@ -455,7 +458,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}

for _, k := range ids {
Expand Down Expand Up @@ -483,7 +486,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}

for _, k := range ids {
Expand Down Expand Up @@ -584,7 +587,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models.

keys, err := b.inProgressIndex.List(tx)
if err != nil {
return nil, err
return nil, NewBoltDbError(err)
}

for _, jobIDKey := range keys {
Expand Down Expand Up @@ -758,7 +761,7 @@ func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error {
job.Normalize()
err := job.Validate()
if err != nil {
return err
return jobstore.NewJobStoreError(err.Error())
}
return b.update(ctx, func(tx *bolt.Tx) (err error) {
return b.createJob(tx, job)
Expand All @@ -777,7 +780,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro
tx, externalTx = txFromContext(ctx)
if externalTx {
if !tx.Writable() {
return fmt.Errorf("readonly transaction provided in context for update operation")
return NewBoltDbError(errors.New("readonly transaction provided in context for update operation"))
}
} else {
tx, err = b.database.Begin(true)
Expand Down Expand Up @@ -817,7 +820,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e
if !externalTx {
tx, err = b.database.Begin(false)
if err != nil {
return err
return NewBoltDbError(err)
}
}

Expand All @@ -844,21 +847,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {

jobIDKey := []byte(job.ID)
if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil {
return err
return NewBoltDbError(err)
} else {
// Create the evaluations and executions buckets and so forth
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil {
return err
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil {
return err
return NewBoltDbError(err)
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil {
return err
return NewBoltDbError(err)
}

if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil {
return err
return NewBoltDbError(err)
}
}

Expand All @@ -869,7 +872,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
}

if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil {
return err
return NewBoltDbError(err)
} else {
if err = bkt.Put(SpecKey, jobData); err != nil {
return err
Expand All @@ -879,11 +882,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
// Create a composite key for the in progress index
jobkey := createInProgressIndexKey(&job)
if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil {
return err
return NewBoltDbError(err)
}

if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil {
return err
return NewBoltDbError(err)
}

// Write sentinels keys for specific tags
Expand All @@ -909,7 +912,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

job, err := b.getJob(tx, jobID)
if err != nil {
return bacerrors.NewJobNotFound(jobID)
apimodels.NewJobNotFound(jobID)
}

tx.OnCommit(func() {
Expand All @@ -918,7 +921,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

// Delete the Job bucket (and everything within it)
if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil {
return err
return NewBoltDbError(err)
} else {
if err = bkt.DeleteBucket([]byte(jobID)); err != nil {
return err
Expand Down
Loading
Loading