Skip to content

Commit

Permalink
Support extra flags for exec command (#3162)
Browse files Browse the repository at this point in the history
Previously was not supporting RuntimeSettings or SpecSettings and so was
not feature complete with `docker run`. Not supports these flags
although they may need to change in future v2 cli changes.
  • Loading branch information
rossjones authored Jan 5, 2024
1 parent 20c3560 commit aa2d7c2
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 46 deletions.
3 changes: 2 additions & 1 deletion cmd/cli/config/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/exp/slices"

"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/cmd/util/parse"
"github.com/bacalhau-project/bacalhau/pkg/config"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/logger"
Expand Down Expand Up @@ -91,7 +92,7 @@ func setConfig(key string, values ...string) error {
case []string:
viperWriter.Set(key, values)
case map[string]string:
sts, err := parseStringSliceToMap(values)
sts, err := parse.StringSliceToMap(values)
if err != nil {
return err
}
Expand Down
15 changes: 0 additions & 15 deletions cmd/cli/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/spf13/viper"

"github.com/bacalhau-project/bacalhau/cmd/util/flags"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
)

Expand All @@ -18,20 +17,6 @@ func NewViperWithDefaultConfig(cfg types.BacalhauConfig) *viper.Viper {
return viperSchema
}

// parseStringSliceToMap parses a slice of strings into a map.
// Each element in the slice should be a key-value pair in the form "key=value".
func parseStringSliceToMap(slice []string) (map[string]string, error) {
result := make(map[string]string)
for _, item := range slice {
key, value, err := flags.SeparatorParser("=")(item)
if err != nil {
return nil, fmt.Errorf("expected 'key=value', received invalid format for key-value pair: %s", item)
}
result[key] = value
}
return result, nil
}

func singleValueOrError(v ...string) (string, error) {
if len(v) != 1 {
return "", fmt.Errorf("expected single value got %d from %q", len(v), v)
Expand Down
127 changes: 117 additions & 10 deletions cmd/cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/cmd/util/flags/cliflags"
"github.com/bacalhau-project/bacalhau/cmd/util/parse"
"github.com/bacalhau-project/bacalhau/cmd/util/printer"
"github.com/bacalhau-project/bacalhau/pkg/lib/template"
"github.com/bacalhau-project/bacalhau/pkg/models"
Expand Down Expand Up @@ -113,6 +114,7 @@ func exec(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, options *E
return nil
}

//nolint:funlen
func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, options *ExecOptions) (*models.Job, error) {
var err error
var jobType, templateString string
Expand Down Expand Up @@ -180,13 +182,8 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti
job.Tasks[0].Engine.Params["Arguments"] = cmdArgs[1:]

// Attach any inputs the user specified to the job spec
for _, ss := range options.SpecSettings.Inputs.Values() {
src, err := legacy.FromLegacyStorageSpecToInputSource(ss)
if err != nil {
return nil, fmt.Errorf("failed to process input %s: %w", ss.Name, err)
}

job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, src)
if err := prepareInputs(options, job); err != nil {
return nil, err
}

// Process --code if anything was specified. In future we may want to try and determine this
Expand All @@ -197,9 +194,116 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti
}
}

// Add the default publisher (which is currently IPFS)
publisherSpec := options.SpecSettings.Publisher.Value()
job.Tasks[0].Publisher = &models.SpecConfig{
Type: publisherSpec.Type.String(),
Params: publisherSpec.Params,
}

// Handle ResultPaths by using the legacy parser and converting.
if err := prepareJobOutputs(cmd.Context(), options, job); err != nil {
return nil, err
}

// Parse labels from flag, we expect key=value for the non-legacy models.Job
if err := prepareLabels(options, job); err != nil {
return nil, err
}

// Constraints for node selection
if err := prepareConstraints(options, job); err != nil {
return nil, err
}

// Environment variables
if err := prepareEnvVars(options, job); err != nil {
return nil, err
}

// Set the execution timeouts
job.Tasks[0].Timeouts = &models.TimeoutConfig{
ExecutionTimeout: options.SpecSettings.Timeout,
}

// Unsupported in new job specifications (models.Job)
// options.SpecSettings.DoNotTrack

return job, nil
}

func prepareConstraints(options *ExecOptions, job *models.Job) error {
if nodeSelectorRequirements, err := parse.NodeSelector(options.SpecSettings.Selector); err != nil {
return err
} else {
constraints, err := legacy.FromLegacyLabelSelector(nodeSelectorRequirements)
if err != nil {
return err
}
job.Constraints = constraints
}

return nil
}

func prepareInputs(options *ExecOptions, job *models.Job) error {
for _, ss := range options.SpecSettings.Inputs.Values() {
src, err := legacy.FromLegacyStorageSpecToInputSource(ss)
if err != nil {
return fmt.Errorf("failed to process input %s: %w", ss.Name, err)
}

job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, src)
}
return nil
}

func prepareLabels(options *ExecOptions, job *models.Job) error {
if len(options.SpecSettings.Labels) > 0 {
if labels, err := parse.StringSliceToMap(options.SpecSettings.Labels); err != nil {
return err
} else {
job.Labels = labels
}
}
return nil
}

func prepareEnvVars(options *ExecOptions, job *models.Job) error {
if len(options.SpecSettings.EnvVar) > 0 {
if env, err := parse.StringSliceToMap(options.SpecSettings.EnvVar); err != nil {
return err
} else {
job.Tasks[0].Env = env
}
}
return nil
}

func prepareJobOutputs(ctx context.Context, options *ExecOptions, job *models.Job) error {
legacyOutputs, err := parse.JobOutputs(ctx, options.SpecSettings.OutputVolumes)
if err != nil {
return err
}

job.Tasks[0].ResultPaths = make([]*models.ResultPath, 0, len(legacyOutputs))
for _, output := range legacyOutputs {
rp := &models.ResultPath{
Name: output.Name,
Path: output.Path,
}

e := rp.Validate()
if e != nil {
return e
}

job.Tasks[0].ResultPaths = append(job.Tasks[0].ResultPaths, rp)
}

return nil
}

// addInlineContent will use codeLocation to determine if it is a single file or a
// directory and will attach to the job as an inline attachment.
func addInlineContent(ctx context.Context, codeLocation string, job *models.Job) error {
Expand All @@ -210,9 +314,12 @@ func addInlineContent(ctx context.Context, codeLocation string, job *models.Job)

target := "/code"

finfo, _ := os.Stat(absPath)
if !finfo.IsDir() {
target = fmt.Sprintf("/code/%s", finfo.Name())
if finfo, err := os.Stat(absPath); err != nil {
return fmt.Errorf("file '%s' not found", codeLocation)
} else {
if !finfo.IsDir() {
target = fmt.Sprintf("/code/%s", finfo.Name())
}
}

specConfig, err := inline.NewStorage().Upload(ctx, absPath)
Expand Down
20 changes: 1 addition & 19 deletions cmd/cli/wasm/wasm_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -191,7 +190,7 @@ func CreateJob(ctx context.Context, cmdArgs []string, opts *WasmRunOptions) (*mo
return nil, err
}

wasmEnvvar, err := parseArrayAsMap(opts.SpecSettings.EnvVar)
wasmEnvvar, err := parse.StringSliceToMap(opts.SpecSettings.EnvVar)
if err != nil {
return nil, fmt.Errorf("wasm env vars invalid: %w", err)
}
Expand Down Expand Up @@ -229,23 +228,6 @@ func CreateJob(ctx context.Context, cmdArgs []string, opts *WasmRunOptions) (*mo
}, nil
}

// parseArrayAsMap accepts a string array where each entry is A=B and
// returns a map with {A: B}
func parseArrayAsMap(inputArray []string) (map[string]string, error) {
resultMap := make(map[string]string)

for _, v := range inputArray {
parts := strings.Split(v, "=")
if len(parts) != 2 {
return nil, fmt.Errorf("malformed entry, expected = in: %s", v)
}

resultMap[parts[0]] = parts[1]
}

return resultMap, nil
}

func parseWasmEntryModule(ctx context.Context, in string) (*model.StorageSpec, error) {
// Try interpreting this as a CID.
wasmCid, err := cid.Parse(in)
Expand Down
13 changes: 13 additions & 0 deletions cmd/util/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/labels"

"github.com/bacalhau-project/bacalhau/cmd/util/flags"
"github.com/bacalhau-project/bacalhau/pkg/job"
"github.com/bacalhau-project/bacalhau/pkg/model"
)
Expand Down Expand Up @@ -92,3 +93,15 @@ func JobOutputs(ctx context.Context, outputVolumes []string) ([]model.StorageSpe

return returnOutputVolumes, nil
}

func StringSliceToMap(slice []string) (map[string]string, error) {
result := make(map[string]string)
for _, item := range slice {
key, value, err := flags.SeparatorParser("=")(item)
if err != nil {
return nil, fmt.Errorf("expected 'key=value', received invalid format for key-value pair: %s", item)
}
result[key] = value
}
return result, nil
}
3 changes: 2 additions & 1 deletion pkg/models/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (p *ResultPath) Copy() *ResultPath {
return nil
}
return &ResultPath{
Name: p.Name,
Path: p.Path,
}
}
Expand All @@ -44,7 +45,7 @@ func (p *ResultPath) Validate() error {
mErr.Errors = append(mErr.Errors, errors.New("path is blank"))
}
if validate.IsBlank(p.Name) {
mErr.Errors = append(mErr.Errors, errors.New("name is blank"))
mErr.Errors = append(mErr.Errors, errors.New("resultpath name is blank"))
}
return mErr.ErrorOrNil()
}

0 comments on commit aa2d7c2

Please sign in to comment.