From b7edf0826a1f7dbb5071b500291de129f81bf32c Mon Sep 17 00:00:00 2001 From: udsamani Date: Sun, 8 Sep 2024 23:59:58 +0100 Subject: [PATCH 01/16] Introduce APIError for Sync Calls from CLI to Orchestrator Node --- Makefile | 2 +- cmd/cli/job/describe.go | 4 +- cmd/cli/job/executions.go | 3 +- cmd/cli/job/get.go | 3 +- cmd/cli/job/history.go | 3 +- cmd/util/download.go | 3 +- pkg/jobstore/boltdb/store.go | 7 +- pkg/publicapi/apimodels/error.go | 132 +++++++++++++++++++++ pkg/publicapi/client/v2/client.go | 23 +++- pkg/publicapi/client/v2/errors.go | 7 -- pkg/publicapi/endpoint/orchestrator/job.go | 2 + pkg/publicapi/middleware/error_handler.go | 48 ++++++++ pkg/publicapi/server.go | 4 + 13 files changed, 220 insertions(+), 21 deletions(-) create mode 100644 pkg/publicapi/apimodels/error.go create mode 100644 pkg/publicapi/middleware/error_handler.go diff --git a/Makefile b/Makefile index 501e57413e..b596b3b6cb 100644 --- a/Makefile +++ b/Makefile @@ -219,7 +219,7 @@ PKG_FILES := $(shell bash -c 'comm -23 <(git ls-files pkg | sort) <(git ls-files .PHONY: binary binary: ${CMD_FILES} ${PKG_FILES} main.go - ${GO} build -ldflags "${BUILD_FLAGS}" -trimpath -o ${BINARY_PATH} . + ${GO} build -gcflags=all="-N -l" -trimpath -o ${BINARY_PATH} . binary-web: build-webui ${WEB_GO_FILES} diff --git a/cmd/cli/job/describe.go b/cmd/cli/job/describe.go index c31cf1f363..d7b694f19a 100644 --- a/cmd/cli/job/describe.go +++ b/cmd/cli/job/describe.go @@ -2,6 +2,7 @@ package job import ( "cmp" + "errors" "fmt" "slices" "time" @@ -82,13 +83,14 @@ func NewDescribeCmd() *cobra.Command { func (o *DescribeOptions) run(cmd *cobra.Command, args []string, api client.API) error { ctx := cmd.Context() jobID := args[0] + response, err := api.Jobs().Get(ctx, &apimodels.GetJobRequest{ JobID: jobID, Include: "executions,history", }) if err != nil { - return fmt.Errorf("could not get job %s: %w", jobID, err) + return errors.New(err.Error()) } if o.OutputOpts.Format != "" { diff --git a/cmd/cli/job/executions.go b/cmd/cli/job/executions.go index 94b9d25483..23b5ff61ed 100644 --- a/cmd/cli/job/executions.go +++ b/cmd/cli/job/executions.go @@ -1,6 +1,7 @@ package job import ( + "errors" "fmt" "strconv" "time" @@ -147,7 +148,7 @@ func (o *ExecutionOptions) run(cmd *cobra.Command, args []string, api client.API }, }) if err != nil { - return err + return errors.New(err.Error()) } if err = output.Output(cmd, executionColumns, o.OutputOptions, response.Items); err != nil { diff --git a/cmd/cli/job/get.go b/cmd/cli/job/get.go index cf1f7a8c47..82db17306d 100644 --- a/cmd/cli/job/get.go +++ b/cmd/cli/job/get.go @@ -1,6 +1,7 @@ package job import ( + "errors" "fmt" "strings" @@ -108,7 +109,7 @@ func get(cmd *cobra.Command, cmdArgs []string, api client.API, cfg types.Bacalha jobID, OG.DownloadSettings, ); err != nil { - return fmt.Errorf("downloading job: %w", err) + return errors.New(err.Error()) } return nil diff --git a/cmd/cli/job/history.go b/cmd/cli/job/history.go index 2fd5da17ab..2cc2178a0d 100644 --- a/cmd/cli/job/history.go +++ b/cmd/cli/job/history.go @@ -1,6 +1,7 @@ package job import ( + "errors" "fmt" "time" @@ -160,7 +161,7 @@ func (o *HistoryOptions) run(cmd *cobra.Command, args []string, api client.API) }, }) if err != nil { - return err + return errors.New(err.Error()) } if err = output.Output(cmd, historyColumns, o.OutputOptions, response.Items); err != nil { diff --git a/cmd/util/download.go b/cmd/util/download.go index c1b1504f59..8fae04f6e8 100644 --- a/cmd/util/download.go +++ b/cmd/util/download.go @@ -3,6 +3,7 @@ package util import ( "context" "encoding/json" + "errors" "fmt" "os" "path/filepath" @@ -33,7 +34,7 @@ func DownloadResultsHandler( JobID: jobID, }) if err != nil { - Fatal(cmd, fmt.Errorf("could not get results for job %s: %w", jobID, err), 1) + return errors.New(err.Error()) } if len(response.Items) == 0 { diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 18d6b3f297..6f281bb2a4 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -20,6 +20,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" "github.com/bacalhau-project/bacalhau/pkg/lib/math" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" "github.com/bacalhau-project/bacalhau/pkg/util" "github.com/bacalhau-project/bacalhau/pkg/util/idgen" ) @@ -176,7 +177,7 @@ func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) { data := GetBucketData(tx, NewBucketPath(BucketJobs, jobID), SpecKey) if data == nil { - return job, bacerrors.NewJobNotFound(jobID) + return job, apimodels.NewJobNotFound(jobID) } err = b.marshaller.Unmarshal(data, &job) @@ -202,7 +203,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) { switch len(found) { case 0: - return "", bacerrors.NewJobNotFound(jobID) + return "", apimodels.NewJobNotFound(jobID) case 1: return found[0], nil default: @@ -909,7 +910,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error { job, err := b.getJob(tx, jobID) if err != nil { - return bacerrors.NewJobNotFound(jobID) + apimodels.NewJobNotFound(jobID) } tx.OnCommit(func() { diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go new file mode 100644 index 0000000000..250555d9f8 --- /dev/null +++ b/pkg/publicapi/apimodels/error.go @@ -0,0 +1,132 @@ +package apimodels + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" +) + +// apierror represents a standardized error response for the api. +// +// it encapsulates: +// - an http status code +// - a human-readable error message +// - optional detailed error information +// +// purpose: +// - primarily used to send synchronous errors from the orchestrator endpoint +// - provides a structured json error response to http clients +// +// usage: +// - when the cli interacts with an orchestrator node via an http client: +// 1. the http client receives this structured json error +// 2. the human-readable message is output to stdout +// 3. the http status code allows for programmatic handling of different error types +// +// benefits: +// - ensures consistent error formatting across the api +// - facilitates both user-friendly error messages and machine-readable error codes +type APIError struct { + // httpstatuscode is the http status code associated with this error. + // it should correspond to standard http status codes (e.g., 400, 404, 500). + HTTPStatusCode int `json:"code"` + + // message is a short, human-readable description of the error. + // it should be concise and provide a clear indication of what went wrong. + Message string `json:"message"` +} + +// NewAPIError creates a new APIError with the given HTTP status code and message. +func NewAPIError(statusCode int, message string) *APIError { + return &APIError{ + HTTPStatusCode: statusCode, + Message: message, + } +} + +// NewBadRequestError creates an APIError for Bad Request (400) errors. +func NewBadRequestError(message string) *APIError { + return NewAPIError(http.StatusBadRequest, message) +} + +// NewUnauthorizedError creates an APIError for Unauthorized (401) errors. +func NewUnauthorizedError(message string) *APIError { + return NewAPIError(http.StatusUnauthorized, message) +} + +// NewForbiddenError creates an APIError for Forbidden (403) errors. +func NewForbiddenError(message string) *APIError { + return NewAPIError(http.StatusForbidden, message) +} + +// NewNotFoundError creates an APIError for Not Found (404) errors. +func NewNotFoundError(message string) *APIError { + return NewAPIError(http.StatusNotFound, message) +} + +// NewConflictError creates an APIError for Conflict (409) errors. +func NewConflictError(message string) *APIError { + return NewAPIError(http.StatusConflict, message) +} + +// NewInternalServerError creates an APIError for Internal Server Error (500) errors. +func NewInternalServerError(message string) *APIError { + return NewAPIError(http.StatusInternalServerError, message) +} + +func NewJobNotFound(jobID string) *APIError { + return NewAPIError(http.StatusNotFound, fmt.Sprintf("job id %s not found", jobID)) +} + +// IsNotFound checks if the error is an APIError with a Not Found status. +func IsNotFound(err error) bool { + apiErr, ok := err.(*APIError) + return ok && apiErr.HTTPStatusCode == http.StatusNotFound +} + +// IsBadRequest checks if the error is an APIError with a Bad Request status. +func IsBadRequest(err error) bool { + apiErr, ok := err.(*APIError) + return ok && apiErr.HTTPStatusCode == http.StatusBadRequest +} + +// IsInternalServerError checks if the error is an APIError with an Internal Server Error status. +func IsInternalServerError(err error) bool { + apiErr, ok := err.(*APIError) + return ok && apiErr.HTTPStatusCode == http.StatusInternalServerError +} + +// Error implements the error interface, allowing APIError to be used as a standard Go error. +func (e *APIError) Error() string { + return e.Message +} + +// Parse HTTP Resposne to APIError +func FromHttpResponse(resp *http.Response) (*APIError, error) { + + if resp == nil { + return nil, errors.New("response is nil, cannot be unmarsheld to APIError") + } + + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w") + } + + var apiErr APIError + err = json.Unmarshal(body, &apiErr) + if err != nil { + return nil, fmt.Errorf("error parsing response body: %w", err) + } + + // If the JSON didn't include a status code, use the HTTP Status + if apiErr.HTTPStatusCode == 0 { + apiErr.HTTPStatusCode = resp.StatusCode + } + + return &apiErr, nil +} diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go index 56c0fe43c1..db68b835ad 100644 --- a/pkg/publicapi/client/v2/client.go +++ b/pkg/publicapi/client/v2/client.go @@ -55,12 +55,25 @@ type httpClient struct { // and deserialize the response into a response object func (c *httpClient) Get(ctx context.Context, endpoint string, in apimodels.GetRequest, out apimodels.GetResponse) error { r := in.ToHTTPRequest() - _, resp, err := requireOK(c.doRequest(ctx, http.MethodGet, endpoint, r)) //nolint:bodyclose // this is being closed - if err != nil && resp != nil && resp.StatusCode == http.StatusUnauthorized { - return apimodels.ErrInvalidToken - } else if err != nil { - return err + + _, resp, err := c.doRequest(ctx, http.MethodGet, endpoint, r) //nolint:bodyclose // this is being closed + + if resp.StatusCode == http.StatusUnauthorized { + return apimodels.NewUnauthorizedError("invalid token") } + + var apiError *apimodels.APIError + if resp.StatusCode != http.StatusOK { + apiError, err = apimodels.FromHttpResponse(resp) + if err != nil { + return err + } + } + + if apiError != nil { + return apiError + } + defer resp.Body.Close() if out != nil { diff --git a/pkg/publicapi/client/v2/errors.go b/pkg/publicapi/client/v2/errors.go index 529555f588..54d1dee929 100644 --- a/pkg/publicapi/client/v2/errors.go +++ b/pkg/publicapi/client/v2/errors.go @@ -165,13 +165,6 @@ func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *h // response codes and validates that the received response code is among them func requireStatusIn(statuses ...int) doRequestWrapper { return func(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { - if e != nil { - if resp != nil { - _ = resp.Body.Close() - } - return d, nil, e - } - for _, status := range statuses { if resp.StatusCode == status { return d, resp, nil diff --git a/pkg/publicapi/endpoint/orchestrator/job.go b/pkg/publicapi/endpoint/orchestrator/job.go index ba362f858a..ec4df820f9 100644 --- a/pkg/publicapi/endpoint/orchestrator/job.go +++ b/pkg/publicapi/endpoint/orchestrator/job.go @@ -76,8 +76,10 @@ func (e *Endpoint) getJob(c echo.Context) error { //nolint: gocyclo if err := c.Bind(&args); err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } + log.Info().Msg("UDIT SUCKS") job, err := e.store.GetJob(ctx, jobID) if err != nil { + log.Error().Err(err) return err } response := apimodels.GetJobResponse{ diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go new file mode 100644 index 0000000000..0031ea5e64 --- /dev/null +++ b/pkg/publicapi/middleware/error_handler.go @@ -0,0 +1,48 @@ +package middleware + +import ( + "net/http" + + "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog/log" +) + +func CustomHTTPErrorHandler(err error, c echo.Context) { + + log.Info().Msg("HELLO THIS IS CALLED") + var code int + var message string + + switch e := err.(type) { + + case *apimodels.APIError: + // If it is already our custom APIError, use its code and message + code = e.HTTPStatusCode + message = e.Message + + default: + // In an ideal world this should never happen. We should always have are errors + // from server as APIError. If output is this generic string, one should evaluate + // and map it to APIError and send in appropriate message.= http.StatusInternalServerError + message = "internal server error" + } + + // Don't override the status code if it is already been set. + // This is something that is advised by ECHO framework. + if !c.Response().Committed { + + if c.Request().Method == http.MethodHead { + err = c.NoContent(code) + } else { + err = c.JSON(code, apimodels.APIError{ + HTTPStatusCode: code, + Message: message, + }) + } + if err != nil { + log.Info().Msg("unable to send json response while handling error.") + } + } + +} diff --git a/pkg/publicapi/server.go b/pkg/publicapi/server.go index e4ebcc4fdd..9178f590fe 100644 --- a/pkg/publicapi/server.go +++ b/pkg/publicapi/server.go @@ -140,6 +140,10 @@ func NewAPIServer(params ServerParams) (*Server, error) { middleware.VersionNotifyLogger(middlewareLogger, *serverVersion), ) + // Add custom http error handler. This is a centralized error handler for + // the server + server.Router.HTTPErrorHandler = middleware.CustomHTTPErrorHandler + var tlsConfig *tls.Config if params.AutoCertDomain != "" { log.Ctx(context.TODO()).Debug().Msgf("Setting up auto-cert for %s", params.AutoCertDomain) From c4e9ac8aa5a670952bfea809894fdcc2b274c0fa Mon Sep 17 00:00:00 2001 From: udsamani Date: Mon, 9 Sep 2024 00:09:12 +0100 Subject: [PATCH 02/16] Revert Makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b596b3b6cb..501e57413e 100644 --- a/Makefile +++ b/Makefile @@ -219,7 +219,7 @@ PKG_FILES := $(shell bash -c 'comm -23 <(git ls-files pkg | sort) <(git ls-files .PHONY: binary binary: ${CMD_FILES} ${PKG_FILES} main.go - ${GO} build -gcflags=all="-N -l" -trimpath -o ${BINARY_PATH} . + ${GO} build -ldflags "${BUILD_FLAGS}" -trimpath -o ${BINARY_PATH} . binary-web: build-webui ${WEB_GO_FILES} From 62f9a2d04a6b8f5a967c6287734b723e0d898f67 Mon Sep 17 00:00:00 2001 From: udsamani Date: Mon, 9 Sep 2024 00:12:37 +0100 Subject: [PATCH 03/16] fix comment --- pkg/publicapi/apimodels/error.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 250555d9f8..d09d6b77c4 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -13,7 +13,6 @@ import ( // it encapsulates: // - an http status code // - a human-readable error message -// - optional detailed error information // // purpose: // - primarily used to send synchronous errors from the orchestrator endpoint From 0f0fe075c085e26592e21e68e0d2c823962ea978 Mon Sep 17 00:00:00 2001 From: udsamani Date: Mon, 9 Sep 2024 00:14:21 +0100 Subject: [PATCH 04/16] Remove redundant log --- pkg/publicapi/middleware/error_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index 0031ea5e64..40cb069bef 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -10,7 +10,6 @@ import ( func CustomHTTPErrorHandler(err error, c echo.Context) { - log.Info().Msg("HELLO THIS IS CALLED") var code int var message string From 7a2288318f80f5b65ac70e1fc60d62dc6544bafa Mon Sep 17 00:00:00 2001 From: udsamani Date: Mon, 9 Sep 2024 00:29:49 +0100 Subject: [PATCH 05/16] Add basic test --- pkg/publicapi/test/job_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/publicapi/test/job_test.go b/pkg/publicapi/test/job_test.go index 659af57b5a..cb4c969253 100644 --- a/pkg/publicapi/test/job_test.go +++ b/pkg/publicapi/test/job_test.go @@ -4,6 +4,8 @@ package test import ( "context" + "errors" + "net/http" "time" "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" @@ -81,3 +83,18 @@ func (s *ServerSuite) TestJobOperations() { _, err = s.client.Jobs().Stop(ctx, &apimodels.StopJobRequest{JobID: putResponse.JobID}) s.Require().Error(err) } + +func (s *ServerSuite) TestJobOperationsHttpHandler() { + + ctx := context.Background() + job := mock.Job() + + // retrieve a non existent job + _, err := s.client.Jobs().Get(ctx, &apimodels.GetJobRequest{JobID: job.ID}) + s.Require().Error(err) + + var apiError *apimodels.APIError + s.Require().True(errors.As(err, &apiError)) + s.Require().Equal(http.StatusNotFound, apiError.HTTPStatusCode, "Expected 404 Not Found status") + s.Require().Contains(apiError.Message, "not found", "Error message should indicate resource not found") +} From f5dc73064a09165475f41ea85a82abdb774d07ad Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 00:46:26 +0100 Subject: [PATCH 06/16] job store errors --- cmd/cli/job/describe.go | 3 +- cmd/cli/job/get.go | 3 +- pkg/jobstore/boltdb/store.go | 14 +- pkg/jobstore/errors.go | 157 +++++---------------- pkg/models/error.go | 38 +++++ pkg/publicapi/endpoint/orchestrator/job.go | 1 - pkg/publicapi/middleware/error_handler.go | 7 +- pkg/publicapi/test/job_test.go | 15 -- 8 files changed, 89 insertions(+), 149 deletions(-) diff --git a/cmd/cli/job/describe.go b/cmd/cli/job/describe.go index d7b694f19a..0a45ce7c9b 100644 --- a/cmd/cli/job/describe.go +++ b/cmd/cli/job/describe.go @@ -2,7 +2,6 @@ package job import ( "cmp" - "errors" "fmt" "slices" "time" @@ -90,7 +89,7 @@ func (o *DescribeOptions) run(cmd *cobra.Command, args []string, api client.API) }) if err != nil { - return errors.New(err.Error()) + return err } if o.OutputOpts.Format != "" { diff --git a/cmd/cli/job/get.go b/cmd/cli/job/get.go index 82db17306d..6754e18349 100644 --- a/cmd/cli/job/get.go +++ b/cmd/cli/job/get.go @@ -1,7 +1,6 @@ package job import ( - "errors" "fmt" "strings" @@ -109,7 +108,7 @@ func get(cmd *cobra.Command, cmdArgs []string, api client.API, cfg types.Bacalha jobID, OG.DownloadSettings, ); err != nil { - return errors.New(err.Error()) + return err } return nil diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 6f281bb2a4..e44e5b65b9 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -190,7 +190,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) { if idgen.ShortUUID(jobID) == jobID { bktJobs, err := NewBucketPath(BucketJobs).Get(tx, false) if err != nil { - return "", err + return "", jobstore.NewBoltDbError(err.Error()) } found := make([]string, 0, 1) @@ -203,7 +203,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) { switch len(found) { case 0: - return "", apimodels.NewJobNotFound(jobID) + return "", jobstore.NewErrJobNotFound(jobID) case 1: return found[0], nil default: @@ -224,7 +224,7 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e } if bkt, err := NewBucketPath(BucketJobs, key, BucketJobExecutions).Get(tx, false); err != nil { - return exec, err + return exec, jobstore.NewBoltDbError(err.Error()) } else { data := bkt.Get([]byte(id)) if data == nil { @@ -243,11 +243,11 @@ func (b *BoltJobStore) getExecution(tx *bolt.Tx, id string) (models.Execution, e func (b *BoltJobStore) getExecutionJobID(tx *bolt.Tx, id string) (string, error) { keys, err := b.executionsIndex.List(tx, []byte(id)) if err != nil { - return "", err + return "", jobstore.NewBoltDbError(err.Error()) } if len(keys) != 1 { - return "", fmt.Errorf("too many leaf nodes in execution index") + return "", jobstore.NewJobStoreError("too many leaf nodes in execution index") } return string(keys[0]), nil @@ -298,7 +298,7 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } var execs []models.Execution @@ -422,7 +422,7 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) ( if query.ReturnAll || query.Namespace == "" { bkt, err := NewBucketPath(BucketJobs).Get(tx, false) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } err = bkt.ForEachBucket(func(k []byte) error { diff --git a/pkg/jobstore/errors.go b/pkg/jobstore/errors.go index 321d473844..622885a50b 100644 --- a/pkg/jobstore/errors.go +++ b/pkg/jobstore/errors.go @@ -6,154 +6,73 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/models" ) +const JOB_STORE_COMPONENT = "JBS" + // ErrJobNotFound is returned when the job is not found type ErrJobNotFound struct { JobID string } -func NewErrJobNotFound(id string) ErrJobNotFound { - return ErrJobNotFound{JobID: id} +func NewErrJobNotFound(id string) *models.BaseError { + return models.NewBaseError("job not found: %s", id).WithCode(models.NewErrorCode(JOB_STORE_COMPONENT, 404)) } -func (e ErrJobNotFound) Error() string { - return "job not found: " + e.JobID +func NewErrJobAlreadyExists(id string) *models.BaseError { + return models.NewBaseError("job already exists: %s", id) } -// ErrJobAlreadyExists is returned when an job already exists -type ErrJobAlreadyExists struct { - JobID string -} - -func NewErrJobAlreadyExists(id string) ErrJobAlreadyExists { - return ErrJobAlreadyExists{JobID: id} -} - -func (e ErrJobAlreadyExists) Error() string { - return "job already exists: " + e.JobID -} - -// ErrInvalidJobState is returned when an job is in an invalid state. -type ErrInvalidJobState struct { - JobID string - Actual models.JobStateType - Expected models.JobStateType -} - -func NewErrInvalidJobState(id string, actual models.JobStateType, expected models.JobStateType) ErrInvalidJobState { - return ErrInvalidJobState{JobID: id, Actual: actual, Expected: expected} -} - -func (e ErrInvalidJobState) Error() string { - if e.Expected.IsUndefined() { - return fmt.Sprintf("job %s is in unexpected state %s", e.JobID, e.Actual) +func NewErrInvalidJobState(id string, actual models.JobStateType, expected models.JobStateType) *models.BaseError { + var errorFormat string + if expected.IsUndefined() { + errorFormat = "job %s is in unexpected state %s" + } else { + errorFormat = "job %s is in state %s but expected %s" } - return fmt.Sprintf("job %s is in state %s but expected %s", e.JobID, e.Actual, e.Expected) -} - -// ErrInvalidJobVersion is returned when an job has an invalid version. -type ErrInvalidJobVersion struct { - JobID string - Actual uint64 - Expected uint64 -} - -func NewErrInvalidJobVersion(id string, actual, expected uint64) ErrInvalidJobVersion { - return ErrInvalidJobVersion{JobID: id, Actual: actual, Expected: expected} -} -func (e ErrInvalidJobVersion) Error() string { - return fmt.Sprintf("job %s has version %d but expected %d", e.JobID, e.Actual, e.Expected) + return models.NewBaseError(errorFormat, id, actual) } -// ErrJobAlreadyTerminal is returned when an job is already in terminal state and cannot be updated. -type ErrJobAlreadyTerminal struct { - JobID string - Actual models.JobStateType - NewState models.JobStateType +func NewErrInvalidJobVersion(id string, actual, expected uint64) *models.BaseError { + errorMessage := fmt.Sprintf("job %s has version %d but expected %d", id, actual, expected) + return models.NewBaseError(errorMessage) } -func NewErrJobAlreadyTerminal(id string, actual models.JobStateType, newState models.JobStateType) ErrJobAlreadyTerminal { - return ErrJobAlreadyTerminal{JobID: id, Actual: actual, NewState: newState} +func NewErrJobAlreadyTerminal(id string, actual models.JobStateType, newState models.JobStateType) *models.BaseError { + errorMessage := fmt.Sprintf("job %s is in terminal state %s and connt transition to %s", id, actual, newState) + return models.NewBaseError(errorMessage) } -func (e ErrJobAlreadyTerminal) Error() string { - return fmt.Sprintf("job %s is in terminal state %s and cannot transition to %s", - e.JobID, e.Actual, e.NewState) +func NewErrExecutionNotFound(id string) *models.BaseError { + return models.NewBaseError("execution not found: %s", id) } -// ErrExecutionNotFound is returned when an job already exists -type ErrExecutionNotFound struct { - ExecutionID string +func NewErrExecutionAlreadyExists(id string) *models.BaseError { + return models.NewBaseError("execution already exists %s", id) } -func NewErrExecutionNotFound(id string) ErrExecutionNotFound { - return ErrExecutionNotFound{ExecutionID: id} -} - -func (e ErrExecutionNotFound) Error() string { - return "execution not found: " + e.ExecutionID -} - -// ErrExecutionAlreadyExists is returned when an job already exists -type ErrExecutionAlreadyExists struct { - ExecutionID string -} - -func NewErrExecutionAlreadyExists(id string) ErrExecutionAlreadyExists { - return ErrExecutionAlreadyExists{ExecutionID: id} -} - -func (e ErrExecutionAlreadyExists) Error() string { - return "execution already exists: " + e.ExecutionID -} - -// ErrInvalidExecutionState is returned when an execution is in an invalid state. -type ErrInvalidExecutionState struct { - ExecutionID string - Actual models.ExecutionStateType - Expected []models.ExecutionStateType -} - -func NewErrInvalidExecutionState( - id string, actual models.ExecutionStateType, expected ...models.ExecutionStateType) ErrInvalidExecutionState { - return ErrInvalidExecutionState{ExecutionID: id, Actual: actual, Expected: expected} -} - -func (e ErrInvalidExecutionState) Error() string { - if len(e.Expected) > 0 { - return fmt.Sprintf("execution %s is in unexpected state %s", e.ExecutionID, e.Actual) +func NewErrInvalidExecutionState(id string, actual models.ExecutionStateType, expected ...models.ExecutionStateType) *models.BaseError { + var errorMessage string + if len(expected) > 0 { + errorMessage = fmt.Sprintf("execution %s is in unexpected state %s", id, actual) + } else { + errorMessage = fmt.Sprintf("execution %s is in state %s, but expected %s", id, actual, expected) } - return fmt.Sprintf("execution %s is in state %s, but expected %s", e.ExecutionID, e.Actual, e.Expected) -} - -// ErrInvalidExecutionVersion is returned when an execution has an invalid version. -type ErrInvalidExecutionVersion struct { - ExecutionID string - Actual uint64 - Expected uint64 + return models.NewBaseError(errorMessage) } -func NewErrInvalidExecutionVersion(id string, actual, expected uint64) ErrInvalidExecutionVersion { - return ErrInvalidExecutionVersion{ExecutionID: id, Actual: actual, Expected: expected} -} +func NewErrInvalidExecutionVersion(id string, actual, expected uint64) *models.BaseError { + return models.NewBaseError("execution %s has version %d but expected %d", id, actual, expected) -func (e ErrInvalidExecutionVersion) Error() string { - return fmt.Sprintf("execution %s has version %d but expected %d", e.ExecutionID, e.Actual, e.Expected) } -// ErrExecutionAlreadyTerminal is returned when an execution is already in terminal state and cannot be updated. -type ErrExecutionAlreadyTerminal struct { - ExecutionID string - Actual models.ExecutionStateType - NewState models.ExecutionStateType +func NewErrExecutionAlreadyTerminal(id string, actual models.ExecutionStateType, newState models.ExecutionStateType) *models.BaseError { + return models.NewBaseError("execution %s is in terminal state %s and cannot transition to %s", id, actual, newState) } -func NewErrExecutionAlreadyTerminal( - id string, actual models.ExecutionStateType, newState models.ExecutionStateType) ErrExecutionAlreadyTerminal { - return ErrExecutionAlreadyTerminal{ExecutionID: id, Actual: actual, NewState: newState} +func NewBoltDbError(message string) *models.BaseError { + return models.NewBaseError(message) } -func (e ErrExecutionAlreadyTerminal) Error() string { - return fmt.Sprintf("execution %s is in terminal state %s and cannot transition to %s", - e.ExecutionID, e.Actual, e.NewState) +func NewJobStoreError(message string) *models.BaseError { + return models.NewBaseError(message) } diff --git a/pkg/models/error.go b/pkg/models/error.go index a4289c0b5e..737ea57407 100644 --- a/pkg/models/error.go +++ b/pkg/models/error.go @@ -41,6 +41,31 @@ type HasDetails interface { Details() map[string]string } +type HasCode interface { + // Details a code + Code() int +} + +type ErrorCode struct { + component string + httpStatusCode int +} + +func NewErrorCode(component string, httpStatusCode int) ErrorCode { + return ErrorCode{component: component, httpStatusCode: httpStatusCode} +} + +func (ec ErrorCode) HTTPStatusCode() int { + if ec.httpStatusCode == 0 { + return 500 + } + return ec.httpStatusCode +} + +func (ec ErrorCode) Component() string { + return ec.component +} + // BaseError is a custom error type in Go that provides additional fields // and methods for more detailed error handling. It implements the error // interface, as well as additional interfaces for providing a hint, @@ -52,6 +77,7 @@ type BaseError struct { retryable bool failsExecution bool details map[string]string + code ErrorCode } // NewBaseError is a constructor function that creates a new BaseError with @@ -88,6 +114,13 @@ func (e *BaseError) WithDetails(details map[string]string) *BaseError { return e } +// WithCode is a method that sets the code field of BaseError and +// returns the BaseError itself for chaining +func (e *BaseError) WithCode(code ErrorCode) *BaseError { + e.code = code + return e +} + // Error is a method that returns the message field of BaseError. This // method makes BaseError satisfy the error interface. func (e *BaseError) Error() string { @@ -113,3 +146,8 @@ func (e *BaseError) FailsExecution() bool { func (e *BaseError) Details() map[string]string { return e.details } + +// Details a Unique Code to identify the error +func (e *BaseError) Code() ErrorCode { + return e.code +} diff --git a/pkg/publicapi/endpoint/orchestrator/job.go b/pkg/publicapi/endpoint/orchestrator/job.go index ec4df820f9..77463edd44 100644 --- a/pkg/publicapi/endpoint/orchestrator/job.go +++ b/pkg/publicapi/endpoint/orchestrator/job.go @@ -76,7 +76,6 @@ func (e *Endpoint) getJob(c echo.Context) error { //nolint: gocyclo if err := c.Bind(&args); err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } - log.Info().Msg("UDIT SUCKS") job, err := e.store.GetJob(ctx, jobID) if err != nil { log.Error().Err(err) diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index 40cb069bef..434b9545a7 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -3,6 +3,7 @@ package middleware import ( "net/http" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" "github.com/labstack/echo/v4" "github.com/rs/zerolog/log" @@ -15,10 +16,10 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { switch e := err.(type) { - case *apimodels.APIError: + case *models.BaseError: // If it is already our custom APIError, use its code and message - code = e.HTTPStatusCode - message = e.Message + code = e.Code().HTTPStatusCode() + message = e.Error() default: // In an ideal world this should never happen. We should always have are errors diff --git a/pkg/publicapi/test/job_test.go b/pkg/publicapi/test/job_test.go index cb4c969253..f36e93f172 100644 --- a/pkg/publicapi/test/job_test.go +++ b/pkg/publicapi/test/job_test.go @@ -83,18 +83,3 @@ func (s *ServerSuite) TestJobOperations() { _, err = s.client.Jobs().Stop(ctx, &apimodels.StopJobRequest{JobID: putResponse.JobID}) s.Require().Error(err) } - -func (s *ServerSuite) TestJobOperationsHttpHandler() { - - ctx := context.Background() - job := mock.Job() - - // retrieve a non existent job - _, err := s.client.Jobs().Get(ctx, &apimodels.GetJobRequest{JobID: job.ID}) - s.Require().Error(err) - - var apiError *apimodels.APIError - s.Require().True(errors.As(err, &apiError)) - s.Require().Equal(http.StatusNotFound, apiError.HTTPStatusCode, "Expected 404 Not Found status") - s.Require().Contains(apiError.Message, "not found", "Error message should indicate resource not found") -} From af77b2f1d76cb2dc3e42c93913bb28ed1fbe9e2e Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 00:47:06 +0100 Subject: [PATCH 07/16] fix unused imports --- pkg/publicapi/test/job_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/publicapi/test/job_test.go b/pkg/publicapi/test/job_test.go index f36e93f172..659af57b5a 100644 --- a/pkg/publicapi/test/job_test.go +++ b/pkg/publicapi/test/job_test.go @@ -4,8 +4,6 @@ package test import ( "context" - "errors" - "net/http" "time" "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" From 3548a73f97f156b1553476b4f58467749e03462d Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 01:09:08 +0100 Subject: [PATCH 08/16] add request id to api error --- pkg/models/error.go | 7 +++++++ pkg/publicapi/apimodels/error.go | 3 +++ pkg/publicapi/middleware/error_handler.go | 3 +++ 3 files changed, 13 insertions(+) diff --git a/pkg/models/error.go b/pkg/models/error.go index 737ea57407..90db5cf7b0 100644 --- a/pkg/models/error.go +++ b/pkg/models/error.go @@ -66,6 +66,13 @@ func (ec ErrorCode) Component() string { return ec.component } +func (ec ErrorCode) String() string { + if ec.httpStatusCode == 0 { + return fmt.Sprintf("%s-500", ec.component) + } + return fmt.Sprintf("%s-%d", ec.component, ec.httpStatusCode) +} + // BaseError is a custom error type in Go that provides additional fields // and methods for more detailed error handling. It implements the error // interface, as well as additional interfaces for providing a hint, diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index d09d6b77c4..168e8dc17c 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -35,6 +35,9 @@ type APIError struct { // message is a short, human-readable description of the error. // it should be concise and provide a clear indication of what went wrong. Message string `json:"message"` + + // RequestID is the request ID of the request that caused the error. + RequestID string `json:"request_id"` } // NewAPIError creates a new APIError with the given HTTP status code and message. diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index 434b9545a7..7b454d8a3d 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -28,6 +28,8 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { message = "internal server error" } + requestID := c.Request().Header.Get(echo.HeaderXRequestID) + // Don't override the status code if it is already been set. // This is something that is advised by ECHO framework. if !c.Response().Committed { @@ -38,6 +40,7 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { err = c.JSON(code, apimodels.APIError{ HTTPStatusCode: code, Message: message, + RequestID: requestID, }) } if err != nil { From 3336679ae94f5c410c9f540011669369edd74843 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 01:26:59 +0100 Subject: [PATCH 09/16] fix jobstore further --- pkg/jobstore/boltdb/store.go | 33 +++++++++++++++++---------------- pkg/jobstore/errors.go | 16 +++++++++------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index e44e5b65b9..183c8ab2ba 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -168,6 +168,7 @@ func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error } func (b *BoltJobStore) getJob(tx *bolt.Tx, jobID string) (models.Job, error) { + var job models.Job jobID, err := b.reifyJobID(tx, jobID) @@ -207,7 +208,7 @@ func (b *BoltJobStore) reifyJobID(tx *bolt.Tx, jobID string) (string, error) { case 1: return found[0], nil default: - return "", bacerrors.NewMultipleJobsFound(jobID, found) + return "", jobstore.NewErrMultipleJobsFound(jobID) } } @@ -430,12 +431,12 @@ func (b *BoltJobStore) getJobsInitialSet(tx *bolt.Tx, query jobstore.JobQuery) ( return nil }) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } } else { ids, err := b.namespacesIndex.List(tx, []byte(query.Namespace)) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } for _, k := range ids { @@ -456,7 +457,7 @@ func (b *BoltJobStore) getJobsIncludeTags(tx *bolt.Tx, jobSet map[string]struct{ tagLabel := []byte(strings.ToLower(tag)) ids, err := b.tagsIndex.List(tx, tagLabel) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } for _, k := range ids { @@ -484,7 +485,7 @@ func (b *BoltJobStore) getJobsExcludeTags(tx *bolt.Tx, jobSet map[string]struct{ tagLabel := []byte(strings.ToLower(tag)) ids, err := b.tagsIndex.List(tx, tagLabel) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } for _, k := range ids { @@ -585,7 +586,7 @@ func (b *BoltJobStore) getInProgressJobs(tx *bolt.Tx, jobType string) ([]models. keys, err := b.inProgressIndex.List(tx) if err != nil { - return nil, err + return nil, jobstore.NewBoltDbError(err.Error()) } for _, jobIDKey := range keys { @@ -778,7 +779,7 @@ func (b *BoltJobStore) update(ctx context.Context, update func(tx *bolt.Tx) erro tx, externalTx = txFromContext(ctx) if externalTx { if !tx.Writable() { - return fmt.Errorf("readonly transaction provided in context for update operation") + return jobstore.NewBoltDbError("readonly transaction provided in context for update operation") } } else { tx, err = b.database.Begin(true) @@ -818,7 +819,7 @@ func (b *BoltJobStore) view(ctx context.Context, view func(tx *bolt.Tx) error) e if !externalTx { tx, err = b.database.Begin(false) if err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } } @@ -845,21 +846,21 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { jobIDKey := []byte(job.ID) if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, true); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } else { // Create the evaluations and executions buckets and so forth if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobExecutions)); err != nil { return err } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobEvaluations)); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketJobHistory)); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } if _, err := bkt.CreateBucketIfNotExists([]byte(BucketExecutionHistory)); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } } @@ -870,7 +871,7 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { } if bkt, err := NewBucketPath(BucketJobs, job.ID).Get(tx, false); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } else { if err = bkt.Put(SpecKey, jobData); err != nil { return err @@ -880,11 +881,11 @@ func (b *BoltJobStore) createJob(tx *bolt.Tx, job models.Job) error { // Create a composite key for the in progress index jobkey := createInProgressIndexKey(&job) if err = b.inProgressIndex.Add(tx, []byte(jobkey)); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } if err = b.namespacesIndex.Add(tx, jobIDKey, []byte(job.Namespace)); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } // Write sentinels keys for specific tags @@ -919,7 +920,7 @@ func (b *BoltJobStore) deleteJob(tx *bolt.Tx, jobID string) error { // Delete the Job bucket (and everything within it) if bkt, err := NewBucketPath(BucketJobs).Get(tx, false); err != nil { - return err + return jobstore.NewBoltDbError(err.Error()) } else { if err = bkt.DeleteBucket([]byte(jobID)); err != nil { return err diff --git a/pkg/jobstore/errors.go b/pkg/jobstore/errors.go index 622885a50b..70aea00cd7 100644 --- a/pkg/jobstore/errors.go +++ b/pkg/jobstore/errors.go @@ -6,17 +6,19 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/models" ) -const JOB_STORE_COMPONENT = "JBS" - -// ErrJobNotFound is returned when the job is not found -type ErrJobNotFound struct { - JobID string -} +const ( + JOB_STORE_COMPONENT = "JBS" + BOLTDB_COMPONENT = "BDB" +) func NewErrJobNotFound(id string) *models.BaseError { return models.NewBaseError("job not found: %s", id).WithCode(models.NewErrorCode(JOB_STORE_COMPONENT, 404)) } +func NewErrMultipleJobsFound(id string) *models.BaseError { + return models.NewBaseError("multiple jobs found for id %s", id).WithCode(models.NewErrorCode(JOB_STORE_COMPONENT, 400)) +} + func NewErrJobAlreadyExists(id string) *models.BaseError { return models.NewBaseError("job already exists: %s", id) } @@ -70,7 +72,7 @@ func NewErrExecutionAlreadyTerminal(id string, actual models.ExecutionStateType, } func NewBoltDbError(message string) *models.BaseError { - return models.NewBaseError(message) + return models.NewBaseError(message).WithCode(models.NewErrorCode(BOLTDB_COMPONENT, 500)) } func NewJobStoreError(message string) *models.BaseError { From 6b8d0b31e5da77b9655e377f34a257384fe99381 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 01:37:25 +0100 Subject: [PATCH 10/16] fix jobstore tests --- pkg/jobstore/boltdb/store_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/jobstore/boltdb/store_test.go b/pkg/jobstore/boltdb/store_test.go index 2fa13cb94f..77b14f701e 100644 --- a/pkg/jobstore/boltdb/store_test.go +++ b/pkg/jobstore/boltdb/store_test.go @@ -15,7 +15,6 @@ import ( "github.com/stretchr/testify/suite" "k8s.io/apimachinery/pkg/labels" - "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/jobstore" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/test/mock" @@ -202,7 +201,7 @@ func (s *BoltJobstoreTestSuite) TestUnfilteredJobHistory() { jobHistoryQueryResponse, err = s.store.GetJobHistory(s.ctx, "1", jobstore.JobHistoryQuery{}) s.Require().Error(err) - s.Require().IsType(err, &bacerrors.MultipleJobsFound{}) + s.Require().IsType(err, &models.BaseError{}) s.Require().Nil(jobHistoryQueryResponse) } @@ -543,7 +542,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() { JobID: "100", }) s.Require().Error(err) - s.Require().IsType(err, &bacerrors.JobNotFound{}) + s.Require().IsType(err, &models.BaseError{}) s.Require().Nil(state) state, err = s.store.GetExecutions(s.ctx, jobstore.GetExecutionsOptions{ @@ -557,7 +556,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() { JobID: "1", }) s.Require().Error(err) - s.Require().IsType(err, &bacerrors.MultipleJobsFound{}) + s.Require().IsType(err, &models.BaseError{}) s.Require().Nil(state) // Created At Ascending Order Sort @@ -684,7 +683,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() { // No matches _, err := s.store.GetJob(s.ctx, shortString) s.Require().Error(err) - s.Require().IsType(err, &bacerrors.JobNotFound{}) + s.Require().IsType(err, &models.BaseError{}) // Create and fetch the single entry err = s.store.CreateJob(s.ctx, *job) @@ -701,7 +700,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() { _, err = s.store.GetJob(s.ctx, shortString) s.Require().Error(err) - s.Require().IsType(err, &bacerrors.MultipleJobsFound{}) + s.Require().IsType(err, &models.BaseError{}) } func (s *BoltJobstoreTestSuite) TestEvents() { From c7a7dde40eabc25ec7dc49e1d2d7c075a166c566 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 02:15:25 +0100 Subject: [PATCH 11/16] Fix failing tests --- pkg/jobstore/boltdb/store.go | 2 +- pkg/orchestrator/endpoint.go | 4 ++-- pkg/publicapi/client/v2/client.go | 29 +++++++++++++++++++---- pkg/publicapi/middleware/error_handler.go | 7 ++++++ 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 183c8ab2ba..7a9836a585 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -760,7 +760,7 @@ func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error { job.Normalize() err := job.Validate() if err != nil { - return err + return jobstore.NewJobStoreError(err.Error()) } return b.update(ctx, func(tx *bolt.Tx) (err error) { return b.createJob(tx, job) diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index 60e5f7d17a..1d9893d7ea 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -148,14 +148,14 @@ func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (St // no need to stop a job that is already stopped return StopJobResponse{}, nil case models.JobStateTypeCompleted: - return StopJobResponse{}, fmt.Errorf("cannot stop job in state %s", job.State.StateType) + return StopJobResponse{}, models.NewBaseError("cannot stop job in state %s", job.State.StateType) default: // continue } txContext, err := e.store.BeginTx(ctx) if err != nil { - return StopJobResponse{}, fmt.Errorf("failed to begin transaction: %w", err) + return StopJobResponse{}, jobstore.NewBoltDbError(err.Error()) } defer func() { diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go index db68b835ad..6e0b9d377d 100644 --- a/pkg/publicapi/client/v2/client.go +++ b/pkg/publicapi/client/v2/client.go @@ -57,6 +57,9 @@ func (c *httpClient) Get(ctx context.Context, endpoint string, in apimodels.GetR r := in.ToHTTPRequest() _, resp, err := c.doRequest(ctx, http.MethodGet, endpoint, r) //nolint:bodyclose // this is being closed + if err != nil { + return err + } if resp.StatusCode == http.StatusUnauthorized { return apimodels.NewUnauthorizedError("invalid token") @@ -93,13 +96,28 @@ func (c *httpClient) write(ctx context.Context, verb, endpoint string, in apimod if r.BodyObj == nil && r.Body == nil { r.BodyObj = in } - _, resp, err := requireOK(c.doRequest(ctx, verb, endpoint, r)) //nolint:bodyclose // this is being closed - if err != nil && resp != nil && resp.StatusCode == http.StatusUnauthorized { - return apimodels.ErrInvalidToken - } else if err != nil { + + _, resp, err := c.doRequest(ctx, verb, endpoint, r) //nolint:bodyclose // this is being closed + defer resp.Body.Close() + if err != nil { return err } - defer resp.Body.Close() + + if resp.StatusCode == http.StatusUnauthorized { + return apimodels.ErrInvalidToken + } + + var apiError *apimodels.APIError + if resp.StatusCode != http.StatusOK { + apiError, err = apimodels.FromHttpResponse(resp) + if err != nil { + return err + } + } + + if apiError != nil { + return apiError + } if out != nil { if err := decodeBody(resp, &out); err != nil { @@ -107,6 +125,7 @@ func (c *httpClient) write(ctx context.Context, verb, endpoint string, in apimod } out.Normalize() } + return nil } diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index 7b454d8a3d..542462358a 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -21,11 +21,18 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { code = e.Code().HTTPStatusCode() message = e.Error() + case *echo.HTTPError: + // This is needed, in case any other middleware throws an error. In + // such a scenario we just use it as the error code. + code = e.Code + message = e.Message.(string) + default: // In an ideal world this should never happen. We should always have are errors // from server as APIError. If output is this generic string, one should evaluate // and map it to APIError and send in appropriate message.= http.StatusInternalServerError message = "internal server error" + code = c.Response().Status } requestID := c.Request().Header.Get(echo.HeaderXRequestID) From 2df403189d6c508e00a3fbc5eae840a0ac454c79 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 02:23:34 +0100 Subject: [PATCH 12/16] Fix failing tests --- pkg/publicapi/apimodels/error.go | 2 +- pkg/publicapi/middleware/error_handler.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 168e8dc17c..5a3262e21c 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -116,7 +116,7 @@ func FromHttpResponse(resp *http.Response) (*APIError, error) { body, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("error reading response body: %w") + return nil, fmt.Errorf("error reading response body: %w", err) } var apiErr APIError diff --git a/pkg/publicapi/middleware/error_handler.go b/pkg/publicapi/middleware/error_handler.go index 542462358a..3dbd2cbeca 100644 --- a/pkg/publicapi/middleware/error_handler.go +++ b/pkg/publicapi/middleware/error_handler.go @@ -23,7 +23,9 @@ func CustomHTTPErrorHandler(err error, c echo.Context) { case *echo.HTTPError: // This is needed, in case any other middleware throws an error. In - // such a scenario we just use it as the error code. + // such a scenario we just use it as the error code and the message. + // One such example being when request body size is larger then the max + // size accepted code = e.Code message = e.Message.(string) From a568d89034a9d7a479262675432202f63d47c9d0 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 02:44:28 +0100 Subject: [PATCH 13/16] S3 errors should be from base error --- pkg/s3/error.go | 9 +++++++++ pkg/s3/types.go | 20 ++++++++++---------- 2 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 pkg/s3/error.go diff --git a/pkg/s3/error.go b/pkg/s3/error.go new file mode 100644 index 0000000000..4d6c3140d5 --- /dev/null +++ b/pkg/s3/error.go @@ -0,0 +1,9 @@ +package s3 + +import "github.com/bacalhau-project/bacalhau/pkg/models" + +const S3_PUBLISHER_COMPONENT = "S3PUB" + +func NewErrBadS3Request(msg string) *models.BaseError { + return models.NewBaseError(msg).WithCode(models.NewErrorCode(S3_PUBLISHER_COMPONENT, 400)) +} diff --git a/pkg/s3/types.go b/pkg/s3/types.go index 8b472474c7..ee23430f8d 100644 --- a/pkg/s3/types.go +++ b/pkg/s3/types.go @@ -22,7 +22,7 @@ type SourceSpec struct { func (c SourceSpec) Validate() error { if c.Bucket == "" { - return errors.New("invalid s3 storage params: bucket cannot be empty") + return NewErrBadS3Request("invalid s3 storage params: bucket cannot be empty") } return nil } @@ -38,7 +38,7 @@ type PreSignedResultSpec struct { func (c PreSignedResultSpec) Validate() error { if c.PreSignedURL == "" { - return errors.New("invalid s3 signed storage params: signed url cannot be empty") + return NewErrBadS3Request("invalid s3 signed storage params: signed url cannot be empty") } return c.SourceSpec.Validate() } @@ -49,11 +49,11 @@ func (c PreSignedResultSpec) ToMap() map[string]interface{} { func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { - return SourceSpec{}, errors.New("invalid storage source type. expected " + models.StorageSourceS3 + ", but received: " + spec.Type) + return SourceSpec{}, NewErrBadS3Request("invalid storage source type. expected " + models.StorageSourceS3 + ", but received: " + spec.Type) } inputParams := spec.Params if inputParams == nil { - return SourceSpec{}, errors.New("invalid storage source params. cannot be nil") + return SourceSpec{}, NewErrBadS3Request("invalid storage source params. cannot be nil") } var c SourceSpec @@ -72,7 +72,7 @@ func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, er inputParams := spec.Params if inputParams == nil { - return PreSignedResultSpec{}, errors.New("invalid signed result params. cannot be nil") + return PreSignedResultSpec{}, NewErrBadS3Request("invalid signed result params. cannot be nil") } var c PreSignedResultSpec @@ -92,10 +92,10 @@ type PublisherSpec struct { func (c PublisherSpec) Validate() error { if c.Bucket == "" { - return fmt.Errorf("invalid s3 params. bucket cannot be empty") + return NewErrBadS3Request("invalid s3 params. bucket cannot be empty") } if c.Key == "" { - return fmt.Errorf("invalid s3 params. key cannot be empty") + return NewErrBadS3Request("invalid s3 params. key cannot be empty") } return nil } @@ -106,12 +106,12 @@ func (c PublisherSpec) ToMap() map[string]interface{} { func DecodePublisherSpec(spec *models.SpecConfig) (PublisherSpec, error) { if !spec.IsType(models.PublisherS3) { - return PublisherSpec{}, fmt.Errorf("invalid publisher type. expected %s, but received: %s", + return PublisherSpec{}, NewErrBadS3Request("invalid publisher type. expected %s, but received: %s", models.PublisherS3, spec.Type) } inputParams := spec.Params if inputParams == nil { - return PublisherSpec{}, fmt.Errorf("invalid publisher params. cannot be nil") + return PublisherSpec{}, NewErrBadS3Request("invalid publisher params. cannot be nil") } var c PublisherSpec @@ -147,7 +147,7 @@ func NewPublisherSpec(bucket string, key string, opts ...PublisherOption) (*mode } if err := spec.Validate(); err != nil { - return nil, fmt.Errorf("failed to build %s publisher spec: %w", models.PublisherS3, err) + return nil, err } return &models.SpecConfig{ From 7ec8f2ce1fd3fb7134f703c0f224a0ab73bbee60 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 02:50:20 +0100 Subject: [PATCH 14/16] remove unnecessary log --- pkg/publicapi/endpoint/orchestrator/job.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/publicapi/endpoint/orchestrator/job.go b/pkg/publicapi/endpoint/orchestrator/job.go index 77463edd44..ba362f858a 100644 --- a/pkg/publicapi/endpoint/orchestrator/job.go +++ b/pkg/publicapi/endpoint/orchestrator/job.go @@ -78,7 +78,6 @@ func (e *Endpoint) getJob(c echo.Context) error { //nolint: gocyclo } job, err := e.store.GetJob(ctx, jobID) if err != nil { - log.Error().Err(err) return err } response := apimodels.GetJobResponse{ From 33f54907abd0b59dfb96e1e3a44f7f7462e4a14d Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 03:01:08 +0100 Subject: [PATCH 15/16] fix build --- pkg/s3/types.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/s3/types.go b/pkg/s3/types.go index ee23430f8d..ced8f8b423 100644 --- a/pkg/s3/types.go +++ b/pkg/s3/types.go @@ -106,8 +106,8 @@ func (c PublisherSpec) ToMap() map[string]interface{} { func DecodePublisherSpec(spec *models.SpecConfig) (PublisherSpec, error) { if !spec.IsType(models.PublisherS3) { - return PublisherSpec{}, NewErrBadS3Request("invalid publisher type. expected %s, but received: %s", - models.PublisherS3, spec.Type) + return PublisherSpec{}, NewErrBadS3Request(fmt.Sprintf("invalid publisher type. expected %s, but received: %s", + models.PublisherS3, spec.Type)) } inputParams := spec.Params if inputParams == nil { From 09f5421bd7320509d536cc7d832f55f8b71d6538 Mon Sep 17 00:00:00 2001 From: udsamani Date: Wed, 11 Sep 2024 03:07:28 +0100 Subject: [PATCH 16/16] s3 storage errors fix --- pkg/storage/error.go | 9 +++++++++ pkg/storage/s3/types.go | 18 ++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) create mode 100644 pkg/storage/error.go diff --git a/pkg/storage/error.go b/pkg/storage/error.go new file mode 100644 index 0000000000..b0cf0227bd --- /dev/null +++ b/pkg/storage/error.go @@ -0,0 +1,9 @@ +package storage + +import "github.com/bacalhau-project/bacalhau/pkg/models" + +const S3_STORAGE_COMPONENT = "S3STOR" + +func NewErrBadS3StorageRequest(msg string) *models.BaseError { + return models.NewBaseError(msg).WithCode(models.NewErrorCode(S3_STORAGE_COMPONENT, 400)) +} diff --git a/pkg/storage/s3/types.go b/pkg/storage/s3/types.go index e09592448f..bd9311bcda 100644 --- a/pkg/storage/s3/types.go +++ b/pkg/storage/s3/types.go @@ -1,12 +1,13 @@ package s3 import ( - "errors" + "fmt" "github.com/fatih/structs" "github.com/mitchellh/mapstructure" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/storage" ) type SourceSpec struct { @@ -21,7 +22,7 @@ type SourceSpec struct { func (c SourceSpec) Validate() error { if c.Bucket == "" { - return errors.New("invalid s3 storage params: bucket cannot be empty") + return storage.NewErrBadS3StorageRequest("invalid s3 storage params: bucket cannot be empty") } return nil } @@ -32,11 +33,11 @@ func (c SourceSpec) ToMap() map[string]interface{} { func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { - return SourceSpec{}, errors.New("invalid storage source type. expected " + models.StorageSourceS3 + ", but received: " + spec.Type) + return SourceSpec{}, storage.NewErrBadS3StorageRequest("invalid storage source type. expected " + models.StorageSourceS3 + ", but received: " + spec.Type) } inputParams := spec.Params if inputParams == nil { - return SourceSpec{}, errors.New("invalid storage source params. cannot be nil") + return SourceSpec{}, storage.NewErrBadS3StorageRequest("invalid storage source params. cannot be nil") } var c SourceSpec @@ -54,7 +55,7 @@ type PreSignedResultSpec struct { func (c PreSignedResultSpec) Validate() error { if c.PreSignedURL == "" { - return errors.New("invalid s3 signed storage params: signed url cannot be empty") + return storage.NewErrBadS3StorageRequest("invalid s3 signed storage params: signed url cannot be empty") } return c.SourceSpec.Validate() } @@ -65,13 +66,14 @@ func (c PreSignedResultSpec) ToMap() map[string]interface{} { func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, error) { if !spec.IsType(models.StorageSourceS3PreSigned) { - return PreSignedResultSpec{}, errors.New( - "invalid storage source type. expected " + models.StorageSourceS3PreSigned + ", but received: " + spec.Type) + return PreSignedResultSpec{}, storage.NewErrBadS3StorageRequest( + fmt.Sprintf("invalid storage source type. expected %s, but received: %s", + models.StorageSourceS3PreSigned, spec.Type)) } inputParams := spec.Params if inputParams == nil { - return PreSignedResultSpec{}, errors.New("invalid signed result params. cannot be nil") + return PreSignedResultSpec{}, storage.NewErrBadS3StorageRequest("invalid signed result params. cannot be nil") } var c PreSignedResultSpec