diff --git a/.cspell-code.json b/.cspell-code.json deleted file mode 100644 index 2e846c2e82..0000000000 --- a/.cspell-code.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "version": "0.2", - "language": "en", - "allowCompoundWords": true, - "dictionaryDefinitions": [ - { - "name": "custom-dictionary", - "path": "./.cspell/custom-dictionary.txt", - "addWords": true - } - ], - "dictionaries": [ - "en", - "custom-words", - "custom-dictionary" - ], - "ignorePaths": [ - "**/package.json", - "**/docs/package-lock.json", - "**/docs/docs/examples/model-training/Stable-Diffusion-Dreambooth/index.md", - "docs/docs/examples/model-training/Training-Tensorflow-Model/index.md", - "./webui/build", - "./webui/node_modules", - "./webui/package.json", - "./webui/package-lock.json", - "./.gitprecommit", - "./webui/tsconfig.json", - "./vendor", - "go.sum", - "go.mod", - "go.work.sum", - "apps" - ], - "ignoreRegExpList": [ - "Urls", - "Email", - "RsaCert", - "SshRsa", - "Base64MultiLine", - "Base64SingleLine", - "CommitHash", - "CommitHashLink", - "CStyleHexValue", - "CSSHexValue", - "SHA", - "HashStrings", - "UnicodeRef", - "UUID", - "/github.com.*/", - "/\\w+{12,}/" - ] -} diff --git a/.cspell/custom-dictionary.txt b/.cspell/custom-dictionary.txt index 4866751ba1..bb8b2c1c1b 100644 --- a/.cspell/custom-dictionary.txt +++ b/.cspell/custom-dictionary.txt @@ -355,6 +355,7 @@ wasmlogs wasmmodels wazero wdbaruni's +simonwo webui wesbos winderresearch @@ -371,3 +372,63 @@ yyyymmddhhmm zarr zerolog zidane +IMDC +kvstore +unmarshalling +Nowf +pkey +machineid +bacerror +Nacked +pqueue +Routez +Connz +Subsz +nuid +Noticef +Warnf +Debugf +Tracef +sresource +Syncer +mathgo +providables +JSONV +Idxs +boltdblib +hclog +THAMTShard +mergo +serde +qdisc +puuid +pkgs +pscbin +rocm +strg +otlploggrpc +yacspin +APITLSCA +APITLSCA +Milli +Errf +doesn +cicd +nvme +fdisk +mdstat +xcom +Fooco +Eventuallyf +Truef +sekret +Equalf +Doesnt +HAMT +dagpb +Berdly +frrist +swaggo +isbadactor +installationid +firstbacalhauimage \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml index 8a7a7ba501..641b6e1d6e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -54,12 +54,6 @@ linters-settings: - shadow lll: line-length: 140 - misspell: - locale: US - ignore-words: - - favour - - cancelled - - cancelling nolintlint: allow-leading-space: true # don't require machine-readable nolint directives (i.e. with no leading space) allow-unused: true # report any unused nolint directives @@ -89,7 +83,6 @@ linters: - govet - ineffassign - lll - - misspell - mnd - nakedret - noctx diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ee7d6b8d4a..ee46828daa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: detect-aws-credentials args: [--allow-missing-credentials] - id: detect-private-key - exclude: testdata/.* + exclude: 'testdata/.*|test-integration/certificates/.*' - id: check-yaml - id: check-json - repo: https://github.com/astral-sh/ruff-pre-commit diff --git a/DESIGN.md b/DESIGN.md index 62d45785aa..c322baa3ee 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -103,16 +103,11 @@ Ideally, we will also allow much more fine-grained control, specifying location, - She has a file `process.py` which includes the python code necessary to execute in a function called 'downscale()' which takes a file handle to local, processes it, and returns a bytestream. - She executes the following command: ``` -ifps job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR +ipfs job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR ``` - This runs the command in a local executor, first installing all the python packages necessary, and then executing them, on the subset of data available on that node. - Once complete, the system returns the CID of the updated dataset that she can download. -- **SCENARIO 3** Want to burst to cloud but cannot move entire dataset in short time - - DHASH CAN YOU HELP FLESH OUT - - **PUSH COMPUTE INTO GENE SEQUENCER** - - **PIPE TO S3** - ## Components to Build - Build an application that listens for jobs over NATS, receives payment somehow, runs the job in {kubernetes, docker, idk}, and returns the result to the use (ideally the 'result' is in the form of an ipfs object and we can just return the hash). @@ -125,13 +120,3 @@ ifps job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrz Bacalhau means cod (the fish) in Portuguese (where several folks were brainstorming this topic). Compute-Over-Data == Cod == Bacalhau - -## Prior Art / Parallel Projects -* IPFS-FAN - distributed serverless - https://research.protocol.ai/publications/ipfs-fan-a-function-addressable-computation-network/delarocha2021a.pdf -* IPLS : A Framework for Decentralized Federated Learning- https://arxiv.org/pdf/2101.01901v1.pdf -* Interplanetary Distributed Computing (2018) - https://github.com/yenkuanlee/IPDC -* IPTF - IPFS + TensorFlow (2018) - https://github.com/tesserai/iptf -* Lurk -> Run queries over Filecoin Sealed Data (no public paper yet) -* Radix - Nomad based scheduler for IPFS cluster (only) - high level spec doc https://docs.google.com/document/d/18hdYBmDlvusEOQ-iSNIO_IAEOvJVFL1MyAU_B8hON9Q/edit?usp=sharing -* Bringing Arbitrary Compute to Authoritative Data https://queue.acm.org/detail.cfm?id=2645649 -* Manta: a scalable, distributed object store https://github.com/joyent/manta diff --git a/Makefile b/Makefile index 802fbfbb2e..54da79acc6 100644 --- a/Makefile +++ b/Makefile @@ -469,5 +469,5 @@ release: build-bacalhau cp bin/bacalhau . .PHONY: spellcheck-code -spellcheck-code: ## Runs a spellchecker over all code - MVP just does one file - cspell -c .cspell-code.json lint ./pkg/authn/** +spellcheck-code: + cspell lint -c cspell.yaml --quiet "**/*.{go,js,ts,jsx,tsx,md,yml,yaml,json}" diff --git a/clients/python/docs/OrchestratorApi.md b/clients/python/docs/OrchestratorApi.md index e8f4b2c85b..8303d5e504 100644 --- a/clients/python/docs/OrchestratorApi.md +++ b/clients/python/docs/OrchestratorApi.md @@ -35,7 +35,7 @@ from pprint import pprint api_instance = bacalhau_apiclient.OrchestratorApi() id = 'id_example' # str | ID to get the job for include = 'include_example' # str | Takes history and executions as options. If empty will not include anything else. (optional) -limit = 56 # int | Number of history or exeuctions to fetch. Should be used in conjugation with include (optional) +limit = 56 # int | Number of history or executions to fetch. Should be used in conjugation with include (optional) try: # Returns a job. @@ -47,11 +47,11 @@ except ApiException as e: ### Parameters -Name | Type | Description | Notes -------------- | ------------- | ------------- | ------------- - **id** | **str**| ID to get the job for | - **include** | **str**| Takes history and executions as options. If empty will not include anything else. | [optional] - **limit** | **int**| Number of history or exeuctions to fetch. Should be used in conjugation with include | [optional] +Name | Type | Description | Notes +------------- | ------------- |--------------------------------------------------------------------------------------| ------------- + **id** | **str**| ID to get the job for | + **include** | **str**| Takes history and executions as options. If empty will not include anything else. | [optional] + **limit** | **int**| Number of history or executions to fetch. Should be used in conjugation with include | [optional] ### Return type diff --git a/cmd/cli/agent/alive_test.go b/cmd/cli/agent/alive_test.go index b25b07be67..469c6944d1 100644 --- a/cmd/cli/agent/alive_test.go +++ b/cmd/cli/agent/alive_test.go @@ -30,7 +30,7 @@ func (s *AliveSuite) TestAliveJSONOutput() { aliveInfo := &apimodels.IsAliveResponse{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &aliveInfo) - s.Require().NoError(err, "Could not unmarshall the output into json - %+v", err) + s.Require().NoError(err, "Could not unmarshal the output into json - %+v", err) s.Require().True(aliveInfo.IsReady()) } @@ -40,6 +40,6 @@ func (s *AliveSuite) TestAliveYAMLOutput() { aliveInfo := &apimodels.IsAliveResponse{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &aliveInfo) - s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out) s.Require().True(aliveInfo.IsReady()) } diff --git a/cmd/cli/agent/node_test.go b/cmd/cli/agent/node_test.go index 8fe7096a2b..5121e9bb65 100644 --- a/cmd/cli/agent/node_test.go +++ b/cmd/cli/agent/node_test.go @@ -28,7 +28,7 @@ func (s *NodeSuite) TestNodeJSONOutput() { nodeInfo := &models.NodeState{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &nodeInfo) - s.Require().NoError(err, "Could not unmarshall the output into json - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into json - %+v", out) s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in json.") } @@ -38,6 +38,6 @@ func (s *NodeSuite) TestNodeYAMLOutput() { nodeInfo := &models.NodeState{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &nodeInfo) - s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out) s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in yaml.") } diff --git a/cmd/cli/agent/version_test.go b/cmd/cli/agent/version_test.go index e7fe5421ef..8083ebfb36 100644 --- a/cmd/cli/agent/version_test.go +++ b/cmd/cli/agent/version_test.go @@ -45,7 +45,7 @@ func (s *VersionSuite) TestVersionJSONOutput() { expectedVersion := version.Get() printedVersion := &models.BuildVersionInfo{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &printedVersion) - s.Require().NoError(err, "Could not unmarshall the output into json - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into json - %+v", out) s.Require().Equal(expectedVersion, printedVersion, "Versions do not match in json.") } @@ -58,6 +58,6 @@ func (s *VersionSuite) TestVersionYAMLOutput() { expectedVersion := version.Get() printedVersion := &models.BuildVersionInfo{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &printedVersion) - s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out) s.Require().Equal(expectedVersion, printedVersion, "Versions do not match in yaml.") } diff --git a/cmd/cli/docker/docker_run_cli_test.go b/cmd/cli/docker/docker_run_cli_test.go index 9f5f8d2539..42ecf4dd0d 100644 --- a/cmd/cli/docker/docker_run_cli_test.go +++ b/cmd/cli/docker/docker_run_cli_test.go @@ -1,5 +1,7 @@ //go:build unit || !integration +/* spell-checker: disable */ + package docker import ( @@ -431,7 +433,7 @@ func TestJobFlagParsing(t *testing.T) { }, expectedError: false, }, - // TODO(forrest): if/when validtion on the network config is adjusted expect this test to fail. + // TODO(forrest): if/when validation on the network config is adjusted expect this test to fail. { name: "with none network and domains", flags: []string{"--network=none", "--domain=example.com", "--domain=example.io", "image:tag"}, @@ -487,30 +489,30 @@ func TestJobFlagParsing(t *testing.T) { }, { name: "with s3 publisher", - flags: []string{"--publisher=s3://mybucket/mykey", "image:tag"}, + flags: []string{"--publisher=s3://myBucket/myKey", "image:tag"}, assertJob: func(t *testing.T, j *models.Job) { defaultJobAssertions(t, j) task := j.Task() s3publisher, err := publisher_s3.DecodePublisherSpec(task.Publisher) require.NoError(t, err) assert.Equal(t, publisher_s3.PublisherSpec{ - Bucket: "mybucket", - Key: "mykey", + Bucket: "myBucket", + Key: "myKey", }, s3publisher) }, expectedError: false, }, { name: "with s3 publisher with opts", - flags: []string{"-p=s3://mybucket/mykey,opt=region=us-west-2,opt=endpoint=https://s3.custom.com", "image:tag"}, + flags: []string{"-p=s3://myBucket/myKey,opt=region=us-west-2,opt=endpoint=https://s3.custom.com", "image:tag"}, assertJob: func(t *testing.T, j *models.Job) { defaultJobAssertions(t, j) task := j.Task() s3publisher, err := publisher_s3.DecodePublisherSpec(task.Publisher) require.NoError(t, err) assert.Equal(t, publisher_s3.PublisherSpec{ - Bucket: "mybucket", - Key: "mykey", + Bucket: "myBucket", + Key: "myKey", Region: "us-west-2", Endpoint: "https://s3.custom.com", }, s3publisher) @@ -519,15 +521,15 @@ func TestJobFlagParsing(t *testing.T) { }, { name: "with s3 publisher with options", - flags: []string{"-p=s3://mybucket/mykey,option=region=us-west-2,option=endpoint=https://s3.custom.com", "image:tag"}, + flags: []string{"-p=s3://myBucket/myKey,option=region=us-west-2,option=endpoint=https://s3.custom.com", "image:tag"}, assertJob: func(t *testing.T, j *models.Job) { defaultJobAssertions(t, j) task := j.Task() s3publisher, err := publisher_s3.DecodePublisherSpec(task.Publisher) require.NoError(t, err) assert.Equal(t, publisher_s3.PublisherSpec{ - Bucket: "mybucket", - Key: "mykey", + Bucket: "myBucket", + Key: "myKey", Region: "us-west-2", Endpoint: "https://s3.custom.com", }, s3publisher) diff --git a/cmd/cli/docker/docker_run_test.go b/cmd/cli/docker/docker_run_test.go index dd0e603f7a..13c3ece935 100644 --- a/cmd/cli/docker/docker_run_test.go +++ b/cmd/cli/docker/docker_run_test.go @@ -165,12 +165,12 @@ func (s *DockerRunSuite) TestRun_SubmitUrlInputs() { {inputURL: InputURL{url: "https://raw.githubusercontent.com/bacalhau-project/bacalhau/main/main.go", pathInContainer: "/inputs", filename: "main.go", flag: "-i"}}, } - for _, turls := range testURLs { + for _, urls := range testURLs { ctx := context.Background() flagsArray := []string{"docker", "run"} - flagsArray = append(flagsArray, turls.inputURL.flag, turls.inputURL.url) - flagsArray = append(flagsArray, "ubuntu", "cat", fmt.Sprintf("%s/%s", turls.inputURL.pathInContainer, turls.inputURL.filename)) + flagsArray = append(flagsArray, urls.inputURL.flag, urls.inputURL.url) + flagsArray = append(flagsArray, "ubuntu", "cat", fmt.Sprintf("%s/%s", urls.inputURL.pathInContainer, urls.inputURL.filename)) _, out, err := s.ExecuteTestCobraCommand(flagsArray...) s.Require().NoError(err, "Error submitting job") @@ -180,8 +180,8 @@ func (s *DockerRunSuite) TestRun_SubmitUrlInputs() { s.Require().Equal(1, len(j.Task().InputSources), "Number of job urls != # of test urls.") urlSpec, err := storage_url.DecodeSpec(j.Task().InputSources[0].Source) s.Require().NoError(err) - s.Require().Equal(turls.inputURL.url, urlSpec.URL, "Test URL not equal to URL from job.") - s.Require().Equal(turls.inputURL.pathInContainer, j.Task().InputSources[0].Target, "Test Path not equal to Path from job.") + s.Require().Equal(urls.inputURL.url, urlSpec.URL, "Test URL not equal to URL from job.") + s.Require().Equal(urls.inputURL.pathInContainer, j.Task().InputSources[0].Target, "Test Path not equal to Path from job.") } } @@ -252,8 +252,8 @@ func (s *DockerRunSuite) TestRun_SubmitWorkdir() { }{ {workdir: "", errorCode: 0}, {workdir: "/", errorCode: 0}, - {workdir: "./mydir", errorCode: 1}, - {workdir: "../mydir", errorCode: 1}, + {workdir: "./myDir", errorCode: 1}, + {workdir: "../myDir", errorCode: 1}, {workdir: "http://foo.com", errorCode: 1}, {workdir: "/foo//", errorCode: 0}, // double forward slash is allowed in unix {workdir: "/foo//bar", errorCode: 0}, diff --git a/cmd/cli/exec/exec.go b/cmd/cli/exec/exec.go index 7c6af0ea1a..3996edd794 100644 --- a/cmd/cli/exec/exec.go +++ b/cmd/cli/exec/exec.go @@ -228,7 +228,7 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti job.Task().Env = options.TaskSettings.EnvironmentVariables job.Task().InputSources = options.TaskSettings.InputSources.Values() if err != nil { - return nil, fmt.Errorf("parsing job labes: %w", err) + return nil, fmt.Errorf("parsing job labels: %w", err) } job.Constraints, err = options.JobSettings.Constraints() if err != nil { diff --git a/cmd/cli/exec/exec_test.go b/cmd/cli/exec/exec_test.go index 608d3f05a5..076811f186 100644 --- a/cmd/cli/exec/exec_test.go +++ b/cmd/cli/exec/exec_test.go @@ -37,7 +37,7 @@ var testcases []testCase = []testCase{ { // bacalhau exec ruby -e "puts 'hello'" name: "no ruby here", - cmdLine: []string{"ruby", "-e", "\"puts 'helllo'\""}, + cmdLine: []string{"ruby", "-e", "\"puts 'hello'\""}, expectedUnknownArgs: []string{}, expectedErrMsg: "the job type 'ruby' is not supported", }, diff --git a/cmd/cli/exec/templates.go b/cmd/cli/exec/templates.go index fb2f286acd..e65fee7898 100644 --- a/cmd/cli/exec/templates.go +++ b/cmd/cli/exec/templates.go @@ -24,8 +24,8 @@ type TemplateMap struct { m map[string]string } -func NewTemplateMap(fsys fs.ReadDirFS, tplPath string) (*TemplateMap, error) { - entries, err := fsys.ReadDir(tplPath) +func NewTemplateMap(fSys fs.ReadDirFS, tplPath string) (*TemplateMap, error) { + entries, err := fSys.ReadDir(tplPath) if err != nil { return nil, err } @@ -41,7 +41,7 @@ func NewTemplateMap(fsys fs.ReadDirFS, tplPath string) (*TemplateMap, error) { name := nameFromFile(entry.Name()) - fd, err := fsys.Open(path.Join(tplPath, entry.Name())) + fd, err := fSys.Open(path.Join(tplPath, entry.Name())) if err != nil { return nil, err } diff --git a/cmd/cli/helpers/helpers.go b/cmd/cli/helpers/helpers.go index 1f639a074c..487d2b3d9e 100644 --- a/cmd/cli/helpers/helpers.go +++ b/cmd/cli/helpers/helpers.go @@ -62,7 +62,7 @@ func BuildJobFromFlags( labels, err := jobSettings.Labels() if err != nil { - return nil, fmt.Errorf("receieved invalid job labels: %w", err) + return nil, fmt.Errorf("received invalid job labels: %w", err) } job := &models.Job{ Name: jobSettings.Name(), diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index 8886b58ae4..e2bdb8a2f3 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -277,12 +277,12 @@ func parseServerAPIHost(host string) (string, error) { // We should check that the value gives us an address type // we can use to get our IP address. If it doesn't, we should // panic. - atype, ok := network.AddressTypeFromString(host) + addrType, ok := network.AddressTypeFromString(host) if !ok { return "", fmt.Errorf("invalid address type in Server API Host config: %s", host) } - addr, err := network.GetNetworkAddress(atype, network.AllAddresses) + addr, err := network.GetNetworkAddress(addrType, network.AllAddresses) if err != nil { return "", fmt.Errorf("failed to get network address for Server API Host: %s: %w", host, err) } diff --git a/cmd/cli/version/version.go b/cmd/cli/version/version.go index 9e53db306e..a2ce1e1dc0 100644 --- a/cmd/cli/version/version.go +++ b/cmd/cli/version/version.go @@ -128,10 +128,10 @@ func (oV *VersionOptions) Run( } else { // NB(forrest): since `GetAllVersions` is an API call - in the event the server is un-reachable // we timeout after 3 seconds to avoid waiting on an unavailable server to return its version information. - vctx, cancel := context.WithTimeout(ctx, time.Second*3) + vCtx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() var err error - versions, err = util.GetAllVersions(vctx, cfg, api, r) + versions, err = util.GetAllVersions(vCtx, cfg, api, r) if err != nil { // No error on fail of version check. Just print as much as we can. log.Ctx(ctx).Warn().Err(err).Msg("failed to get updated versions") diff --git a/cmd/cli/version/version_test.go b/cmd/cli/version/version_test.go index a3d127e593..afbb2078c7 100644 --- a/cmd/cli/version/version_test.go +++ b/cmd/cli/version/version_test.go @@ -18,11 +18,12 @@ package version_test import ( "testing" - "github.com/bacalhau-project/bacalhau/cmd/util" - "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/bacalhau-project/bacalhau/cmd/util" + "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" + cmdtesting "github.com/bacalhau-project/bacalhau/cmd/testing" "github.com/bacalhau-project/bacalhau/cmd/util/output" ) @@ -51,7 +52,7 @@ func (suite *VersionSuite) TestVersionJSONOutput() { jsonDoc := &util.Versions{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &jsonDoc) - require.NoError(suite.T(), err, "Could not unmarshall the output into json - %+v", err) + require.NoError(suite.T(), err, "Could not unmarshal the output into json - %+v", err) require.Equal(suite.T(), jsonDoc.ClientVersion.GitCommit, jsonDoc.ServerVersion.GitCommit, "Client and Server do not match in json.") } @@ -61,7 +62,7 @@ func (suite *VersionSuite) TestVersionYAMLOutput() { yamlDoc := &util.Versions{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &yamlDoc) - require.NoError(suite.T(), err, "Could not unmarshall the output into yaml - %+v", err) + require.NoError(suite.T(), err, "Could not unmarshal the output into yaml - %+v", err) require.Equal(suite.T(), yamlDoc.ClientVersion.GitCommit, yamlDoc.ServerVersion.GitCommit, "Client and Server do not match in yaml.") } diff --git a/cmd/cli/wasm/wasm_run.go b/cmd/cli/wasm/wasm_run.go index f3d93ca422..e01e0697d0 100644 --- a/cmd/cli/wasm/wasm_run.go +++ b/cmd/cli/wasm/wasm_run.go @@ -190,7 +190,7 @@ func build(ctx context.Context, args []string, opts *WasmRunOptions) (*models.Jo if err != nil { return nil, err } - envar, err := parse.StringSliceToMap(opts.EnvironmentVariables) + envVar, err := parse.StringSliceToMap(opts.EnvironmentVariables) if err != nil { return nil, fmt.Errorf("wasm env vars invalid: %w", err) } @@ -198,7 +198,7 @@ func build(ctx context.Context, args []string, opts *WasmRunOptions) (*models.Jo WithParameters(args[1:]...). WithEntrypoint(opts.Entrypoint). WithImportModules(opts.ImportModules). - WithEnvironmentVariables(envar). + WithEnvironmentVariables(envVar). Build() if err != nil { return nil, err diff --git a/cmd/testing/base.go b/cmd/testing/base.go index 304e1a005d..655834547a 100644 --- a/cmd/testing/base.go +++ b/cmd/testing/base.go @@ -112,7 +112,7 @@ func (s *BaseSuite) ExecuteTestCobraCommandWithStdin(stdin io.Reader, args ...st buf := new(bytes.Buffer) root := cli.NewRootCmd() root.SetOut(buf) - // TODO(forrest): we should separate the ouputs from a command into different buffers for stderr and sdtout, otherwise + // TODO(forrest): we should separate the outputs from a command into different buffers for stderr and stdout, otherwise // log lines and other outputs (like the update checker) will be included in the returned buffer, and commands // that make assertions on the output containing specific values, or being marshaller-able to yaml will fail. root.SetErr(buf) diff --git a/cmd/util/flags/cliflags/job.go b/cmd/util/flags/cliflags/job.go index f9f4a55b23..b9e89aed8a 100644 --- a/cmd/util/flags/cliflags/job.go +++ b/cmd/util/flags/cliflags/job.go @@ -71,7 +71,7 @@ func (j *JobSettings) Constraints() ([]*models.LabelSelectorRequirement, error) } // TODO(forrest): based on a conversation with walid we should be returning an error here if at anypoint if a label -// if provided that is invalid. We cannont remove them as we did previously. +// if provided that is invalid. We cannot remove them as we did previously. func (j *JobSettings) Labels() (map[string]string, error) { parsedLabels := make(map[string]string) rawLabels := j.labels diff --git a/cmd/util/flags/configflags/register.go b/cmd/util/flags/configflags/register.go index b46a8d1fb8..550b2baefa 100644 --- a/cmd/util/flags/configflags/register.go +++ b/cmd/util/flags/configflags/register.go @@ -46,7 +46,7 @@ func BindFlags(v *viper.Viper, register map[string][]Definition) error { for _, def := range defs { // sanity check to ensure we are not binding a config key on more than one flag. if dup, ok := seen[def.ConfigPath]; ok && !def.Deprecated { - return fmt.Errorf("DEVELOPER ERROR: duplicate regsistration of config key %s for flag %s"+ + return fmt.Errorf("DEVELOPER ERROR: duplicate registration of config key %s for flag %s"+ " previously registered on on flag %s", def.ConfigPath, def.FlagName, dup.FlagName) } if !def.Deprecated { @@ -79,43 +79,43 @@ func PreRun(v *viper.Viper, flags map[string][]Definition) func(*cobra.Command, // This method should be called before the command runs to register flags accordingly. func RegisterFlags(cmd *cobra.Command, register map[string][]Definition) error { for name, defs := range register { - fset := pflag.NewFlagSet(name, pflag.ContinueOnError) + flagSet := pflag.NewFlagSet(name, pflag.ContinueOnError) // Determine the type of the default value for _, def := range defs { switch v := def.DefaultValue.(type) { case int: - fset.Int(def.FlagName, v, def.Description) + flagSet.Int(def.FlagName, v, def.Description) case uint64: - fset.Uint64(def.FlagName, v, def.Description) + flagSet.Uint64(def.FlagName, v, def.Description) case bool: - fset.Bool(def.FlagName, v, def.Description) + flagSet.Bool(def.FlagName, v, def.Description) case string: - fset.String(def.FlagName, v, def.Description) + flagSet.String(def.FlagName, v, def.Description) case []string: - fset.StringSlice(def.FlagName, v, def.Description) + flagSet.StringSlice(def.FlagName, v, def.Description) case map[string]string: - fset.StringToString(def.FlagName, v, def.Description) + flagSet.StringToString(def.FlagName, v, def.Description) case models.JobSelectionDataLocality: - fset.Var(flags.DataLocalityFlag(&v), def.FlagName, def.Description) + flagSet.Var(flags.DataLocalityFlag(&v), def.FlagName, def.Description) case logger.LogMode: - fset.Var(flags.LoggingFlag(&v), def.FlagName, def.Description) + flagSet.Var(flags.LoggingFlag(&v), def.FlagName, def.Description) case time.Duration: - fset.DurationVar(&v, def.FlagName, v, def.Description) + flagSet.DurationVar(&v, def.FlagName, v, def.Description) case types.Duration: - fset.DurationVar((*time.Duration)(&v), def.FlagName, time.Duration(v), def.Description) + flagSet.DurationVar((*time.Duration)(&v), def.FlagName, time.Duration(v), def.Description) case types.ResourceType: - fset.String(def.FlagName, string(v), def.Description) + flagSet.String(def.FlagName, string(v), def.Description) default: return fmt.Errorf("unhandled type: %T for flag %s", v, def.FlagName) } if def.Deprecated { - flag := fset.Lookup(def.FlagName) + flag := flagSet.Lookup(def.FlagName) flag.Deprecated = def.DeprecatedMessage flag.Hidden = true } } - cmd.PersistentFlags().AddFlagSet(fset) + cmd.PersistentFlags().AddFlagSet(flagSet) } return nil } diff --git a/cmd/util/opts/storage_specconfig.go b/cmd/util/opts/storage_specconfig.go index bb6715b1f0..8c4ee7c489 100644 --- a/cmd/util/opts/storage_specconfig.go +++ b/cmd/util/opts/storage_specconfig.go @@ -73,7 +73,7 @@ func (o *StorageSpecConfigOpt) Set(value string) error { options[k] = v } default: - return fmt.Errorf("unpexted key %s in field %s", key, field) + return fmt.Errorf("unexpected key %s in field %s", key, field) } } alias := sourceURI diff --git a/cmd/util/tokens.go b/cmd/util/tokens.go index 545ed244fc..3c46c0161b 100644 --- a/cmd/util/tokens.go +++ b/cmd/util/tokens.go @@ -46,7 +46,7 @@ func writeTokens(path string, t tokens) error { return json.NewEncoder(file).Encode(t) } -// Read the authorization crdential associated with the passed API base URL. If +// Read the authorization credentials associated with the passed API base URL. If // there is no credential currently stored, ReadToken will return nil with no // error. func ReadToken(path string, apiURL string) (*apimodels.HTTPCredential, error) { diff --git a/cspell.json b/cspell.json deleted file mode 100644 index acd34f1354..0000000000 --- a/cspell.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "version": "0.2", - "ignorePaths": [], - "dictionaryDefinitions": [], - "dictionaries": [], - "ignoreWords": [], - "import": [".cspell-code.json"] -} diff --git a/cspell.yaml b/cspell.yaml new file mode 100644 index 0000000000..274eb310ff --- /dev/null +++ b/cspell.yaml @@ -0,0 +1,62 @@ +version: '0.2' +language: en +allowCompoundWords: true + +# Dictionary configurations +dictionaryDefinitions: + - name: custom-dictionary + path: ./.cspell/custom-dictionary.txt + addWords: true + +dictionaries: + # General dictionaries + - en + - en-gb + # Programming language-specific dictionaries + - python + - golang + - typescript + - node + - html + - css + - cpp + # Technology-specific dictionaries + - k8s + - terraform + # Custom dictionaries + - custom-words + - custom-dictionary + +# Paths to ignore +ignorePaths: + - python/mkdocs.yml + - webui/build + - webui/node_modules + - webui/lib/api/generated/** + +# Patterns to ignore +ignoreRegExpList: + # Internet and email + - Urls + - Email + # Cryptography and security + - RsaCert + - SshRsa + - SHA + # Encoding + - Base64 + - Base64MultiLine + - Base64SingleLine + - HexDigits + # Programming-related + - CommitHash + - CommitHashLink + - CStyleHexValue + - CSSHexValue + - EscapedUnicodeCharacters + - EscapeCharacters + - HashStrings + - UnicodeRef + - UUID + # Custom patterns + - /github.com.*/ diff --git a/ops/metrics/grafana/provisioning/dashboards/dashboard.json b/ops/metrics/grafana/provisioning/dashboards/dashboard.json index ecee403d32..9e05872bdf 100644 --- a/ops/metrics/grafana/provisioning/dashboards/dashboard.json +++ b/ops/metrics/grafana/provisioning/dashboards/dashboard.json @@ -182,7 +182,7 @@ "useBackend": false } ], - "title": "Jobs Receieved", + "title": "Jobs Received", "type": "stat" }, { @@ -656,7 +656,7 @@ "refId": "A" } ], - "title": "Averagef HTTP Requests Duration over 5min", + "title": "Average HTTP Requests Duration over 5min", "type": "timeseries" }, { @@ -930,7 +930,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Cancelable", + "title": "Evaluation Broker Cancelable", "type": "stat" }, { @@ -1004,7 +1004,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Inflight", + "title": "Evaluation Broker Inflight", "type": "stat" }, { @@ -1078,7 +1078,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Pending", + "title": "Evaluation Broker Pending", "type": "stat" }, { @@ -1152,7 +1152,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Waiting", + "title": "Evaluation Broker Waiting", "type": "stat" }, { diff --git a/pkg/authz/policies/policy_ns_anon.rego b/pkg/authz/policies/policy_ns_anon.rego index b243fd2dd9..c56ab79d13 100644 --- a/pkg/authz/policies/policy_ns_anon.rego +++ b/pkg/authz/policies/policy_ns_anon.rego @@ -30,7 +30,7 @@ allow if { namespace_readable(job_namespace_perms) } -# Allow reading all other endpoints, inclduing by users who don't have a token +# Allow reading all other endpoints, including by users who don't have a token allow if { input.http.path != job_endpoint not is_legacy_api @@ -51,7 +51,7 @@ allow if { not input.http.path[3] in ["submit", "cancel"] } -# Allow posting to auth endpoints, neccessary to get a token in the first place +# Allow posting to auth endpoints, necessary to get a token in the first place allow if { input.http.path[2] == "auth" } diff --git a/pkg/compute/executor.go b/pkg/compute/executor.go index 3908042204..f27a5fac7e 100644 --- a/pkg/compute/executor.go +++ b/pkg/compute/executor.go @@ -65,31 +65,31 @@ func NewBaseExecutor(params BaseExecutorParams) *BaseExecutor { func prepareInputVolumes( ctx context.Context, - strgprovider storage.StorageProvider, + storageProvider storage.StorageProvider, storageDirectory string, inputSources ...*models.InputSource) ( []storage.PreparedStorage, func(context.Context) error, error, ) { - inputVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, inputSources...) + inputVolumes, err := storage.ParallelPrepareStorage(ctx, storageProvider, storageDirectory, inputSources...) if err != nil { return nil, nil, err } return inputVolumes, func(ctx context.Context) error { - return storage.ParallelCleanStorage(ctx, strgprovider, inputVolumes) + return storage.ParallelCleanStorage(ctx, storageProvider, inputVolumes) }, nil } func prepareWasmVolumes( ctx context.Context, - strgprovider storage.StorageProvider, + storageProvider storage.StorageProvider, storageDirectory string, wasmEngine wasmmodels.EngineSpec) ( map[string][]storage.PreparedStorage, func(context.Context) error, error, ) { - importModuleVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, wasmEngine.ImportModules...) + importModuleVolumes, err := storage.ParallelPrepareStorage(ctx, storageProvider, storageDirectory, wasmEngine.ImportModules...) if err != nil { return nil, nil, err } - entryModuleVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, wasmEngine.EntryModule) + entryModuleVolumes, err := storage.ParallelPrepareStorage(ctx, storageProvider, storageDirectory, wasmEngine.EntryModule) if err != nil { return nil, nil, err } @@ -100,8 +100,8 @@ func prepareWasmVolumes( } cleanup := func(ctx context.Context) error { - err1 := storage.ParallelCleanStorage(ctx, strgprovider, importModuleVolumes) - err2 := storage.ParallelCleanStorage(ctx, strgprovider, entryModuleVolumes) + err1 := storage.ParallelCleanStorage(ctx, storageProvider, importModuleVolumes) + err2 := storage.ParallelCleanStorage(ctx, storageProvider, entryModuleVolumes) if err1 != nil || err2 != nil { return fmt.Errorf("Error cleaning up WASM volumes: %v, %v", err1, err2) } @@ -119,21 +119,21 @@ func prepareWasmVolumes( // // For example, an InputCleanupFn might be responsible for deallocating storage used // for input volumes, or deleting temporary input files that were created as part of the -// job's execution. The nature of it operation depends on the storage provided by `strgprovider` and +// job's execution. The nature of it operation depends on the storage provided by `storageProvider` and // input sources of the jobs associated tasks. For the case of a wasm job its input and entry module storage volumes // should be removed via the method after the jobs execution reaches a terminal state. type InputCleanupFn = func(context.Context) error func PrepareRunArguments( ctx context.Context, - strgprovider storage.StorageProvider, + storageProvider storage.StorageProvider, storageDirectory string, execution *models.Execution, resultsDir string, ) (*executor.RunCommandRequest, InputCleanupFn, error) { var cleanupFuncs []func(context.Context) error - inputVolumes, inputCleanup, err := prepareInputVolumes(ctx, strgprovider, storageDirectory, execution.Job.Task().InputSources...) + inputVolumes, inputCleanup, err := prepareInputVolumes(ctx, storageProvider, storageDirectory, execution.Job.Task().InputSources...) if err != nil { return nil, nil, err } @@ -162,7 +162,7 @@ func PrepareRunArguments( return nil, nil, err } - volumes, wasmCleanup, err := prepareWasmVolumes(ctx, strgprovider, storageDirectory, wasmEngine) + volumes, wasmCleanup, err := prepareWasmVolumes(ctx, storageProvider, storageDirectory, wasmEngine) if err != nil { return nil, nil, err } @@ -259,7 +259,7 @@ func (e *BaseExecutor) Start(ctx context.Context, execution *models.Execution) * log.Ctx(ctx).Debug().Msg("starting execution") if e.failureInjection.IsBadActor { - result.Err = fmt.Errorf("i am a baaad node. i failed execution %s", execution.ID) + result.Err = fmt.Errorf("i am a bad node. i failed execution %s", execution.ID) return result } diff --git a/pkg/compute/executor_buffer.go b/pkg/compute/executor_buffer.go index cc3fb6e12b..764149dfe6 100644 --- a/pkg/compute/executor_buffer.go +++ b/pkg/compute/executor_buffer.go @@ -154,7 +154,7 @@ func (s *ExecutorBuffer) deque() { // There are at most max matches, so try at most that many times max := s.queuedTasks.Len() for i := 0; i < max; i++ { - qitem := s.queuedTasks.DequeueWhere(func(task *bufferTask) bool { + qItem := s.queuedTasks.DequeueWhere(func(task *bufferTask) bool { // If we don't have enough resources to run this task, then we will skip it queuedResources := task.localExecutionState.Execution.TotalAllocatedResources() allocatedResources := s.runningCapacity.AddIfHasCapacity(ctx, *queuedResources) @@ -174,13 +174,13 @@ func (s *ExecutorBuffer) deque() { return true }) - if qitem == nil { + if qItem == nil { // We didn't find anything in the queue that matches our resource availability so we will // break out of this look as there is nothing else to find break } - task := qitem.Value + task := qItem.Value // Move the execution to the running list and remove from the list of enqueued IDs // before we actually run the task diff --git a/pkg/compute/store/test/store_suite.go b/pkg/compute/store/test/store_suite.go index 80c3147323..87808bde0d 100644 --- a/pkg/compute/store/test/store_suite.go +++ b/pkg/compute/store/test/store_suite.go @@ -18,7 +18,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/test/mock" ) -type StoreCreator func(ctx context.Context, dbpath string) (store.ExecutionStore, error) +type StoreCreator func(ctx context.Context, dbPath string) (store.ExecutionStore, error) type StoreSuite struct { suite.Suite diff --git a/pkg/config/types/bacalhau_test.go b/pkg/config/types/bacalhau_test.go index f203374a60..5eebad8821 100644 --- a/pkg/config/types/bacalhau_test.go +++ b/pkg/config/types/bacalhau_test.go @@ -120,9 +120,9 @@ func TestBacalhauMergeNew(t *testing.T) { NameProvider: "test", } other := Bacalhau{} - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, base, MergeNewd) + assert.Equal(t, base, mergedNew) }) t.Run("MergeNew overwrites existing fields", func(t *testing.T) { @@ -139,12 +139,12 @@ func TestBacalhauMergeNew(t *testing.T) { }, StrictVersionMatch: true, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, "otherhost", MergeNewd.API.Host) - assert.Equal(t, 8080, MergeNewd.API.Port) - assert.Equal(t, "test", MergeNewd.NameProvider) - assert.True(t, MergeNewd.StrictVersionMatch) + assert.Equal(t, "otherhost", mergedNew.API.Host) + assert.Equal(t, 8080, mergedNew.API.Port) + assert.Equal(t, "test", mergedNew.NameProvider) + assert.True(t, mergedNew.StrictVersionMatch) }) t.Run("MergeNew with nested structs", func(t *testing.T) { @@ -165,11 +165,11 @@ func TestBacalhauMergeNew(t *testing.T) { }, }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.True(t, MergeNewd.Orchestrator.Enabled) - assert.Equal(t, "base.local", MergeNewd.Orchestrator.Host) - assert.Equal(t, Duration(10), MergeNewd.Orchestrator.NodeManager.DisconnectTimeout) + assert.True(t, mergedNew.Orchestrator.Enabled) + assert.Equal(t, "base.local", mergedNew.Orchestrator.Host) + assert.Equal(t, Duration(10), mergedNew.Orchestrator.NodeManager.DisconnectTimeout) }) t.Run("MergeNew with slices", func(t *testing.T) { @@ -183,9 +183,9 @@ func TestBacalhauMergeNew(t *testing.T) { Orchestrators: []string{"nats://127.0.0.1:4223", "nats://127.0.0.1:4224"}, }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, []string{"nats://127.0.0.1:4223", "nats://127.0.0.1:4224"}, MergeNewd.Compute.Orchestrators) + assert.Equal(t, []string{"nats://127.0.0.1:4223", "nats://127.0.0.1:4224"}, mergedNew.Compute.Orchestrators) }) t.Run("MergeNew doesn't affect original configs", func(t *testing.T) { @@ -200,11 +200,11 @@ func TestBacalhauMergeNew(t *testing.T) { Host: "otherhost", }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.NotEqual(t, base, MergeNewd) - assert.NotEqual(t, other, MergeNewd) + assert.NotEqual(t, base, mergedNew) + assert.NotEqual(t, other, mergedNew) assert.Equal(t, "localhost", base.API.Host) assert.Equal(t, "otherhost", other.API.Host) }) @@ -235,11 +235,11 @@ func TestBacalhauMergeNew(t *testing.T) { }, }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, 1, MergeNewd.JobDefaults.Batch.Priority) - assert.Equal(t, "1000m", MergeNewd.JobDefaults.Batch.Task.Resources.CPU) - assert.Equal(t, "1Gb", MergeNewd.JobDefaults.Batch.Task.Resources.Memory) + assert.Equal(t, 1, mergedNew.JobDefaults.Batch.Priority) + assert.Equal(t, "1000m", mergedNew.JobDefaults.Batch.Task.Resources.CPU) + assert.Equal(t, "1Gb", mergedNew.JobDefaults.Batch.Task.Resources.Memory) }) } diff --git a/pkg/config/types/gen/generate.go b/pkg/config/types/gen/generate.go index ef00c27352..af644e9f22 100644 --- a/pkg/config/types/gen/generate.go +++ b/pkg/config/types/gen/generate.go @@ -113,8 +113,8 @@ func WriteConstants(fieldInfos map[string]FieldInfo, w io.Writer) error { func ConfigFieldMap(dir string) map[string]FieldInfo { // Parse the package directory - fset := token.NewFileSet() - pkgs, err := parser.ParseDir(fset, dir, nil, parser.ParseComments) + fileSet := token.NewFileSet() + pkgs, err := parser.ParseDir(fileSet, dir, nil, parser.ParseComments) if err != nil { log.Fatal(err) } diff --git a/pkg/devstack/devstack.go b/pkg/devstack/devstack.go index fed9d32e7c..19bb791ff9 100644 --- a/pkg/devstack/devstack.go +++ b/pkg/devstack/devstack.go @@ -126,11 +126,11 @@ func Setup( if isComputeNode { // We have multiple process on the same machine, all wanting to listen on a HTTP port // and so we will give each compute node a random open port to listen on. - fport, err := network.GetFreePort() + freePort, err := network.GetFreePort() if err != nil { return nil, errors.Wrap(err, "failed to get free port for local publisher") } - cfg.Publishers.Types.Local.Port = fport + cfg.Publishers.Types.Local.Port = freePort } cfg.Orchestrator.Enabled = isRequesterNode diff --git a/pkg/eventhandler/chained_handlers.go b/pkg/eventhandler/chained_handlers.go deleted file mode 100644 index 93cbd95273..0000000000 --- a/pkg/eventhandler/chained_handlers.go +++ /dev/null @@ -1,61 +0,0 @@ -package eventhandler - -import ( - "context" - "fmt" - "time" - - "github.com/rs/zerolog/log" - - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// An event handler implementation that chains multiple event handlers, and accepts a context provider -// to setup up the context once for all handlers. -type ChainedJobEventHandler struct { - eventHandlers []JobEventHandler - contextProvider ContextProvider -} - -func NewChainedJobEventHandler(contextProvider ContextProvider) *ChainedJobEventHandler { - return &ChainedJobEventHandler{contextProvider: contextProvider} -} - -func (r *ChainedJobEventHandler) AddHandlers(handlers ...JobEventHandler) { - r.eventHandlers = append(r.eventHandlers, handlers...) -} - -func (r *ChainedJobEventHandler) HandleJobEvent(ctx context.Context, event models.JobEvent) (err error) { - startTime := time.Now() - defer logEvent(ctx, event, startTime)(&err) - - if r.eventHandlers == nil { - return fmt.Errorf("no event handlers registered") - } - - jobCtx := r.contextProvider.GetContext(ctx, event.JobID) - - // All handlers are called, unless one of them returns an error. - for _, handler := range r.eventHandlers { - if err = handler.HandleJobEvent(jobCtx, event); err != nil { //nolint:gocritic - return err - } - } - return nil -} - -func logEvent(ctx context.Context, event models.JobEvent, startTime time.Time) func(*error) { - return func(handlerError *error) { - logMsg := log.Ctx(ctx).Debug(). - Str("EventName", event.EventName.String()). - Str("JobID", event.JobID). - Str("NodeID", event.SourceNodeID). - Str("Status", event.Status). - Dur("HandleDuration", time.Since(startTime)) - if *handlerError != nil { - logMsg = logMsg.AnErr("HandlerError", *handlerError) - } - - logMsg.Msg("Handled event") - } -} diff --git a/pkg/eventhandler/chained_handlers_test.go b/pkg/eventhandler/chained_handlers_test.go deleted file mode 100644 index cf92117769..0000000000 --- a/pkg/eventhandler/chained_handlers_test.go +++ /dev/null @@ -1,111 +0,0 @@ -//go:build unit || !integration - -package eventhandler - -import ( - "context" - "fmt" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.uber.org/mock/gomock" - - "github.com/bacalhau-project/bacalhau/pkg/eventhandler/mock_eventhandler" - "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// In order for 'go test' to run this suite, we need to create -// a normal test function and pass our suite to suite.Run -func TestChainedHandlers(t *testing.T) { - suite.Run(t, new(jobEventHandlerSuite)) -} - -type jobEventHandlerSuite struct { - suite.Suite - ctrl *gomock.Controller - chainedHandler *ChainedJobEventHandler - handler1 *mock_eventhandler.MockJobEventHandler - handler2 *mock_eventhandler.MockJobEventHandler - contextProvider *mock_eventhandler.MockContextProvider - context context.Context - event models.JobEvent -} - -// Before each test -func (suite *jobEventHandlerSuite) SetupTest() { - suite.ctrl = gomock.NewController(suite.T()) - suite.handler1 = mock_eventhandler.NewMockJobEventHandler(suite.ctrl) - suite.handler2 = mock_eventhandler.NewMockJobEventHandler(suite.ctrl) - suite.contextProvider = mock_eventhandler.NewMockContextProvider(suite.ctrl) - suite.chainedHandler = NewChainedJobEventHandler(suite.contextProvider) - suite.context = context.WithValue(context.Background(), "test", "test") - suite.event = models.JobEvent{ - EventName: models.JobEventCreated, - JobID: uuid.NewString(), - SourceNodeID: "nodeA", - Status: "this is a test event", - } - logger.ConfigureTestLogging(suite.T()) -} - -func (suite *jobEventHandlerSuite) TearDownTest() { - suite.ctrl.Finish() -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEvent() { - suite.chainedHandler.AddHandlers(suite.handler1, suite.handler2) - ctx := context.Background() - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // assert both handlers are called with the context provider's context and event - gomock.InOrder( - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - suite.handler2.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - ) - - // assert no error was returned - require.NoError(suite.T(), suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventLazilyAdded() { - suite.chainedHandler.AddHandlers(suite.handler1) - suite.chainedHandler.AddHandlers(suite.handler2) - ctx := context.Background() - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // assert both handlers are called with the context provider's context and event - gomock.InOrder( - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - suite.handler2.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - ) - - // assert no error was returned - require.NoError(suite.T(), suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventError() { - suite.chainedHandler.AddHandlers(suite.handler1) - suite.chainedHandler.AddHandlers(suite.handler2) - ctx := context.Background() - mockError := fmt.Errorf("i am an error") - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // mock first handler to return an error, and don't expect the second handler to be called - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(mockError) - - // assert no error was returned - require.Equal(suite.T(), mockError, suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventEmptyHandlers() { - require.Error(suite.T(), suite.chainedHandler.HandleJobEvent(context.Background(), suite.event)) -} diff --git a/pkg/eventhandler/context_provider.go b/pkg/eventhandler/context_provider.go deleted file mode 100644 index 774878edcd..0000000000 --- a/pkg/eventhandler/context_provider.go +++ /dev/null @@ -1,81 +0,0 @@ -package eventhandler - -import ( - "context" - "sync" - - "go.opentelemetry.io/otel/attribute" - oteltrace "go.opentelemetry.io/otel/trace" - - "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/telemetry" -) - -// Interface for a context provider that can be used to generate a context to be used to handle -// job events. -type ContextProvider interface { - GetContext(ctx context.Context, jobID string) context.Context -} - -// TracerContextProvider is a context provider that generates a context along with tracing information. -// It also implements JobEventHandler to end the local lifecycle context for a job when it is completed. -type TracerContextProvider struct { - nodeID string - jobNodeContexts map[string]context.Context // per-node job lifecycle - contextMutex sync.RWMutex -} - -func NewTracerContextProvider(nodeID string) *TracerContextProvider { - return &TracerContextProvider{ - nodeID: nodeID, - jobNodeContexts: make(map[string]context.Context), - } -} - -func (t *TracerContextProvider) GetContext(ctx context.Context, jobID string) context.Context { - t.contextMutex.Lock() - defer t.contextMutex.Unlock() - - jobCtx, _ := telemetry.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent", - oteltrace.WithSpanKind(oteltrace.SpanKindInternal), - oteltrace.WithAttributes( - attribute.String(telemetry.TracerAttributeNameNodeID, t.nodeID), - attribute.String(telemetry.TracerAttributeNameJobID, jobID), - ), - ) - - // keep the latest context to clean it up during shutdown if necessary - t.jobNodeContexts[jobID] = jobCtx - return jobCtx -} - -func (t *TracerContextProvider) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - // If the event is known to be terminal, end the local lifecycle context: - if event.EventName.IsTerminal() { - t.endJobNodeContext(ctx, event.JobID) - } - - return nil -} - -func (t *TracerContextProvider) Shutdown() error { - t.contextMutex.RLock() - defer t.contextMutex.RUnlock() - - for _, ctx := range t.jobNodeContexts { - oteltrace.SpanFromContext(ctx).End() - } - - // clear the maps - t.jobNodeContexts = make(map[string]context.Context) - - return nil -} - -// endJobNodeContext ends the local lifecycle context for a job. -func (t *TracerContextProvider) endJobNodeContext(ctx context.Context, jobID string) { - oteltrace.SpanFromContext(ctx).End() - t.contextMutex.Lock() - defer t.contextMutex.Unlock() - delete(t.jobNodeContexts, jobID) -} diff --git a/pkg/eventhandler/interfaces.go b/pkg/eventhandler/interfaces.go deleted file mode 100644 index 5c332daa28..0000000000 --- a/pkg/eventhandler/interfaces.go +++ /dev/null @@ -1,21 +0,0 @@ -package eventhandler - -//go:generate mockgen --source interfaces.go --destination mock_eventhandler/mock_handlers.go --package mock_eventhandler - -import ( - "context" - - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// A job event handler is a component that is notified of events related to jobs. -type JobEventHandler interface { - HandleJobEvent(ctx context.Context, event models.JobEvent) error -} - -// function that implements the JobEventHandler interface -type JobEventHandlerFunc func(ctx context.Context, event models.JobEvent) error - -func (f JobEventHandlerFunc) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - return f(ctx, event) -} diff --git a/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go b/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go deleted file mode 100644 index 4bb1fc0721..0000000000 --- a/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: system/context_provider.go - -// Package mock_system is a generated GoMock package. -package mock_eventhandler - -import ( - context "context" - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockContextProvider is a mock of ContextProvider interface. -type MockContextProvider struct { - ctrl *gomock.Controller - recorder *MockContextProviderMockRecorder -} - -// MockContextProviderMockRecorder is the mock recorder for MockContextProvider. -type MockContextProviderMockRecorder struct { - mock *MockContextProvider -} - -// NewMockContextProvider creates a new mock instance. -func NewMockContextProvider(ctrl *gomock.Controller) *MockContextProvider { - mock := &MockContextProvider{ctrl: ctrl} - mock.recorder = &MockContextProviderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockContextProvider) EXPECT() *MockContextProviderMockRecorder { - return m.recorder -} - -// GetContext mocks base method. -func (m *MockContextProvider) GetContext(ctx context.Context, jobID string) context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetContext", ctx, jobID) - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// GetContext indicates an expected call of GetContext. -func (mr *MockContextProviderMockRecorder) GetContext(ctx, jobID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContext", reflect.TypeOf((*MockContextProvider)(nil).GetContext), ctx, jobID) -} diff --git a/pkg/eventhandler/mock_eventhandler/mock_handlers.go b/pkg/eventhandler/mock_eventhandler/mock_handlers.go deleted file mode 100644 index 832e5914df..0000000000 --- a/pkg/eventhandler/mock_eventhandler/mock_handlers.go +++ /dev/null @@ -1,55 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: interfaces.go -// -// Generated by this command: -// -// mockgen --source interfaces.go --destination mock_eventhandler/mock_handlers.go --package mock_eventhandler -// - -// Package mock_eventhandler is a generated GoMock package. -package mock_eventhandler - -import ( - context "context" - reflect "reflect" - - models "github.com/bacalhau-project/bacalhau/pkg/models" - gomock "go.uber.org/mock/gomock" -) - -// MockJobEventHandler is a mock of JobEventHandler interface. -type MockJobEventHandler struct { - ctrl *gomock.Controller - recorder *MockJobEventHandlerMockRecorder -} - -// MockJobEventHandlerMockRecorder is the mock recorder for MockJobEventHandler. -type MockJobEventHandlerMockRecorder struct { - mock *MockJobEventHandler -} - -// NewMockJobEventHandler creates a new mock instance. -func NewMockJobEventHandler(ctrl *gomock.Controller) *MockJobEventHandler { - mock := &MockJobEventHandler{ctrl: ctrl} - mock.recorder = &MockJobEventHandlerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockJobEventHandler) EXPECT() *MockJobEventHandlerMockRecorder { - return m.recorder -} - -// HandleJobEvent mocks base method. -func (m *MockJobEventHandler) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HandleJobEvent", ctx, event) - ret0, _ := ret[0].(error) - return ret0 -} - -// HandleJobEvent indicates an expected call of HandleJobEvent. -func (mr *MockJobEventHandlerMockRecorder) HandleJobEvent(ctx, event any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleJobEvent", reflect.TypeOf((*MockJobEventHandler)(nil).HandleJobEvent), ctx, event) -} diff --git a/pkg/eventhandler/tracer.go b/pkg/eventhandler/tracer.go deleted file mode 100644 index 4db8f63dcb..0000000000 --- a/pkg/eventhandler/tracer.go +++ /dev/null @@ -1,78 +0,0 @@ -package eventhandler - -import ( - "context" - "fmt" - "io/fs" - "os" - - "github.com/rs/zerolog" - - "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// Tracer is a JobEventHandler that will marshal the received event to a -// file-based log. -// -// Note that we don't need any mutexes here because writing to an os.File is -// thread-safe (see https://github.com/rs/zerolog/blob/master/writer.go#L33) -type Tracer struct { - LogFile *os.File - Logger zerolog.Logger -} - -const eventTracerFilePerms fs.FileMode = 0644 - -// Returns an eventhandler.Tracer that writes to config.GetEventTracerPath(), or -// an error if the file can't be opened. -func NewTracer(path string) (*Tracer, error) { - return NewTracerToFile(path) -} - -// Returns an eventhandler.Tracer that writes to the specified filename, or an -// error if the file can't be opened. -func NewTracerToFile(filename string) (*Tracer, error) { - file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, eventTracerFilePerms) - if err != nil { - return nil, err - } - - return &Tracer{ - LogFile: file, - Logger: zerolog.New(file).With().Timestamp().Logger(), - }, nil -} - -// HandleJobEvent implements JobEventHandler -func (t *Tracer) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - trace(t.Logger, event) - return nil -} - -func trace[Event any](log zerolog.Logger, event Event) { - log.Log(). - Str("Type", fmt.Sprintf("%T", event)). - Func(func(e *zerolog.Event) { - // TODO: #828 Potential hotspot - marshaling is expensive, and - // we do it for every event. - eventJSON, err := marshaller.JSONMarshalWithMax(event) - if err == nil { - e.RawJSON("Event", eventJSON) - } else { - e.AnErr("MarshalError", err) - } - }).Send() -} - -func (t *Tracer) Shutdown() error { - if t.LogFile != nil { - err := t.LogFile.Close() - t.LogFile = nil - t.Logger = zerolog.Nop() - return err - } - return nil -} - -var _ JobEventHandler = (*Tracer)(nil) diff --git a/pkg/executor/docker/executor.go b/pkg/executor/docker/executor.go index 4ead9dea53..adb77ac97d 100644 --- a/pkg/executor/docker/executor.go +++ b/pkg/executor/docker/executor.go @@ -222,7 +222,7 @@ func (e *Executor) doWait(ctx context.Context, out chan *models.RunCommandResult out <- handle.result } else { // NB(forrest): this shouldn't happen with the wasm and docker executors, but handling it as it - // represents a significant error in executor logic, which may occur in future pluggable executor impls. + // represents a significant error in executor logic, which may occur in future pluggable executor impl. errCh <- fmt.Errorf("execution (%s) result is nil", handle.executionID) } } @@ -473,8 +473,8 @@ func makeContainerMounts( return nil, fmt.Errorf("output volume has no Location: %+v", output) } - srcd := filepath.Join(resultsDir, output.Name) - if err := os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W); err != nil { + srcDir := filepath.Join(resultsDir, output.Name) + if err := os.Mkdir(srcDir, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W); err != nil { return nil, fmt.Errorf("failed to create results dir for execution: %w", err) } @@ -486,7 +486,7 @@ func makeContainerMounts( // this is an output volume so can be written to ReadOnly: false, // we create a named folder in the job results folder for this output - Source: srcd, + Source: srcDir, // the path of the output volume is from the perspective of inside the container Target: output.Path, }) diff --git a/pkg/executor/docker/executor_test.go b/pkg/executor/docker/executor_test.go index 085ddb239a..f1c8a3812d 100644 --- a/pkg/executor/docker/executor_test.go +++ b/pkg/executor/docker/executor_test.go @@ -436,7 +436,7 @@ func (s *ExecutorTestSuite) TestDockerExecutionCancellation() { // This is important to do. In our docker executor, we set active to true, before calling the docker client with ContainerStart // Hence there is a bit of time before the container actually gets started. The correct way of identifying that whether - // a contianer has started or not is via activeCh. We want to make sure that contianer is started before canceling the execution. + // a container has started or not is via activeCh. We want to make sure that container is started before canceling the execution. handler, _ := s.executor.handlers.Get(executionID) <-handler.activeCh diff --git a/pkg/executor/docker/models/types_test.go b/pkg/executor/docker/models/types_test.go index 4c0cc0f4a1..ff179f18fa 100644 --- a/pkg/executor/docker/models/types_test.go +++ b/pkg/executor/docker/models/types_test.go @@ -21,14 +21,14 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec all fields", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithWorkingDirectory("/app"). WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, WorkingDirectory: "/app", @@ -38,13 +38,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no entry point", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithWorkingDirectory("/app"). WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, WorkingDirectory: "/app", Parameters: []string{"arg1", "arg2"}, @@ -53,13 +53,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no env var", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithWorkingDirectory("/app"). WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, WorkingDirectory: "/app", Parameters: []string{"arg1", "arg2"}, @@ -68,13 +68,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no params", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithWorkingDirectory("/app") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, WorkingDirectory: "/app", @@ -83,13 +83,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no working dir", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, Parameters: []string{"arg1", "arg2"}, diff --git a/pkg/executor/docker/network.go b/pkg/executor/docker/network.go index 73c22df42a..21495c7c3b 100644 --- a/pkg/executor/docker/network.go +++ b/pkg/executor/docker/network.go @@ -135,10 +135,10 @@ func (e *Executor) createHTTPGateway( } // Create the gateway container initially attached to the *host* network - domainList, derr := json.Marshal(networkConfig.DomainSet()) - clientList, cerr := json.Marshal([]string{subnet}) - if derr != nil || cerr != nil { - return nil, nil, pkgerrors.Wrap(errors.Join(derr, cerr), "error preparing gateway config") + domainList, dErr := json.Marshal(networkConfig.DomainSet()) + clientList, cErr := json.Marshal([]string{subnet}) + if dErr != nil || cErr != nil { + return nil, nil, pkgerrors.Wrap(errors.Join(dErr, cErr), "error preparing gateway config") } gatewayContainer, err := e.client.ContainerCreate(ctx, &container.Config{ diff --git a/pkg/executor/wasm/executor.go b/pkg/executor/wasm/executor.go index d53c9ad8d6..026069e709 100644 --- a/pkg/executor/wasm/executor.go +++ b/pkg/executor/wasm/executor.go @@ -273,18 +273,18 @@ func (e *Executor) makeFsFromStorage( return nil, fmt.Errorf("output volume has no path: %+v", output) } - srcd := filepath.Join(jobResultsDir, output.Name) + srcDir := filepath.Join(jobResultsDir, output.Name) log.Ctx(ctx).Debug(). Str("output", output.Name). - Str("dir", srcd). + Str("dir", srcDir). Msg("Collecting output") - err = os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W) + err = os.Mkdir(srcDir, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W) if err != nil { return nil, err } - err = rootFs.Mount(output.Name, touchfs.New(srcd)) + err = rootFs.Mount(output.Name, touchfs.New(srcDir)) if err != nil { return nil, err } diff --git a/pkg/executor/wasm/loader.go b/pkg/executor/wasm/loader.go index 9f9e205855..e8f7e3dbfc 100644 --- a/pkg/executor/wasm/loader.go +++ b/pkg/executor/wasm/loader.go @@ -25,7 +25,7 @@ import ( // the WebAssembly program, allowing the user to deploy self-contained // WebAssembly blobs. See the introductory talk at https://youtu.be/6zJkMLzXbQc. // -// This works by using the "module name" field of a WebAssmelby import header, +// This works by using the "module name" field of a WebAssembly import header, // (which for user-supplied modules is arbitrary) as a hint to the loader as to // where the dependency lives and how to retrieve it. The module still needs to // be specified as input data for the job (a previous implementation of the @@ -102,7 +102,7 @@ func (loader *ModuleLoader) loadModule(ctx context.Context, m storage.PreparedSt // InstantiateRemoteModule loads and instantiates the remote module and all of // its dependencies. It only looks in the job's input storage specs for modules. // -// This function calls itself reucrsively for any discovered dependencies on the +// This function calls itself recursively for any discovered dependencies on the // loaded modules, so that the returned module has all of its dependencies fully // instantiated and is ready to use. func (loader *ModuleLoader) InstantiateRemoteModule(ctx context.Context, m storage.PreparedStorage) (api.Module, error) { diff --git a/pkg/jobstore/boltdb/store_test.go b/pkg/jobstore/boltdb/store_test.go index b5e8f7cc11..671c6a1678 100644 --- a/pkg/jobstore/boltdb/store_test.go +++ b/pkg/jobstore/boltdb/store_test.go @@ -851,7 +851,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() { s.Equal(2, len(state)) s.Equal(state[0].GetModifyTime().Before(state[1].GetModifyTime()), true) - // When OrderBy is set to Modified At With Reverese + // When OrderBy is set to Modified At With Reverse state, err = s.store.GetExecutions(s.ctx, jobstore.GetExecutionsOptions{ JobID: "160", OrderBy: "modified_at", diff --git a/pkg/lib/collections/hashed_priority_queue_test.go b/pkg/lib/collections/hashed_priority_queue_test.go index f5838fbe99..967301fc6b 100644 --- a/pkg/lib/collections/hashed_priority_queue_test.go +++ b/pkg/lib/collections/hashed_priority_queue_test.go @@ -136,10 +136,10 @@ func (s *HashedPriorityQueueSuite) TestDuplicateKeys() { } for _, exp := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(exp.v, qitem.Value) - s.Require().Equal(exp.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(exp.v, qItem.Value) + s.Require().Equal(exp.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) diff --git a/pkg/lib/collections/priority_queue.go b/pkg/lib/collections/priority_queue.go index 8448111d3e..fda6953f23 100644 --- a/pkg/lib/collections/priority_queue.go +++ b/pkg/lib/collections/priority_queue.go @@ -158,19 +158,19 @@ func (pq *PriorityQueue[T]) DequeueWhere(matcher MatchingFunction[T]) *QueueItem // If any iteration does not generate a match, the item is requeued in a temporary // queue reading for requeueing on this queue later on. for pq.internalQueue.Len() > 0 { - qitem := pq.dequeue() + qItem := pq.dequeue() - if qitem == nil { + if qItem == nil { return nil } - if matcher(qitem.Value) { - result = qitem + if matcher(qItem.Value) { + result = qItem break } // Add to the queue - unmatched = append(unmatched, qitem) + unmatched = append(unmatched, qItem) } // Re-add the items that were not matched back onto the Q diff --git a/pkg/lib/collections/priority_queue_base_test.go b/pkg/lib/collections/priority_queue_base_test.go index 0a5db6f5cd..e8de45f323 100644 --- a/pkg/lib/collections/priority_queue_base_test.go +++ b/pkg/lib/collections/priority_queue_base_test.go @@ -36,10 +36,10 @@ func (s *PriorityQueueTestSuite) TestSimple() { } for _, tc := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(tc.v, qItem.Value) + s.Require().Equal(tc.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) @@ -63,10 +63,10 @@ func (s *PriorityQueueTestSuite) TestSimpleMin() { } for _, tc := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(tc.v, qItem.Value) + s.Require().Equal(tc.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) @@ -74,8 +74,8 @@ func (s *PriorityQueueTestSuite) TestSimpleMin() { func (s *PriorityQueueTestSuite) TestEmpty() { pq := s.NewQueue() - qitem := pq.Dequeue() - s.Require().Nil(qitem) + qItem := pq.Dequeue() + s.Require().Nil(qItem) s.Require().True(pq.IsEmpty()) } @@ -91,13 +91,13 @@ func (s *PriorityQueueTestSuite) TestDequeueWhere() { count := pq.Len() - qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + qItem := pq.DequeueWhere(func(possibleMatch TestData) bool { return possibleMatch.id == "B" }) - s.Require().NotNil(qitem) - s.Require().Equal(TestData{"B", 2}, qitem.Value) - s.Require().Equal(int64(3), qitem.Priority) + s.Require().NotNil(qItem) + s.Require().Equal(TestData{"B", 2}, qItem.Value) + s.Require().Equal(int64(3), qItem.Priority) s.Require().Equal(count-1, pq.Len()) } @@ -105,11 +105,11 @@ func (s *PriorityQueueTestSuite) TestDequeueWhereFail() { pq := s.NewQueue() pq.Enqueue(TestData{"A", 1}, 4) - qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + qItem := pq.DequeueWhere(func(possibleMatch TestData) bool { return possibleMatch.id == "Z" }) - s.Require().Nil(qitem) + s.Require().Nil(qItem) } func (s *PriorityQueueTestSuite) TestPeek() { diff --git a/pkg/lib/collections/priority_queue_test.go b/pkg/lib/collections/priority_queue_test.go index ec3adeb23d..0c7bacc917 100644 --- a/pkg/lib/collections/priority_queue_test.go +++ b/pkg/lib/collections/priority_queue_test.go @@ -53,10 +53,10 @@ func (s *PriorityQueueSuite) TestDuplicateKeys() { } for _, exp := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(exp.v, qitem.Value) - s.Require().Equal(exp.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(exp.v, qItem.Value) + s.Require().Equal(exp.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) diff --git a/pkg/lib/crypto/certificate.go b/pkg/lib/crypto/certificate.go index eaedc61769..b326ae9735 100644 --- a/pkg/lib/crypto/certificate.go +++ b/pkg/lib/crypto/certificate.go @@ -75,7 +75,7 @@ func NewSignedCertificate(parent Certificate, ipAddress []net.IP) (Certificate, return Certificate{cert: cert, parent: &parent, key: certPrivKey}, nil } -func (cert *Certificate) MarshalCertficate(out io.Writer) error { +func (cert *Certificate) MarshalCertificate(out io.Writer) error { var parent *x509.Certificate var signingKey *rsa.PrivateKey diff --git a/pkg/lib/crypto/certificate_test.go b/pkg/lib/crypto/certificate_test.go index 18de762b50..22a2590c30 100644 --- a/pkg/lib/crypto/certificate_test.go +++ b/pkg/lib/crypto/certificate_test.go @@ -28,7 +28,7 @@ func TestProducesValidCertificate(t *testing.T) { cert := getTestSelfSignedCert(t) var buf bytes.Buffer - err := cert.MarshalCertficate(&buf) + err := cert.MarshalCertificate(&buf) require.NoError(t, err) block, rest := pem.Decode(buf.Bytes()) @@ -49,7 +49,7 @@ func TestProducesSignedCertificate(t *testing.T) { require.NotNil(t, cert) var buf bytes.Buffer - err = cert.MarshalCertficate(&buf) + err = cert.MarshalCertificate(&buf) require.NoError(t, err) block, rest := pem.Decode(buf.Bytes()) @@ -61,7 +61,7 @@ func TestProducesSignedCertificate(t *testing.T) { require.NotNil(t, parsed) buf.Reset() - err = parent.MarshalCertficate(&buf) + err = parent.MarshalCertificate(&buf) require.NoError(t, err) pool := x509.NewCertPool() diff --git a/pkg/lib/policy/scrypt.go b/pkg/lib/policy/scrypt.go index 15e7e41da7..f69f5ca52f 100644 --- a/pkg/lib/policy/scrypt.go +++ b/pkg/lib/policy/scrypt.go @@ -30,7 +30,7 @@ var scryptFn = rego.Function2( Memoize: true, Nondeterministic: false, }, - func(bctx rego.BuiltinContext, passwordTerm, saltTerm *ast.Term) (*ast.Term, error) { + func(bCtx rego.BuiltinContext, passwordTerm, saltTerm *ast.Term) (*ast.Term, error) { var password, salt string if err := ast.As(passwordTerm.Value, &password); err != nil { return nil, err diff --git a/pkg/logger/wasm/logmanager.go b/pkg/logger/wasm/logmanager.go index 8184afaff9..b2c3735409 100644 --- a/pkg/logger/wasm/logmanager.go +++ b/pkg/logger/wasm/logmanager.go @@ -10,10 +10,11 @@ import ( "sync" "time" + "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/util" "github.com/bacalhau-project/bacalhau/pkg/util/generic" - "github.com/rs/zerolog/log" ) const ( @@ -161,11 +162,11 @@ func (lm *LogManager) Drain() { } func (lm *LogManager) GetWriters() (io.WriteCloser, io.WriteCloser) { - writerFunc := func(strm LogStreamType) func([]byte) *LogMessage { + writerFunc := func(stream LogStreamType) func([]byte) *LogMessage { return func(b []byte) *LogMessage { m := LogMessage{ Timestamp: time.Now().Unix(), - Stream: strm, + Stream: stream, } m.Data = append([]byte(nil), b...) return &m diff --git a/pkg/models/event_test.go b/pkg/models/event_test.go index 7d739f2de7..0056836d22 100644 --- a/pkg/models/event_test.go +++ b/pkg/models/event_test.go @@ -159,7 +159,7 @@ func (suite *EventTestSuite) TestGetJobStateIfPresent() { invalidState := "InvalidState" eventWithInvalidState := models.NewEvent(suite.topic).WithDetail(models.DetailsKeyNewState, invalidState) state, err = eventWithInvalidState.GetJobStateIfPresent() - suite.NoError(err) // models.JobStateType.UnmarshallText() does not return an error for invalid states + suite.NoError(err) // models.JobStateType.UnmarshalText() does not return an error for invalid states suite.Equal(models.JobStateTypeUndefined, state) } diff --git a/pkg/models/execution.go b/pkg/models/execution.go index ab54ebfee5..844fa26119 100644 --- a/pkg/models/execution.go +++ b/pkg/models/execution.go @@ -42,7 +42,7 @@ func (s ExecutionStateType) IsUndefined() bool { return s == ExecutionStateUndefined } -func (s ExecutionStateType) IsTermainl() bool { +func (s ExecutionStateType) IsTerminal() bool { return s == ExecutionStateBidRejected || s == ExecutionStateCompleted || s == ExecutionStateFailed || diff --git a/pkg/models/job_event_string.go b/pkg/models/job_event_string.go deleted file mode 100644 index 1d345bf277..0000000000 --- a/pkg/models/job_event_string.go +++ /dev/null @@ -1,36 +0,0 @@ -// Code generated by "stringer -type=JobEventType --trimprefix=JobEvent --output job_event_string.go"; DO NOT EDIT. - -package models - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[jobEventUndefined-0] - _ = x[JobEventCreated-1] - _ = x[JobEventBid-2] - _ = x[JobEventBidAccepted-3] - _ = x[JobEventBidRejected-4] - _ = x[JobEventComputeError-5] - _ = x[JobEventResultsProposed-6] - _ = x[JobEventResultsAccepted-7] - _ = x[JobEventResultsRejected-8] - _ = x[JobEventResultsPublished-9] - _ = x[JobEventError-10] - _ = x[JobEventCanceled-11] - _ = x[JobEventCompleted-12] - _ = x[jobEventDone-13] -} - -const _JobEventType_name = "jobEventUndefinedCreatedBidBidAcceptedBidRejectedComputeErrorResultsProposedResultsAcceptedResultsRejectedResultsPublishedErrorCanceledCompletedjobEventDone" - -var _JobEventType_index = [...]uint8{0, 17, 24, 27, 38, 49, 61, 76, 91, 106, 122, 127, 135, 144, 156} - -func (i JobEventType) String() string { - if i < 0 || i >= JobEventType(len(_JobEventType_index)-1) { - return "JobEventType(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _JobEventType_name[_JobEventType_index[i]:_JobEventType_index[i+1]] -} diff --git a/pkg/models/jobevent.go b/pkg/models/jobevent.go deleted file mode 100644 index 54de944a22..0000000000 --- a/pkg/models/jobevent.go +++ /dev/null @@ -1,107 +0,0 @@ -package models - -import ( - "fmt" - "time" -) - -//go:generate stringer -type=JobEventType --trimprefix=JobEvent --output job_event_string.go -type JobEventType int - -const ( - jobEventUndefined JobEventType = iota // must be first - - // Job has been created on the requester node - JobEventCreated - - // a compute node bid on a job - JobEventBid - - // a requester node accepted for rejected a job bid - JobEventBidAccepted - JobEventBidRejected - - // a compute node had an error running a job - JobEventComputeError - - // a compute node completed running a job - JobEventResultsProposed - - // a Requester node accepted the results from a node for a job - JobEventResultsAccepted - - // a Requester node rejected the results from a node for a job - JobEventResultsRejected - - // once the results have been accepted or rejected - // the compute node will publish them and issue this event - JobEventResultsPublished - - // a requester node declared an error running a job - JobEventError - - // a user canceled a job - JobEventCanceled - - // a job has been completed - JobEventCompleted - - jobEventDone // must be last -) - -func (je JobEventType) IsUndefined() bool { - return je == jobEventUndefined -} - -// IsTerminal returns true if the given event type signals the end of the -// lifecycle of a job. After this, all nodes can safely ignore the job. -func (je JobEventType) IsTerminal() bool { - return je == JobEventError || je == JobEventCompleted || je == JobEventCanceled -} - -func ParseJobEventType(str string) (JobEventType, error) { - for typ := jobEventUndefined + 1; typ < jobEventDone; typ++ { - if equal(typ.String(), str) { - return typ, nil - } - } - - return jobEventUndefined, fmt.Errorf( - "executor: unknown job event type '%s'", str) -} - -func JobEventTypes() []JobEventType { - var res []JobEventType - for typ := jobEventUndefined + 1; typ < jobEventDone; typ++ { - res = append(res, typ) - } - - return res -} - -func (je JobEventType) MarshalText() ([]byte, error) { - return []byte(je.String()), nil -} - -func (je *JobEventType) UnmarshalText(text []byte) (err error) { - name := string(text) - *je, err = ParseJobEventType(name) - return -} - -// TODO remove this https://github.com/bacalhau-project/bacalhau/issues/4185 -type JobEvent struct { - JobID string `json:"JobID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"` - // compute execution identifier - ExecutionID string `json:"ExecutionID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"` - // the node that emitted this event - SourceNodeID string `json:"SourceNodeID,omitempty" example:"QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF"` - // the node that this event is for - // e.g. "AcceptJobBid" was emitted by Requester but it targeting compute node - TargetNodeID string `json:"TargetNodeID,omitempty" example:"QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL"` - - EventName JobEventType `json:"EventName,omitempty"` - Status string `json:"Status,omitempty" example:"Got results proposal of length: 0"` - - EventTime time.Time `json:"EventTime,omitempty" example:"2022-11-17T13:32:55.756658941Z"` -} diff --git a/pkg/models/network.go b/pkg/models/network.go index ed2d063ad2..080b0c28a3 100644 --- a/pkg/models/network.go +++ b/pkg/models/network.go @@ -118,7 +118,7 @@ func (n *NetworkConfig) Validate() (err error) { err = errors.Join(err, fmt.Errorf("invalid networking type %q", n.Type)) } - // TODO(forrest): should return an error if the network type is not HTTP and domanins are set. + // TODO(forrest): should return an error if the network type is not HTTP and domains are set. for _, domain := range n.Domains { if domainRegex.MatchString(domain) { continue @@ -210,28 +210,28 @@ func matchDomain(left, right string) (diff int) { return diff } - lcur, rcur := len(lefts)-1, len(rights)-1 - for lcur >= 0 && rcur >= 0 { + lCur, rCur := len(lefts)-1, len(rights)-1 + for lCur >= 0 && rCur >= 0 { // If neither is a blank, these components need to match. - if lefts[lcur] != wildcard && rights[rcur] != wildcard { - if diff = strings.Compare(lefts[lcur], rights[rcur]); diff != 0 { + if lefts[lCur] != wildcard && rights[rCur] != wildcard { + if diff = strings.Compare(lefts[lCur], rights[rCur]); diff != 0 { return diff } } // If both are blanks, they match. - if lefts[lcur] == wildcard || rights[rcur] == wildcard { + if lefts[lCur] == wildcard || rights[rCur] == wildcard { break } // Blank means we are matching any subdomains, so only the rest of // the domain needs to match for this to work. - if lefts[lcur] != wildcard { - lcur -= 1 + if lefts[lCur] != wildcard { + lCur -= 1 } - if rights[rcur] != wildcard { - rcur -= 1 + if rights[rCur] != wildcard { + rCur -= 1 } } diff --git a/pkg/models/network_test.go b/pkg/models/network_test.go index 7c3c5265b4..e64c927613 100644 --- a/pkg/models/network_test.go +++ b/pkg/models/network_test.go @@ -108,8 +108,8 @@ func TestDomainMatching(t *testing.T) { {require.Less, "zzz.com", "foo.com"}, {require.Greater, "aaa.com", "foo.com"}, {require.Equal, "FOO.com", "foo.COM"}, - {require.Less, "bfoo.com", "afoo.com"}, - {require.Greater, "afoo.com", "bfoo.com"}, + {require.Less, "bFoo.com", "aFoo.com"}, + {require.Greater, "aFoo.com", "bFoo.com"}, {require.Less, "x-foo.com", ".foo.com"}, } diff --git a/pkg/nats/proxy/compute_proxy.go b/pkg/nats/proxy/compute_proxy.go index 498eb94b13..97d2bebb32 100644 --- a/pkg/nats/proxy/compute_proxy.go +++ b/pkg/nats/proxy/compute_proxy.go @@ -6,12 +6,13 @@ import ( "fmt" "time" + "github.com/nats-io/nats.go" + "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/compute" "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/nats/stream" - "github.com/nats-io/nats.go" - "github.com/rs/zerolog/log" ) const ( @@ -35,7 +36,7 @@ func NewComputeProxy(params ComputeProxyParams) (*ComputeProxy, error) { sc, err := stream.NewConsumerClient(stream.ConsumerClientParams{ Conn: params.Conn, Config: stream.StreamConsumerClientConfig{ - StreamCancellationBufferDuration: 5 * time.Second, //nolinter:gomnd + StreamCancellationBufferDuration: 5 * time.Second, //nolint:gomnd }, }) if err != nil { diff --git a/pkg/nats/stream/consumer_client.go b/pkg/nats/stream/consumer_client.go index f996763482..f4dea71f1e 100644 --- a/pkg/nats/stream/consumer_client.go +++ b/pkg/nats/stream/consumer_client.go @@ -9,11 +9,12 @@ import ( "sync" "time" - "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" "github.com/rs/zerolog/log" "github.com/samber/lo" + + "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" ) // RequestChanLen Default request channel length for buffering asynchronous results. @@ -26,7 +27,7 @@ const ( heartBeatPrefix = "_HEARTBEAT" inboxPrefixLen = len(inboxPrefix) replySuffixLen = 8 // Gives us 62^8 - rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + rDigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" base = 62 nuidSize = 22 ) @@ -219,7 +220,7 @@ func (nc *ConsumerClient) newRespInbox() string { rn := nc.respRand.Int63() for i := 0; i < replySuffixLen; i++ { - sb.WriteByte(rdigits[rn%base]) + sb.WriteByte(rDigits[rn%base]) rn /= base } diff --git a/pkg/nats/stream/types.go b/pkg/nats/stream/types.go index d3cc84ca3b..f6bf8be717 100644 --- a/pkg/nats/stream/types.go +++ b/pkg/nats/stream/types.go @@ -49,7 +49,7 @@ type StreamInfo struct { // CreatedAt represents the time the stream was created. CreatedAt time.Time // Function to cancel the stream. This is useful in the event the consumer client - // is no longer interested in the stream. The cancel function is inovked informing the + // is no longer interested in the stream. The cancel function is invoked informing the // producer to no longer serve the stream. Cancel context.CancelFunc } diff --git a/pkg/nats/transport/nats.go b/pkg/nats/transport/nats.go index 5c1bff4c8f..e96c5bac86 100644 --- a/pkg/nats/transport/nats.go +++ b/pkg/nats/transport/nats.go @@ -264,7 +264,7 @@ func (t *NATSTransport) CallbackProxy() compute.Callback { return t.callbackProxy } -// RegistrationProxy returns the previoously created registration proxy. +// ManagementProxy returns the previously created registration proxy. func (t *NATSTransport) ManagementProxy() compute.ManagementEndpoint { return t.managementProxy } diff --git a/pkg/node/factories.go b/pkg/node/factories.go index 9c869a8e0b..d10fa144b1 100644 --- a/pkg/node/factories.go +++ b/pkg/node/factories.go @@ -96,7 +96,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators func(ctx context.Context, nodeConfig NodeConfig) (authn.Provider, error) { var allErr error - authns := make(map[string]authn.Authenticator, len(nodeConfig.BacalhauConfig.API.Auth.Methods)) + auths := make(map[string]authn.Authenticator, len(nodeConfig.BacalhauConfig.API.Auth.Methods)) for name, authnConfig := range nodeConfig.BacalhauConfig.API.Auth.Methods { switch authnConfig.Type { case string(authn.MethodTypeChallenge): @@ -106,7 +106,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators continue } - authns[name] = challenge.NewAuthenticator( + auths[name] = challenge.NewAuthenticator( methodPolicy, challenge.NewStringMarshaller(nodeConfig.NodeID), userKey.PrivateKey(), @@ -119,7 +119,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators continue } - authns[name] = ask.NewAuthenticator( + auths[name] = ask.NewAuthenticator( methodPolicy, userKey.PrivateKey(), nodeConfig.NodeID, @@ -129,7 +129,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators } } - return provider.NewMappedProvider(authns), allErr + return provider.NewMappedProvider(auths), allErr }, ) } diff --git a/pkg/node/requester.go b/pkg/node/requester.go index ee73b00b9c..96166b5285 100644 --- a/pkg/node/requester.go +++ b/pkg/node/requester.go @@ -36,7 +36,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/util" "github.com/bacalhau-project/bacalhau/pkg/compute" - "github.com/bacalhau-project/bacalhau/pkg/eventhandler" "github.com/bacalhau-project/bacalhau/pkg/jobstore" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/selection/discovery" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/selection/ranking" @@ -76,14 +75,6 @@ func NewRequesterNode( return nil, err } - // prepare event handlers - tracerContextProvider := eventhandler.NewTracerContextProvider(nodeID) - localJobEventConsumer := eventhandler.NewChainedJobEventHandler(tracerContextProvider) - - eventEmitter := orchestrator.NewEventEmitter(orchestrator.EventEmitterParams{ - EventConsumer: localJobEventConsumer, - }) - jobStore, err := createJobStore(ctx, cfg) if err != nil { return nil, err @@ -120,12 +111,6 @@ func NewRequesterNode( JobStore: jobStore, }), - // planner that publishes events on job completion or failure - planner.NewEventEmitter(planner.EventEmitterParams{ - ID: nodeID, - EventEmitter: eventEmitter, - }), - // logs job completion or failure planner.NewLoggingPlanner(), ) @@ -227,7 +212,6 @@ func NewRequesterNode( endpointV2 := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{ ID: nodeID, Store: jobStore, - EventEmitter: eventEmitter, ComputeProxy: computeProxy, JobTransformer: jobTransformers, TaskTranslator: translationProvider, @@ -268,12 +252,6 @@ func NewRequesterNode( ) auth_endpoint.BindEndpoint(ctx, apiServer.Router, authenticators) - // order of event handlers is important as triggering some handlers might depend on the state of others. - localJobEventConsumer.AddHandlers( - // ends the span for the job if received a terminal event - tracerContextProvider, - ) - // ncl subscriber, err := ncl.NewSubscriber(transportLayer.Client(), ncl.WithSubscriberMessageSerDeRegistry(messageSerDeRegistry), @@ -302,10 +280,6 @@ func NewRequesterNode( } evalBroker.SetEnabled(false) - cleanupErr = tracerContextProvider.Shutdown() - if cleanupErr != nil { - util.LogDebugIfContextCancelled(ctx, cleanupErr, "failed to shutdown tracer context provider") - } // Close the jobstore after the evaluation broker is disabled cleanupErr = jobStore.Close(ctx) if cleanupErr != nil { @@ -317,9 +291,8 @@ func NewRequesterNode( // It provides the compute call back endpoints for interacting with compute nodes. // e.g. bidding, job completions, cancellations, and failures callback := orchestrator.NewCallback(&orchestrator.CallbackParams{ - ID: nodeID, - EventEmitter: eventEmitter, - Store: jobStore, + ID: nodeID, + Store: jobStore, }) if err = transportLayer.RegisterComputeCallback(callback); err != nil { return nil, err diff --git a/pkg/node/utils.go b/pkg/node/utils.go index 69ea4218f0..28673152d7 100644 --- a/pkg/node/utils.go +++ b/pkg/node/utils.go @@ -56,7 +56,7 @@ func getTLSCertificate(cfg types.Bacalhau) (string, string, error) { return "", "", err } else if caCert, err := crypto.NewSelfSignedCertificate(privKey, false, ips); err != nil { return "", "", errors.Wrap(err, "failed to generate server certificate") - } else if err = caCert.MarshalCertficate(certFile); err != nil { + } else if err = caCert.MarshalCertificate(certFile); err != nil { return "", "", errors.Wrap(err, "failed to write server certificate") } cert = certFile.Name() diff --git a/pkg/orchestrator/callback.go b/pkg/orchestrator/callback.go index 4550bf8369..6be49b86b4 100644 --- a/pkg/orchestrator/callback.go +++ b/pkg/orchestrator/callback.go @@ -13,23 +13,20 @@ import ( ) type CallbackParams struct { - ID string - Store jobstore.Store - EventEmitter EventEmitter + ID string + Store jobstore.Store } // Callback base implementation of requester Endpoint type Callback struct { - id string - store jobstore.Store - eventEmitter EventEmitter + id string + store jobstore.Store } func NewCallback(params *CallbackParams) *Callback { return &Callback{ - id: params.ID, - store: params.Store, - eventEmitter: params.EventEmitter, + id: params.ID, + store: params.Store, } } @@ -96,16 +93,11 @@ func (e *Callback) OnBidComplete(ctx context.Context, response compute.BidResult log.Ctx(ctx).Error().Err(err).Msgf("[OnBidComplete] failed to commit transaction") return } - - if response.Accepted { - e.eventEmitter.EmitBidReceived(ctx, response) - } } func (e *Callback) OnRunComplete(ctx context.Context, result compute.RunResult) { log.Ctx(ctx).Debug().Msgf("Requester node %s received RunComplete for execution: %s from %s", e.id, result.ExecutionID, result.SourcePeerID) - e.eventEmitter.EmitRunComplete(ctx, result) txContext, err := e.store.BeginTx(ctx) if err != nil { @@ -223,8 +215,6 @@ func (e *Callback) OnComputeFailure(ctx context.Context, result compute.ComputeE log.Ctx(ctx).Error().Err(err).Msgf("[OnComputeFailure] failed to commit transaction") return } - - e.eventEmitter.EmitComputeFailure(ctx, result.ExecutionID, result) } // enqueueEvaluation enqueues an evaluation to allow the scheduler to either accept the bid, or find a new node diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index dfe1a5da90..caf28c0d78 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -24,7 +24,6 @@ import ( type BaseEndpointParams struct { ID string Store jobstore.Store - EventEmitter EventEmitter ComputeProxy compute.Endpoint JobTransformer transformer.JobTransformer TaskTranslator translation.TranslatorProvider @@ -34,7 +33,6 @@ type BaseEndpointParams struct { type BaseEndpoint struct { id string store jobstore.Store - eventEmitter EventEmitter computeProxy compute.Endpoint jobTransformer transformer.JobTransformer taskTranslator translation.TranslatorProvider @@ -45,7 +43,6 @@ func NewBaseEndpoint(params *BaseEndpointParams) *BaseEndpoint { return &BaseEndpoint{ id: params.ID, store: params.Store, - eventEmitter: params.EventEmitter, computeProxy: params.ComputeProxy, jobTransformer: params.JobTransformer, taskTranslator: params.TaskTranslator, @@ -145,7 +142,6 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest) return nil, err } - e.eventEmitter.EmitJobCreated(ctx, *job) return &SubmitJobResponse{ JobID: job.ID, EvaluationID: eval.ID, @@ -223,12 +219,6 @@ func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (St return StopJobResponse{}, err } - e.eventEmitter.EmitEventSilently(ctx, models.JobEvent{ - JobID: request.JobID, - EventName: models.JobEventCanceled, - Status: request.Reason, - EventTime: time.Now(), - }) return StopJobResponse{ EvaluationID: evalID, }, nil diff --git a/pkg/orchestrator/event_emitter.go b/pkg/orchestrator/event_emitter.go deleted file mode 100644 index 513caa25f5..0000000000 --- a/pkg/orchestrator/event_emitter.go +++ /dev/null @@ -1,96 +0,0 @@ -package orchestrator - -import ( - "context" - "time" - - "github.com/rs/zerolog/log" - - "github.com/bacalhau-project/bacalhau/pkg/compute" - "github.com/bacalhau-project/bacalhau/pkg/eventhandler" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// A quick workaround to publish job events locally as we still have some types that rely -// on job events to update their states (e.g. localdb) and to take actions (e.g. websockets and logging) -// TODO: create a strongly typed local event emitter, and update localdb directly from -// -// requester instead of consuming events. -type EventEmitterParams struct { - EventConsumer eventhandler.JobEventHandler -} - -type EventEmitter struct { - eventConsumer eventhandler.JobEventHandler -} - -func NewEventEmitter(params EventEmitterParams) EventEmitter { - return EventEmitter{ - eventConsumer: params.EventConsumer, - } -} - -func (e EventEmitter) EmitJobCreated( - ctx context.Context, job models.Job) { - event := models.JobEvent{ - JobID: job.ID, - SourceNodeID: job.Meta[models.MetaRequesterID], - EventName: models.JobEventCreated, - EventTime: time.Now(), - } - e.EmitEventSilently(ctx, event) -} - -func (e EventEmitter) EmitBidReceived( - ctx context.Context, result compute.BidResult) { - e.EmitEventSilently(ctx, e.constructEvent(result.RoutingMetadata, result.ExecutionMetadata, models.JobEventBid)) -} - -func (e EventEmitter) EmitBidAccepted( - ctx context.Context, request compute.BidAcceptedRequest, response compute.BidAcceptedResponse) { - e.EmitEventSilently(ctx, e.constructEvent(request.RoutingMetadata, response.ExecutionMetadata, models.JobEventBidAccepted)) -} - -func (e EventEmitter) EmitBidRejected( - ctx context.Context, request compute.BidRejectedRequest, response compute.BidRejectedResponse) { - e.EmitEventSilently(ctx, e.constructEvent(request.RoutingMetadata, response.ExecutionMetadata, models.JobEventBidRejected)) -} - -func (e EventEmitter) EmitRunComplete(ctx context.Context, response compute.RunResult) { - e.EmitEventSilently(ctx, e.constructEvent(response.RoutingMetadata, response.ExecutionMetadata, models.JobEventResultsProposed)) -} - -func (e EventEmitter) EmitComputeFailure(ctx context.Context, executionID string, err error) { - event := models.JobEvent{ - ExecutionID: executionID, - EventName: models.JobEventComputeError, - Status: err.Error(), - EventTime: time.Now(), - } - e.EmitEventSilently(ctx, event) -} - -func (e EventEmitter) constructEvent( - routingMetadata compute.RoutingMetadata, - executionMetadata compute.ExecutionMetadata, - eventName models.JobEventType) models.JobEvent { - return models.JobEvent{ - TargetNodeID: routingMetadata.TargetPeerID, - SourceNodeID: routingMetadata.SourcePeerID, - JobID: executionMetadata.JobID, - ExecutionID: executionMetadata.ExecutionID, - EventName: eventName, - EventTime: time.Now(), - } -} - -func (e EventEmitter) EmitEvent(ctx context.Context, event models.JobEvent) error { - return e.eventConsumer.HandleJobEvent(ctx, event) -} - -func (e EventEmitter) EmitEventSilently(ctx context.Context, event models.JobEvent) { - err := e.EmitEvent(ctx, event) - if err != nil { - log.Ctx(ctx).Error().Err(err).Msgf("failed to emit event %+v", event) - } -} diff --git a/pkg/orchestrator/planner/event_emitter.go b/pkg/orchestrator/planner/event_emitter.go deleted file mode 100644 index e7ea6bea28..0000000000 --- a/pkg/orchestrator/planner/event_emitter.go +++ /dev/null @@ -1,54 +0,0 @@ -package planner - -import ( - "context" - "time" - - "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/orchestrator" -) - -// EventEmitter is a planner implementation that emits events based on the job state. -type EventEmitter struct { - id string - eventEmitter orchestrator.EventEmitter -} - -// EventEmitterParams holds the parameters for creating a new EventEmitter. -type EventEmitterParams struct { - ID string - EventEmitter orchestrator.EventEmitter -} - -// NewEventEmitter creates a new instance of EventEmitter. -func NewEventEmitter(params EventEmitterParams) *EventEmitter { - return &EventEmitter{ - id: params.ID, - eventEmitter: params.EventEmitter, - } -} - -// Process updates the state of the executions in the plan according to the scheduler's desired state. -func (s *EventEmitter) Process(ctx context.Context, plan *models.Plan) error { - var eventName models.JobEventType - switch plan.DesiredJobState { - case models.JobStateTypeCompleted: - eventName = models.JobEventCompleted - case models.JobStateTypeFailed: - eventName = models.JobEventError - default: - } - if !eventName.IsUndefined() { - s.eventEmitter.EmitEventSilently(ctx, models.JobEvent{ - SourceNodeID: s.id, - JobID: plan.Job.ID, - Status: plan.UpdateMessage, - EventName: eventName, - EventTime: time.Now(), - }) - } - return nil -} - -// compile-time check whether the EventEmitter implements the Planner interface. -var _ orchestrator.Planner = (*EventEmitter)(nil) diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 6d2bc98bfd..81845fdad9 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -2,7 +2,6 @@ package apimodels import ( "encoding/json" - "errors" "fmt" "io" "net/http" @@ -66,24 +65,30 @@ func (e *APIError) Error() string { return e.Message } -// Parse HTTP Resposne to APIError -func FromHttpResponse(resp *http.Response) (*APIError, error) { - +// Parse HTTP Response to APIError +func GenerateAPIErrorFromHTTPResponse(resp *http.Response) *APIError { if resp == nil { - return nil, errors.New("response is nil, cannot be unmarsheld to APIError") + return NewAPIError(0, "API call error, invalid response") } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("error reading response body: %w", err) + return NewAPIError( + resp.StatusCode, + fmt.Sprintf("Unable to read API call response body. Error: %q", err.Error())) } var apiErr APIError err = json.Unmarshal(body, &apiErr) if err != nil { - return nil, fmt.Errorf("error parsing response body: %w", err) + return NewAPIError( + resp.StatusCode, + fmt.Sprintf("Unable to parse API call response body. Error: %q. Body received: %q", + err.Error(), + string(body), + )) } // If the JSON didn't include a status code, use the HTTP Status @@ -91,7 +96,7 @@ func FromHttpResponse(resp *http.Response) (*APIError, error) { apiErr.HTTPStatusCode = resp.StatusCode } - return &apiErr, nil + return &apiErr } // FromBacError converts a bacerror.Error to an APIError diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go index 18f687ed34..95979028be 100644 --- a/pkg/publicapi/client/v2/client.go +++ b/pkg/publicapi/client/v2/client.go @@ -74,18 +74,12 @@ func (c *httpClient) Get(ctx context.Context, endpoint string, in apimodels.GetR return apimodels.NewUnauthorizedError("invalid token") } - var apiError *apimodels.APIError if resp.StatusCode != http.StatusOK { - apiError, err = apimodels.FromHttpResponse(resp) - if err != nil { - return err + if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil { + return apiError } } - if apiError != nil { - return apiError - } - defer resp.Body.Close() if out != nil { @@ -116,18 +110,12 @@ func (c *httpClient) write(ctx context.Context, verb, endpoint string, in apimod return apimodels.ErrInvalidToken } - var apiError *apimodels.APIError if resp.StatusCode != http.StatusOK { - apiError, err = apimodels.FromHttpResponse(resp) - if err != nil { - return err + if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil { + return apiError } } - if apiError != nil { - return apiError - } - if out != nil { if err := decodeBody(resp, &out); err != nil { return err @@ -362,12 +350,13 @@ func (c *httpClient) interceptError(ctx context.Context, err error, resp *http.R WithCode(bacerrors.UnauthorizedError) } - apiError, apiErr := apimodels.FromHttpResponse(resp) - if apiErr == nil { + apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp) + if apiError != nil { return apiError.ToBacError() } - return bacerrors.Wrap(apiErr, "server error"). + return bacerrors.New("server error"). + WithHTTPStatusCode(http.StatusInternalServerError). WithCode(bacerrors.InternalError) } diff --git a/pkg/publicapi/endpoint/orchestrator/node.go b/pkg/publicapi/endpoint/orchestrator/node.go index 18a60baa6b..890b4aa135 100644 --- a/pkg/publicapi/endpoint/orchestrator/node.go +++ b/pkg/publicapi/endpoint/orchestrator/node.go @@ -14,7 +14,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/util" ) -// godoc for Orchstrator GetNode +// godoc for Orchestrator GetNode // // @ID orchestrator/getNode // @Summary Get an orchestrator node diff --git a/pkg/publicapi/middleware/version.go b/pkg/publicapi/middleware/version.go index bda88e0ad2..292e98c6d2 100644 --- a/pkg/publicapi/middleware/version.go +++ b/pkg/publicapi/middleware/version.go @@ -34,32 +34,32 @@ func VersionNotifyLogger(logger *zerolog.Logger, serverVersion semver.Version) e // instructs logger to extract given list of headers from request. LogHeaders: []string{apimodels.HTTPHeaderBacalhauGitVersion}, LogValuesFunc: func(c echo.Context, v echomiddelware.RequestLoggerValues) error { - notif := Notification{ + notification := Notification{ RequestID: v.RequestID, ClientID: c.Response().Header().Get(apimodels.HTTPHeaderClientID), ServerVersion: serverVersion.String(), } defer func() { - if notif.Message != "" { + if notification.Message != "" { logger.WithLevel(zerolog.DebugLevel). - Str("ClientID", notif.ClientID). - Str("RequestID", notif.RequestID). - Str("ClientVersion", notif.ClientVersion). - Str("ServerVersion", notif.ServerVersion). - Msg(notif.Message) + Str("ClientID", notification.ClientID). + Str("RequestID", notification.RequestID). + Str("ClientVersion", notification.ClientVersion). + Str("ServerVersion", notification.ServerVersion). + Msg(notification.Message) } }() cVersion := v.Headers[apimodels.HTTPHeaderBacalhauGitVersion] if len(cVersion) == 0 { // version header is empty, cannot parse it - notif.Message = "received request from client without version" + notification.Message = "received request from client without version" return nil } if len(cVersion) > 1 { // version header contained multiple fields - notif.Message = fmt.Sprintf("received request from client with multiple versions: %s", cVersion) + notification.Message = fmt.Sprintf("received request from client with multiple versions: %s", cVersion) return nil } @@ -67,20 +67,20 @@ func VersionNotifyLogger(logger *zerolog.Logger, serverVersion semver.Version) e clientVersion, err := semver.NewVersion(cVersion[0]) if err != nil { // cannot parse client version, should notify - notif.Message = fmt.Sprintf("received request with invalid client version: %s", cVersion[0]) + notification.Message = fmt.Sprintf("received request with invalid client version: %s", cVersion[0]) return nil } // extract parsed client version for comparison - notif.ClientVersion = clientVersion.String() + notification.ClientVersion = clientVersion.String() diff := serverVersion.Compare(clientVersion) switch diff { case 1: // client version is less than server version - notif.Message = "received request from outdated client" + notification.Message = "received request from outdated client" case -1: // server version is less than client version - notif.Message = "received request from newer client" + notification.Message = "received request from newer client" case 0: // versions are the same, don't notify } diff --git a/pkg/publicapi/middleware/version_test.go b/pkg/publicapi/middleware/version_test.go index f49b1a2d9c..1887b3600a 100644 --- a/pkg/publicapi/middleware/version_test.go +++ b/pkg/publicapi/middleware/version_test.go @@ -103,9 +103,9 @@ func (suite *VersionNotifyTestSuite) TestLogVersionNotify() { if suite.buf.Len() == 0 { suite.Equalf("", tc.expectedMessage, "unexpected notification") } else { - notif := suite.parseMessage(suite.buf.String()) - suite.Contains(notif.Message, tc.expectedMessage) - suite.Equal(tc.expectedClientVersion, notif.ClientVersion) + notification := suite.parseMessage(suite.buf.String()) + suite.Contains(notification.Message, tc.expectedMessage) + suite.Equal(tc.expectedClientVersion, notification.ClientVersion) } }) } diff --git a/pkg/repo/migrations/v3_4.go b/pkg/repo/migrations/v3_4.go index 939499a914..23bd35065b 100644 --- a/pkg/repo/migrations/v3_4.go +++ b/pkg/repo/migrations/v3_4.go @@ -57,7 +57,7 @@ func V3MigrationWithConfig(globalCfg system.GlobalConfig) repo.Migration { } // update the legacy version file so older versions fail gracefully. if err := r.WriteLegacyVersion(repo.Version4); err != nil { - return fmt.Errorf("updating repo.verion: %w", err) + return fmt.Errorf("updating repo.version: %w", err) } if err := r.WriteLastUpdateCheck(time.UnixMilli(0)); err != nil { return err diff --git a/pkg/routing/tracing/tracing.go b/pkg/routing/tracing/tracing.go index 1efadffe62..ed936e15c0 100644 --- a/pkg/routing/tracing/tracing.go +++ b/pkg/routing/tracing/tracing.go @@ -62,7 +62,7 @@ func (r *NodeStore) GetByPrefix(ctx context.Context, prefix string) (models.Node log.Ctx(ctx).Trace(). Dur("duration", dur). Str("prefix", prefix). - Msg("node retrieved by previus") + Msg("node retrieved by previous") }() return r.delegate.GetByPrefix(ctx, prefix) diff --git a/pkg/s3/errors_test.go b/pkg/s3/errors_test.go index 3e72863698..096da13e93 100644 --- a/pkg/s3/errors_test.go +++ b/pkg/s3/errors_test.go @@ -1,5 +1,7 @@ //go:build unit || !integration +/* spell-checker: disable */ + package s3 import ( diff --git a/pkg/storage/inline/storage.go b/pkg/storage/inline/storage.go index 927110ae63..75ca770ff5 100644 --- a/pkg/storage/inline/storage.go +++ b/pkg/storage/inline/storage.go @@ -33,11 +33,12 @@ import ( "os" "path/filepath" + "github.com/c2h5oh/datasize" + "github.com/vincent-petithory/dataurl" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/storage" "github.com/bacalhau-project/bacalhau/pkg/util/targzip" - "github.com/c2h5oh/datasize" - "github.com/vincent-petithory/dataurl" ) // The maximum size that will be stored inline without gzip compression. @@ -74,8 +75,8 @@ func (i *InlineStorage) GetVolumeSize(_ context.Context, spec models.InputSource } if data.ContentType() == gzipMimeType { - size, derr := targzip.UncompressedSize(bytes.NewReader(data.Data)) - return size.Bytes(), derr + size, dErr := targzip.UncompressedSize(bytes.NewReader(data.Data)) + return size.Bytes(), dErr } else { return uint64(len(data.Data)), nil } @@ -128,13 +129,13 @@ func (i *InlineStorage) PrepareStorage(_ context.Context, storageDirectory strin return storage.StorageVolume{}, err } - _, werr := tempfile.Write(data.Data) - cerr := tempfile.Close() + _, wErr := tempfile.Write(data.Data) + cErr := tempfile.Close() return storage.StorageVolume{ Type: storage.StorageVolumeConnectorBind, Source: tempfile.Name(), Target: spec.Target, - }, errors.Join(werr, cerr) + }, errors.Join(wErr, cErr) } } diff --git a/pkg/storage/s3/storage_test.go b/pkg/storage/s3/storage_test.go index 5016a01ae9..2aa3519edc 100644 --- a/pkg/storage/s3/storage_test.go +++ b/pkg/storage/s3/storage_test.go @@ -1,5 +1,7 @@ //go:build integration || !unit +/* spell-checker: disable */ + package s3_test import ( diff --git a/pkg/test/compute/resourcelimits_test.go b/pkg/test/compute/resourcelimits_test.go index 43b510904c..f4635b271d 100644 --- a/pkg/test/compute/resourcelimits_test.go +++ b/pkg/test/compute/resourcelimits_test.go @@ -162,7 +162,7 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() { }) require.NoError(suite.T(), err) - // sleep a bit here to simulate jobs being sumbmitted over time + // sleep a bit here to simulate jobs being submitted over time time.Sleep((10 + time.Duration(rand.Intn(10))) * time.Millisecond) } @@ -363,7 +363,7 @@ func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() { require.NoError(suite.T(), err) jobIds = append(jobIds, submittedJob.JobID) - // sleep a bit here to simulate jobs being sumbmitted over time + // sleep a bit here to simulate jobs being submitted over time // and to give time for compute nodes to accept and run the jobs // this needs to be less than the time the job lasts // so we are running jobs in parallel diff --git a/pkg/test/executor/test_runner.go b/pkg/test/executor/test_runner.go index a9a24b7dca..acd6bb37ee 100644 --- a/pkg/test/executor/test_runner.go +++ b/pkg/test/executor/test_runner.go @@ -87,9 +87,9 @@ func RunTestCase( execution.AllocateResources(job.Task().Name, models.Resources{}) resultsDirectory := t.TempDir() - strgProvider := stack.Nodes[0].ComputeNode.Storages + storageProvider := stack.Nodes[0].ComputeNode.Storages - runCommandArguments, cleanup, err := compute.PrepareRunArguments(ctx, strgProvider, t.TempDir(), execution, resultsDirectory) + runCommandArguments, cleanup, err := compute.PrepareRunArguments(ctx, storageProvider, t.TempDir(), execution, resultsDirectory) require.NoError(t, err) t.Cleanup(func() { if err := cleanup(ctx); err != nil { diff --git a/pkg/test/scenario/resolver.go b/pkg/test/scenario/resolver.go index 9fa1dad27d..0ca4dc75d4 100644 --- a/pkg/test/scenario/resolver.go +++ b/pkg/test/scenario/resolver.go @@ -183,7 +183,7 @@ func GetFilteredExecutionStates(jobState *JobState, filterState models.Execution func WaitForTerminalStates() StateChecks { return func(state *JobState) (bool, error) { for _, executionState := range state.Executions { - if !executionState.ComputeState.StateType.IsTermainl() { + if !executionState.ComputeState.StateType.IsTerminal() { return false, nil } } diff --git a/pkg/test/scenario/responses.go b/pkg/test/scenario/responses.go index b78d2f8eb8..217cb11d12 100644 --- a/pkg/test/scenario/responses.go +++ b/pkg/test/scenario/responses.go @@ -21,7 +21,7 @@ func SubmitJobSuccess() CheckSubmitResponse { return fmt.Errorf("expected job response, got nil") } if len(response.Warnings) > 0 { - return fmt.Errorf("unexpted warnings returned when submitting job: %v", response.Warnings) + return fmt.Errorf("unexpected warnings returned when submitting job: %v", response.Warnings) } return nil } diff --git a/pkg/test/scenario/results.go b/pkg/test/scenario/results.go index 7d2825e8b2..e62ee66428 100644 --- a/pkg/test/scenario/results.go +++ b/pkg/test/scenario/results.go @@ -64,7 +64,7 @@ func FileEquals( } } -// ManyCheckes returns a CheckResults that runs the passed checkers and returns +// ManyChecks returns a CheckResults that runs the passed checkers and returns // an error if any of them fail. func ManyChecks(checks ...CheckResults) CheckResults { return func(resultsDir string) error { diff --git a/pkg/util/generic/broadcaster_test.go b/pkg/util/generic/broadcaster_test.go index 0f4debc615..ca10bf067d 100644 --- a/pkg/util/generic/broadcaster_test.go +++ b/pkg/util/generic/broadcaster_test.go @@ -5,10 +5,11 @@ package generic_test import ( "testing" - _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/util/generic" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + _ "github.com/bacalhau-project/bacalhau/pkg/logger" + "github.com/bacalhau-project/bacalhau/pkg/util/generic" ) type BroadcasterTestSuite struct { @@ -50,7 +51,7 @@ func (s *BroadcasterTestSuite) TestBroadcasterAutoclose() { require.Error(s.T(), err) } -func (s *BroadcasterTestSuite) TestBroadcasterSubUnsub() { +func (s *BroadcasterTestSuite) TestBroadcasterSubUnsubscribe() { ch1, err1 := s.broadcaster.Subscribe() ch2, err2 := s.broadcaster.Subscribe() require.NoError(s.T(), err1) diff --git a/pkg/util/idgen/short_id_test.go b/pkg/util/idgen/short_id_test.go index 36079088cc..230d08471b 100644 --- a/pkg/util/idgen/short_id_test.go +++ b/pkg/util/idgen/short_id_test.go @@ -1,5 +1,7 @@ //go:build unit || !integration +/* spell-checker: disable */ + package idgen import ( diff --git a/python/mkdocs.yml b/python/mkdocs.yml index 0b6a73d46a..3710c19320 100644 --- a/python/mkdocs.yml +++ b/python/mkdocs.yml @@ -1,4 +1,4 @@ -site_name: Bacalahu SDK +site_name: Bacalhau SDK site_url: https://github.com/bacalhau-project/bacalhau repo_url: https://github.com/bacalhau-project/bacalhau/python repo_name: bacalhau-project/bacalhau-sdk diff --git a/test-integration/Dockerfile-ClientNode b/test-integration/Dockerfile-ClientNode new file mode 100644 index 0000000000..da31f340f7 --- /dev/null +++ b/test-integration/Dockerfile-ClientNode @@ -0,0 +1,27 @@ +# Use the docker:dind image as the base image +FROM docker:dind + +# Set the working directory +WORKDIR /app + +# Install curl and bash +RUN apk update && apk add --no-cache curl bash + +# Install the ca-certificates package +RUN apk add --no-cache ca-certificates + +# Copy a root ca into the image +COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt + +# Update CA certificates +RUN update-ca-certificates + +# Download and execute the Bash script from the given URL +RUN curl -sSL https://get.bacalhau.org/install.sh | bash + +# Download the binary, make it executable, and move it to /usr/local/bin +RUN curl -o /tmp/mc https://dl.min.io/client/mc/release/linux-amd64/mc \ + && chmod +x /tmp/mc \ + && mv /tmp/mc /usr/local/bin/ + +ENTRYPOINT ["dockerd-entrypoint.sh"] diff --git a/test-integration/Dockerfile-ComputeNode b/test-integration/Dockerfile-ComputeNode new file mode 100644 index 0000000000..7a6cc4ebaa --- /dev/null +++ b/test-integration/Dockerfile-ComputeNode @@ -0,0 +1,24 @@ +# Use the docker:dind image as the base image +FROM docker:dind + +# Set the working directory +WORKDIR /app + +# Install curl and bash +RUN apk update && apk add --no-cache curl bash + +# Install the ca-certificates package +RUN apk add --no-cache ca-certificates + +# Copy a root ca into the image +COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt + +# Update CA certificates +RUN update-ca-certificates + +# Download and execute the Bash script from the given URL +RUN curl -sSL https://get.bacalhau.org/install.sh | bash + +COPY compute_node_image_setup.sh compute_node_image_setup.sh +ENTRYPOINT ["/usr/bin/env"] +CMD ./compute_node_image_setup.sh diff --git a/test-integration/Dockerfile-DockerImageRegistryNode b/test-integration/Dockerfile-DockerImageRegistryNode new file mode 100644 index 0000000000..9c38ba886e --- /dev/null +++ b/test-integration/Dockerfile-DockerImageRegistryNode @@ -0,0 +1,24 @@ +FROM registry:2 + +# Install curl and bash +RUN apk update && apk add --no-cache curl bash + +# Install the ca-certificates package +RUN apk add --no-cache ca-certificates + +# Copy a root ca into the image +COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt + +# Create a directory to store certificates to be used by the registry +RUN mkdir /certs + +# Copy the certificate and key from the local directory to /certs +COPY certificates/generated_assets/bacalhau-container-img-registry-node.crt /certs/ +COPY certificates/generated_assets/bacalhau-container-img-registry-node.key /certs/ + +# Ensure proper permissions for certs +RUN chmod 600 /certs/bacalhau-container-img-registry-node.key +RUN chmod 644 /certs/bacalhau-container-img-registry-node.crt + +# Expose the registry's default port +EXPOSE 5000 443 diff --git a/test-integration/Dockerfile-RequesterNode b/test-integration/Dockerfile-RequesterNode new file mode 100644 index 0000000000..cbbd207c32 --- /dev/null +++ b/test-integration/Dockerfile-RequesterNode @@ -0,0 +1,22 @@ +# Use the docker:dind image as the base image +FROM docker:dind + +# Set the working directory +WORKDIR /app + +# Install curl and bash +RUN apk update && apk add --no-cache curl bash + +# Install the ca-certificates package +RUN apk add --no-cache ca-certificates + +# Copy a root ca into the image +COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt + +# Update CA certificates +RUN update-ca-certificates + +# Download and execute the Bash script from the given URL +RUN curl -sSL https://get.bacalhau.org/install.sh | bash + +ENTRYPOINT ["dockerd-entrypoint.sh"] diff --git a/test-integration/README.md b/test-integration/README.md new file mode 100644 index 0000000000..48f9eabed4 --- /dev/null +++ b/test-integration/README.md @@ -0,0 +1,198 @@ +# Running Bacalhau on Docker + +## Overview + +Since Bacalhau is a distributed system with multiple components, it is critical to have a reliable method for end-to-end testing. Additionally, it's important that these tests closely resemble a real production environment without relying on mocks. + +This setup addresses those needs by running Bacalhau inside containers while also supporting Docker workloads within these containers (using Docker-in-Docker, or DinD). + +## Architecture + +- A Requester Docker container, running Bacalhau as a requester node. +- A Compute Docker container, running Bacalhau as a compute node and is configured to run Docker containers inside it. +- A Bacalhau Client Docker container to act as a jumpbox to interact with this Bacalhau deployment. +- A [Registry](https://github.com/distribution/distribution/) Docker container to act as the local container image registry. +- A Minio Docker container to support running S3 compatible input/output jobs. +- Docker Compose is used to create 5 services: the Requester Node, the Compute Node, the Client CLI Node, the registry node, and the Minio node. +- All the services are connected on the same Docker network, allowing them to communicate over the bridged network. +- All the containers have an injected custom Certificate Authority, which is used for a portion of the internal TLS communication. + - TODO: Expand the TLS setup to more components. Now it is used for the registry communication only. + +## Setup + +--- +### Build the Docker Images + +Build the Requester Node image: +```shell +docker build -f Dockerfile-RequesterNode -t bacalhau-requester-node-image . +``` + +Build the Compute Node image: +```shell +docker build -f Dockerfile-ComputeNode -t bacalhau-compute-node-image . +``` + +Build the Client Node image: +```shell +docker build -f Dockerfile-ClientNode -t bacalhau-client-node-image . +``` + +Build the Registry Node image: +```shell +docker build -f Dockerfile-DockerImageRegistryNode -t bacalhau-container-img-registry-node-image . +``` + +After running these commands, you should see the above images created: +```shell +docker image ls +``` +--- +### Running the setup + +Run Docker Compose: +```shell +docker-compose up +``` + +Access the utility client container to use the Bacalhau CLI: +```shell +docker exec -it bacalhau-client-node-container /bin/bash +``` + +Once inside the container, you can run the following commands to verify the setup: +```shell +# You should see two nodes: a Requestor and a Compute Node +bacalhau node list +``` + +Run a test workload +```shell +bacalhau docker run hello-world + +# Describe the job; it should have completed successfully. +bacalhau job describe ........ +``` + +In another terminal window, you can follow the logs of the Requester node, and compute node +```shell +docker logs bacalhau-requester-node-container -f +docker logs bacalhau-compute-node-container -f +``` + +--- +### Setting Up Minio + +Access the utility client container to use the Bacalhau CLI: +```shell +docker exec -it bacalhau-client-node-container /bin/bash +``` + +Setup an alias for the Minio CLI +```shell +# The environment variables are already injected in +# the container, no need to replace them yourself. +mc alias set bacalhau-minio "http://${BACALHAU_MINIO_NODE_HOST}:9000" "${MINIO_ROOT_USER}" "${MINIO_ROOT_PASSWORD}" +mc admin info bacalhau-minio +``` + +Create a bucket and add some files +```shell +mc mb bacalhau-minio/my-data-bucket +mc ls bacalhau-minio/my-data-bucket/section1/ +echo "This is a sample text hello hello." > example.txt +mc cp example.txt bacalhau-minio/my-data-bucket/section1/ +``` + +RUn a job with data input from the minion bucket + +```shell +# Content of aws-test-job.yaml below +bacalhau job run aws-test-job.yaml +``` + +```yaml +Name: S3 Job Data Access Test +Type: batch +Count: 1 +Tasks: + - Name: main + Engine: + Type: docker + Params: + Image: ubuntu:latest + Entrypoint: + - /bin/bash + Parameters: + - "-c" + - "cat /put-my-s3-data-here/example.txt" + InputSources: + - Target: "/put-my-s3-data-here" + Source: + Type: s3 + Params: + Bucket: "my-data-bucket" + Key: "section1/" + Endpoint: "http://bacalhau-minio-node:9000" + Region: "us-east-1" # If no region added, it fails, even for minio +``` + +--- +### Setting Up private registry + +This docker compose deployment has a private registry deployed on its own node. It allows us to +create tests and experiment with docker images jobs without the need to use DockerHub in anyway. + +From inside the client container, let's pull an image from DockerHub, push it to our own private registry, +then run a docker job running the image in out private registry. + +```shell +# pull from docker hub +docker pull ubuntu + +# tag the image to prepare it to be push to our private registry +docker image tag ubuntu bacalhau-container-img-registry-node:5000/firstbacalhauimage + +# push the image to our private registry +docker push bacalhau-container-img-registry-node:5000/firstbacalhauimage +``` + +Now, let's create a job that references that image in private registry: + +```shell +# Content of private-registry-test-job.yaml below +bacalhau job run private-registry-test-job.yaml +``` + +```yaml +Name: Job to test using local registry images +Type: batch +Count: 1 +Tasks: + - Name: main + Engine: + Type: docker + Params: + Image: bacalhau-container-img-registry-node:5000/firstbacalhauimage + Entrypoint: + - /bin/bash + Parameters: + - "-c" + - "echo test-local-registry" +``` + +--- +### Notes: + +If for some reason after running `docker-compose up`, you faced issues with the Image registry node starting, try to remove the image registry docker volume by running: + +```shell +# Destroy the deployment +docker-compose down + +# Remove registry volume +docker volume rm test-integration_registry-volume + +# Create deployment again +docker-compose up +``` diff --git a/test-integration/certificates/README.md b/test-integration/certificates/README.md new file mode 100644 index 0000000000..f993908841 --- /dev/null +++ b/test-integration/certificates/README.md @@ -0,0 +1,9 @@ +# Certificate Generation + +The script in the folder allows you to generate certificates that are signed by a root CA, and provide the +CN and SAN for these leaf certs. The generated certs will be added to the `generated_assets` directory. + +Usage: `./generate_leaf_certs.sh ` +```shell +./generate_leaf_certs.sh my-bacalhau-requester-node +``` diff --git a/test-integration/certificates/generate_leaf_certs.sh b/test-integration/certificates/generate_leaf_certs.sh new file mode 100755 index 0000000000..0411adc9d3 --- /dev/null +++ b/test-integration/certificates/generate_leaf_certs.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# Set variables +ROOT_CA_CERT="generated_assets/bacalhau_test_root_ca.crt" +ROOT_CA_KEY="generated_assets/bacalhau_test_root_ca.key" +DAYS_VALID=1825 # 5 years + +# Organization name and country (same as before) +ORG_NAME="Bacalhau" +COUNTRY="US" + +# Check if the input argument is provided +if [[ -z "$1" ]]; then + echo "Error: Please provide a string for the Common Name and Subject Alternative Names." + exit 1 +fi + +COMMON_NAME="$1" +OUTPUT_CERT="generated_assets/${COMMON_NAME}.crt" +OUTPUT_KEY="generated_assets/${COMMON_NAME}.key" +CSR_PATH="generated_assets/${COMMON_NAME}.csr" +CNF_PATH="generated_assets/${COMMON_NAME}.cnf" + +# Check if the files already exist +if [[ -f "${OUTPUT_CERT}" ]] || [[ -f "${OUTPUT_KEY}" ]]; then + echo "Error: One or both of the following files already exist:" + [[ -f "${OUTPUT_CERT}" ]] && echo " - ${OUTPUT_CERT}" + [[ -f "${OUTPUT_KEY}" ]] && echo " - ${OUTPUT_KEY}" + echo "Please remove or rename the existing files before running this script." + exit 1 +fi + +# Generate a private key for the new certificate +echo "Generating certificate signed by the root CA..." +openssl genpkey -algorithm RSA -out "${OUTPUT_KEY}" -pkeyopt rsa_keygen_bits:4096 + +# Create an OpenSSL configuration file for the SAN +cat > "${CNF_PATH}" </dev/null 2>&1; then + echo "dockerd is available! Now Starting Bacalhau as a compute node" + bacalhau config set compute.auth.token="${NETWORK_AUTH_TOKEN}" + bacalhau serve --compute -c compute.orchestrators="nats://${REQUESTER_NODE_LINK}:4222" + # Wait for any process to exit + wait -n + + # Exit with status of process that exited first + exit $? + fi + + # Wait before retrying + echo "dockerd is not available yet. Retrying in ${RETRY_INTERVAL} seconds..." + sleep "${RETRY_INTERVAL}" + + # Increment attempt counter + attempt=$((attempt + 1)) +done + +echo "dockerd did not become available within ${TOTAL_WAIT_TIME_FOR_DOCKERD} seconds." +exit 1 diff --git a/test-integration/docker-compose.yml b/test-integration/docker-compose.yml new file mode 100644 index 0000000000..2340fba1a6 --- /dev/null +++ b/test-integration/docker-compose.yml @@ -0,0 +1,117 @@ +x-common-env-variables: &common-env-variables + NETWORK_AUTH_TOKEN: "i_am_very_secret_token" + BACALHAU_API_PORT: "1234" + MINIO_ROOT_USER: "minioadmin" + MINIO_ROOT_PASSWORD: "minioadminpass" + AWS_ACCESS_KEY_ID: "minioadmin" + AWS_SECRET_ACCESS_KEY: "minioadminpass" + +networks: + bacalhau-network: + driver: bridge + +volumes: + minio-volume: + driver: local + registry-volume: + driver: local + +services: + bacalhau-minio-node: + image: quay.io/minio/minio + container_name: bacalhau-minio-node-container + command: server /data --console-address ":9001" + volumes: + - minio-volume:/data + restart: always + networks: + - bacalhau-network + environment: *common-env-variables + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-container-img-registry-node: + image: bacalhau-container-img-registry-node-image + container_name: bacalhau-container-img-registry-container + volumes: + - registry-volume:/var/lib/registry + restart: always + networks: + - bacalhau-network + environment: + REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY: /var/lib/registry + REGISTRY_HTTP_ADDR: "0.0.0.0:5000" + REGISTRY_HTTP_TLS_CERTIFICATE: "/certs/bacalhau-container-img-registry-node.crt" + REGISTRY_HTTP_TLS_KEY: "/certs/bacalhau-container-img-registry-node.key" + healthcheck: + test: [ "CMD-SHELL", "nc -zv localhost 5000" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-requester-node: + image: bacalhau-requester-node-image + container_name: bacalhau-requester-node-container + networks: + - bacalhau-network + environment: *common-env-variables + depends_on: + bacalhau-minio-node: + condition: service_healthy + privileged: true + command: + - /bin/bash + - -c + - | + bacalhau config set "orchestrator.auth.token" "$${NETWORK_AUTH_TOKEN}" && bacalhau serve --orchestrator -c api.port=$${BACALHAU_API_PORT} + healthcheck: + test: [ "CMD-SHELL", "nc -zv localhost 1234" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-compute-node: + image: bacalhau-compute-node-image + container_name: bacalhau-compute-node-container + privileged: true + networks: + - bacalhau-network + depends_on: + bacalhau-requester-node: + condition: service_healthy + bacalhau-container-img-registry-node: + condition: service_healthy + environment: + <<: *common-env-variables + REQUESTER_NODE_LINK: 'bacalhau-requester-node' + healthcheck: + test: [ "CMD-SHELL", "nc -zv localhost 1234" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-client-node: + image: bacalhau-client-node-image + container_name: bacalhau-client-node-container + privileged: true + networks: + - bacalhau-network + depends_on: + bacalhau-requester-node: + condition: service_healthy + bacalhau-compute-node: + condition: service_healthy + bacalhau-container-img-registry-node: + condition: service_healthy + environment: + <<: *common-env-variables + BACALHAU_API_HOST: 'bacalhau-requester-node' + BACALHAU_COMPUTE_NODE_HOST: 'bacalhau-compute-node' + BACALHAU_MINIO_NODE_HOST: 'bacalhau-minio-node'