Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3 publisher errors should root from base error #4401

Closed
wants to merge 18 commits into from
Closed
3 changes: 2 additions & 1 deletion cmd/cli/job/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ func NewDescribeCmd() *cobra.Command {
func (o *DescribeOptions) run(cmd *cobra.Command, args []string, api client.API) error {
ctx := cmd.Context()
jobID := args[0]

response, err := api.Jobs().Get(ctx, &apimodels.GetJobRequest{
JobID: jobID,
Include: "executions,history",
})

if err != nil {
return fmt.Errorf("could not get job %s: %w", jobID, err)
return err
Comment on lines -91 to +92
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is being removed since the Get method's error already contains a detailed description?

}

if o.OutputOpts.Format != "" {
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/job/executions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -147,7 +148,7 @@ func (o *ExecutionOptions) run(cmd *cobra.Command, args []string, api client.API
},
})
if err != nil {
return err
return errors.New(err.Error())
Comment on lines -150 to +151
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert this, either do:
return err
or
return fmt.Errorf("<helpful message>: %w", err)

}

if err = output.Output(cmd, executionColumns, o.OutputOptions, response.Items); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/job/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func get(cmd *cobra.Command, cmdArgs []string, api client.API, cfg types.Bacalha
jobID,
OG.DownloadSettings,
); err != nil {
return fmt.Errorf("downloading job: %w", err)
return err
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/job/history.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -160,7 +161,7 @@ func (o *HistoryOptions) run(cmd *cobra.Command, args []string, api client.API)
},
})
if err != nil {
return err
return errors.New(err.Error())
Comment on lines -163 to +164
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment as stated elsewhere.

}

if err = output.Output(cmd, historyColumns, o.OutputOptions, response.Items); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/util/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -33,7 +34,7 @@ func DownloadResultsHandler(
JobID: jobID,
})
if err != nil {
Fatal(cmd, fmt.Errorf("could not get results for job %s: %w", jobID, err), 1)
return errors.New(err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment as stated elsewhere.

}

if len(response.Items) == 0 {
Expand Down
54 changes: 28 additions & 26 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/util"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
)
Expand Down Expand Up @@ -167,6 +168,7 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error
}

func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

var job models.Job

jobID, err := b.reifyJobID(tx, jobID)
Expand All @@ -176,7 +178,7 @@ func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) {

data := GetBucketData(tx, NewBucketPath(BucketJobs, jobID), SpecKey)
if data == nil {
return job, bacerrors.NewJobNotFound(jobID)
return job, apimodels.NewJobNotFound(jobID)
Comment on lines -179 to +181
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can used jobstore.NewErrJobNotFound here, no?

}

err = b.marshaller.Unmarshal(data, &job)
Expand All @@ -189,7 +191,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {
if idgen.ShortUUID(jobID) == jobID {
bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return "", err
return "", jobstore.NewBoltDbError(err.Error())
}

found := make([]string, 0, 1)
Expand All @@ -202,11 +204,11 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) {

switch len(found) {
case 0:
return "", bacerrors.NewJobNotFound(jobID)
return "", jobstore.NewErrJobNotFound(jobID)
case 1:
return found[0], nil
default:
return "", bacerrors.NewMultipleJobsFound(jobID, found)
return "", jobstore.NewErrMultipleJobsFound(jobID)
}
}

Expand All @@ -223,7 +225,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
}

if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil {
return exec, err
return exec, jobstore.NewBoltDbError(err.Error())
} else {
data := bkt.Get([]byte(id))
if data == nil {
Expand All @@ -242,11 +244,11 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e
func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) {
keys, err := b.executionsIndex.List(tx, []byte(id))
if err != nil {
return "", err
return "", jobstore.NewBoltDbError(err.Error())
}

if len(keys) != 1 {
return "", fmt.Errorf("too many leaf nodes in execution index")
return "", jobstore.NewJobStoreError("too many leaf nodes in execution index")
}

return string(keys[0]), nil
Expand Down Expand Up @@ -297,7 +299,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions

bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

var execs []models.Execution
Expand Down Expand Up @@ -421,20 +423,20 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) (
if query.ReturnAll || query.Namespace == "" {
bkt, err := NewBucketPath(BucketJobs).Get(tx, false)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

err = bkt.ForEachBucket(func(k []byte) error {
jobSet[string(k)] = struct{}{}
return nil
})
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}
} else {
ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace))
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand All @@ -455,7 +457,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -483,7 +485,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{
tagLabel := []byte(strings.ToLower(tag))
ids, err := b.tagsIndex.List(tx, tagLabel)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, k := range ids {
Expand Down Expand Up @@ -584,7 +586,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models.

keys, err := b.inProgressIndex.List(tx)
if err != nil {
return nil, err
return nil, jobstore.NewBoltDbError(err.Error())
}

for _, jobIDKey := range keys {
Expand Down Expand Up @@ -758,7 +760,7 @@ func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error {
job.Normalize()
err := job.Validate()
if err != nil {
return err
return jobstore.NewJobStoreError(err.Error())
}
return b.update(ctx, func(tx *bolt.Tx) (err error) {
return b.createJob(tx, job)
Expand All @@ -777,7 +779,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro
tx, externalTx = txFromContext(ctx)
if externalTx {
if !tx.Writable() {
return fmt.Errorf("readonly transaction provided in context for update operation")
return jobstore.NewBoltDbError("readonly transaction provided in context for update operation")
}
} else {
tx, err = b.database.Begin(true)
Expand Down Expand Up @@ -817,7 +819,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e
if !externalTx {
tx, err = b.database.Begin(false)
if err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -844,21 +846,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {

jobIDKey := []byte(job.ID)
if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
// Create the evaluations and executions buckets and so forth
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil {
return err
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}
}

Expand All @@ -869,7 +871,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
}

if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.Put(SpecKey, jobData); err != nil {
return err
Expand All @@ -879,11 +881,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error {
// Create a composite key for the in progress index
jobkey := createInProgressIndexKey(&job)
if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
}

// Write sentinels keys for specific tags
Expand All @@ -909,7 +911,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

job, err := b.getJob(tx, jobID)
if err != nil {
return bacerrors.NewJobNotFound(jobID)
apimodels.NewJobNotFound(jobID)
}

tx.OnCommit(func() {
Expand All @@ -918,7 +920,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error {

// Delete the Job bucket (and everything within it)
if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil {
return err
return jobstore.NewBoltDbError(err.Error())
} else {
if err = bkt.DeleteBucket([]byte(jobID)); err != nil {
return err
Expand Down
11 changes: 5 additions & 6 deletions pkg/jobstore/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/suite"
"k8s.io/apimachinery/pkg/labels"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/test/mock"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (s *BoltJobstoreTestSuite) TestUnfilteredJobHistory() {

jobHistoryQueryResponse, err = s.store.GetJobHistory(s.ctx, "1", jobstore.JobHistoryQuery{})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(jobHistoryQueryResponse)
}

Expand Down Expand Up @@ -543,7 +542,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() {
JobID: "100",
})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.JobNotFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(state)

state, err = s.store.GetExecutions(s.ctx, jobstore.GetExecutionsOptions{
Expand All @@ -557,7 +556,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() {
JobID: "1",
})
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
s.Require().Nil(state)

// Created At Ascending Order Sort
Expand Down Expand Up @@ -684,7 +683,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {
// No matches
_, err := s.store.GetJob(s.ctx, shortString)
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.JobNotFound{})
s.Require().IsType(err, &models.BaseError{})

// Create and fetch the single entry
err = s.store.CreateJob(s.ctx, *job)
Expand All @@ -701,7 +700,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {

_, err = s.store.GetJob(s.ctx, shortString)
s.Require().Error(err)
s.Require().IsType(err, &bacerrors.MultipleJobsFound{})
s.Require().IsType(err, &models.BaseError{})
}

func (s *BoltJobstoreTestSuite) TestEvents() {
Expand Down
Loading
Loading