Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce APIError for Sync Calls from CLI to Orchestrator Node #4366

Merged
merged 18 commits into from
Sep 11, 2024
Merged
3 changes: 2 additions & 1 deletion cmd/cli/job/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ func NewDescribeCmd() *cobra.Command {
func (o *DescribeOptions) run(cmd *cobra.Command, args []string, api client.API) error {
ctx := cmd.Context()
jobID := args[0]

response, err := api.Jobs().Get(ctx, &apimodels.GetJobRequest{
JobID: jobID,
Include: "executions,history",
})

if err != nil {
return fmt.Errorf("could not get job %s: %w", jobID, err)
return err
}

if o.OutputOpts.Format != "" {
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/job/executions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -147,7 +148,7 @@ func (o *ExecutionOptions) run(cmd *cobra.Command, args []string, api client.API
},
})
if err != nil {
return err
return errors.New(err.Error())
}

if err = output.Output(cmd, executionColumns, o.OutputOptions, response.Items); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/job/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func get(cmd *cobra.Command, cmdArgs []string, api client.API, cfg types.Bacalha
jobID,
OG.DownloadSettings,
); err != nil {
return fmt.Errorf("downloading job: %w", err)
return err
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/job/history.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -160,7 +161,7 @@ func (o *HistoryOptions) run(cmd *cobra.Command, args []string, api client.API)
},
})
if err != nil {
return err
return errors.New(err.Error())
}

if err = output.Output(cmd, historyColumns, o.OutputOptions, response.Items); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/util/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -33,7 +34,7 @@ func DownloadResultsHandler(
JobID: jobID,
})
if err != nil {
Fatal(cmd, fmt.Errorf("could not get results for job %s: %w", jobID, err), 1)
return errors.New(err.Error())
}

if len(response.Items) == 0 {
Expand Down
54 changes: 28 additions & 26 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/util"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
)
Expand Down Expand Up @@ -167,6 +168,7 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error
}

func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

var job models.Job

jobID, err := b.reifyJobID(tx, jobID)
Expand All @@ -176,7 +178,7 @@ func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

data := GetBucketData(tx, NewBucketPath(BucketJobs, jobID), SpecKey)
if data == nil {
return job, bacerrors.NewJobNotFound(jobID)
return job, apimodels.NewJobNotFound(jobID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't sound right that internal and backend components depend on apimodels which are meant for the http API between frontend (api server) and clients.

Some suggestions:

  1. We should be able to define internal errors separate from the APIs, and then implement interceptors that can translate internal errors to http errors
  2. At the same time we should avoid having to translate each individual error to http format. So maybe having an interface that each of these errors implement, or a common underlying error type can simplify the translation
  3. We should avoid having a central pkg that defines all internal errors in the system like what we have today with bacerror

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, please let me know your thoughts about it.

}

err = b.marshaller.Unmarshal(data, &job)
Expand All @@ -189,7 +191,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {
if idgen.ShortUUID(jobID) == jobID {
bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return "", err
return "", jobstore.NewBoltDbError(err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would suggest to pass the actual error instead of just the string. This will allow NewBoltDbError to intercept the error and return different types or codes based on the actual error.

My not be needed here or now, but I am just thinking ahead when we capture and throw errors for docker executor and s3 publisher for example. You might find this pattern useful to centralize intercepting errors per component

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

found := make([]string, 0, 1)
Expand All @@ -202,11 +204,11 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {

switch len(found) {
case 0:
return "", bacerrors.NewJobNotFound(jobID)
return "", jobstore.NewErrJobNotFound(jobID)
case 1:
return found[0], nil
default:
return "", bacerrors.NewMultipleJobsFound(jobID, found)
return "", jobstore.NewErrMultipleJobsFound(jobID)
}
}

Expand All @@ -223,7 +225,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
}

if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil {
return exec, err
return exec, jobstore.NewBoltDbError(err.Error())
} else {
data := bkt.Get([]byte(id))
if data == nil {
Expand All @@ -242,11 +244,11 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) {
keys, err := b.executionsIndex.List(tx, []byte(id))
if err != nil {
return "", err
return "", jobstore.NewBoltDbError(err.Error())
}

if len(keys) != 1 {
return "", fmt.Errorf("too many leaf nodes in execution index")
return "", jobstore.NewJobStoreError("too many leaf nodes in execution index")
}

return string(keys[0]), nil
Expand Down Expand Up @@ -297,7 +299,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions

bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

var execs []models.Execution
Expand Down Expand Up @@ -421,20 +423,20 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) (
if query.ReturnAll || query.Namespace == "" {
bkt, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

err = bkt.ForEachBucket(func(k []byte) error {
jobSet[string(k)] = struct{}{}
return nil
})
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}
} else {
ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace))
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand All @@ -455,7 +457,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -483,7 +485,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -584,7 +586,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models.

keys, err := b.inProgressIndex.List(tx)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, jobIDKey := range keys {
Expand Down Expand Up @@ -758,7 +760,7 @@ func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error {
job.Normalize()
err := job.Validate()
if err != nil {
return err
return jobstore.NewJobStoreError(err.Error())
}
return b.update(ctx, func(tx *bolt.Tx) (err error) {
return b.createJob(tx, job)
Expand All @@ -777,7 +779,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro
tx, externalTx = txFromContext(ctx)
if externalTx {
if !tx.Writable() {
return fmt.Errorf("readonly transaction provided in context for update operation")
return jobstore.NewBoltDbError("readonly transaction provided in context for update operation")
}
} else {
tx, err = b.database.Begin(true)
Expand Down Expand Up @@ -817,7 +819,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e
if !externalTx {
tx, err = b.database.Begin(false)
if err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -844,21 +846,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {

jobIDKey := []byte(job.ID)
if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
// Create the evaluations and executions buckets and so forth
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil {
return err
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -869,7 +871,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
}

if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.Put(SpecKey, jobData); err != nil {
return err
Expand All @@ -879,11 +881,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
// Create a composite key for the in progress index
jobkey := createInProgressIndexKey(&job)
if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

// Write sentinels keys for specific tags
Expand All @@ -909,7 +911,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

job, err := b.getJob(tx, jobID)
if err != nil {
return bacerrors.NewJobNotFound(jobID)
apimodels.NewJobNotFound(jobID)
}

tx.OnCommit(func() {
Expand All @@ -918,7 +920,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

// Delete the Job bucket (and everything within it)
if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.DeleteBucket([]byte(jobID)); err != nil {
return err
Expand Down
11 changes: 5 additions & 6 deletions pkg/jobstore/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/suite"
"k8s.io/apimachinery/pkg/labels"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/test/mock"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (s *BoltJobstoreTestSuite) TestUnfilteredJobHistory() {

jobHistoryQueryResponse, err = s.store.GetJobHistory(s.ctx, "1", jobstore.JobHistoryQuery{})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(jobHistoryQueryResponse)
}

Expand Down Expand Up @@ -543,7 +542,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() {
JobID: "100",
})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.JobNotFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(state)

state, err = s.store.GetExecutions(s.ctx, jobstore.GetExecutionsOptions{
Expand All @@ -557,7 +556,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() {
JobID: "1",
})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(state)

// Created At Ascending Order Sort
Expand Down Expand Up @@ -684,7 +683,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {
// No matches
_, err := s.store.GetJob(s.ctx, shortString)
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.JobNotFound{})
s.Require().IsType(err, &models.BaseError{})

// Create and fetch the single entry
err = s.store.CreateJob(s.ctx, *job)
Expand All @@ -701,7 +700,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {

_, err = s.store.GetJob(s.ctx, shortString)
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
}

func (s *BoltJobstoreTestSuite) TestEvents() {
Expand Down
Loading
Loading