diff --git a/cli/app.go b/cli/app.go index e5c6390bc4e..03e0ed909b4 100644 --- a/cli/app.go +++ b/cli/app.go @@ -79,6 +79,7 @@ const ( generalFlagNoProgress = "no-progress" generalFlagAPI = "api" generalFlagArgs = "args" + generalFlagAttributes = "attributes" generalFlagDryRun = "dry-run" moduleFlagLanguage = "language" @@ -175,34 +176,40 @@ const ( xacroFlagROSDistro = "ros-distro" ) -var commonPartFlags = []cli.Flag{ - &AliasStringFlag{ - cli.StringFlag{ - Name: generalFlagPart, - Aliases: []string{generalFlagPartID, generalFlagPartName}, - Required: true, +// partFlags builds the standard part/org/location/machine flag set. +// partRequired controls whether --part is mandatory. +func partFlags(partRequired bool) []cli.Flag { + return []cli.Flag{ + &AliasStringFlag{ + cli.StringFlag{ + Name: generalFlagPart, + Aliases: []string{generalFlagPartID, generalFlagPartName}, + Required: partRequired, + }, }, - }, - &AliasStringFlag{ - cli.StringFlag{ - Name: generalFlagOrganization, - Aliases: []string{generalFlagAliasOrg, generalFlagOrgID, generalFlagAliasOrgName}, + &AliasStringFlag{ + cli.StringFlag{ + Name: generalFlagOrganization, + Aliases: []string{generalFlagAliasOrg, generalFlagOrgID, generalFlagAliasOrgName}, + }, }, - }, - &AliasStringFlag{ - cli.StringFlag{ - Name: generalFlagLocation, - Aliases: []string{generalFlagLocationID, generalFlagAliasLocationName}, + &AliasStringFlag{ + cli.StringFlag{ + Name: generalFlagLocation, + Aliases: []string{generalFlagLocationID, generalFlagAliasLocationName}, + }, }, - }, - &AliasStringFlag{ - cli.StringFlag{ - Name: generalFlagMachine, - Aliases: []string{generalFlagAliasRobot, generalFlagMachineID, generalFlagMachineName}, + &AliasStringFlag{ + cli.StringFlag{ + Name: generalFlagMachine, + Aliases: []string{generalFlagAliasRobot, generalFlagMachineID, generalFlagMachineName}, + }, }, - }, + } } +var commonPartFlags = partFlags(true) + var commonOtlpFlags = []cli.Flag{ &cli.StringFlag{ Name: "endpoint", @@ -2784,6 +2791,84 @@ Note: There is no progress meter while copying is in progress. }, Action: createCommandWithT[machinesPartRunArgs](MachinesPartRunAction), }, + { + Name: "add-job", + Usage: "add a scheduled job to a machine part", + Description: `Add a scheduled job that runs a method on a resource at a given interval. + +With --attributes, pass a single JSON object (inline or path to a JSON file) with: + name (required) unique name for this job + schedule (required) must be one of: + "continuous" run in a loop without stopping + a Go duration e.g. "5s", "1h30m", "500ms" + a cron expr e.g. "0 0 * * *" (5-field) or "*/5 * * * * *" (6-field with seconds) + resource (required) name of the component or service to run the method on + method (required) gRPC method name, e.g. "DoCommand", "GetReadings" + command (optional) JSON object passed as the argument to DoCommand + log_configuration (optional) e.g. {"level":"debug"}. Level must be one of: debug, info, warn, error + +Example (interactive prompt): + viam machines part add-job + +Example with inline JSON: + viam machines part add-job --part= \ + --attributes '{"name":"my-job","schedule":"1h","resource":"my-sensor","method":"GetReadings"}' + +Example with a JSON file: + viam machines part add-job --part= --attributes ./job.json`, + UsageText: createUsageText("machines part add-job", []string{generalFlagPart, generalFlagAttributes}, true, false), + Flags: append(partFlags(false), &cli.StringFlag{ + Name: generalFlagAttributes, + Required: false, + Usage: "JSON job config or path to JSON file; omit to use the interactive form", + }), + Action: createCommandWithT(machinesPartAddJobAction), + }, + { + Name: "update-job", + Usage: "update a scheduled job on a machine part", + Description: `Update an existing job's configuration by name. The --attributes flag accepts a single JSON +object (inline or a path to a JSON file) with the fields to change. Only the fields provided will +be updated; all other fields remain unchanged. The job name cannot be changed. + +Example changing the schedule: + viam machines part update-job --part= --name=my-job --attributes '{"schedule":"30m"}' + +Example changing multiple fields: + viam machines part update-job --part= --name=my-job \ + --attributes '{"schedule":"0 0 * * *","method":"DoCommand","command":{"action":"reset"}}'`, + UsageText: createUsageText( + "machines part update-job", + []string{generalFlagPart, generalFlagName, generalFlagAttributes}, true, false), + Flags: append(commonPartFlags, []cli.Flag{ + &cli.StringFlag{ + Name: generalFlagName, + Required: true, + Usage: "name of the job to update", + }, + &cli.StringFlag{ + Name: generalFlagAttributes, + Required: true, + Usage: "JSON job config or path to JSON file with fields to update", + }, + }...), + Action: createCommandWithT(machinesPartUpdateJobAction), + }, + { + Name: "delete-job", + Usage: "delete a scheduled job from a machine part", + Description: `Delete an existing job by name. + +Example: + viam machines part delete-job --part= --name=my-job`, + UsageText: createUsageText("machines part delete-job", []string{generalFlagPart, generalFlagName}, true, false), + Flags: append(commonPartFlags, &cli.StringFlag{ + Name: generalFlagName, + Required: true, + Usage: "name of the job to delete", + }), + Action: createCommandWithT(machinesPartDeleteJobAction), + }, { Name: "shell", Usage: "start a shell on a machine part", diff --git a/cli/client.go b/cli/client.go index 74cd7661065..96b2741d0e6 100644 --- a/cli/client.go +++ b/cli/client.go @@ -32,6 +32,7 @@ import ( "github.com/ktr0731/go-fuzzyfinder" "github.com/nathan-fiscaletti/consolesize-go" "github.com/pkg/errors" + cron "github.com/robfig/cron/v3" "github.com/urfave/cli/v2" "go.uber.org/multierr" "go.uber.org/zap" @@ -1522,6 +1523,509 @@ func robotsPartRemoveResourceAction(c *cli.Context, args robotsPartRemoveResourc return nil } +// parseJSONOrFile tries to read input as a file, falls back to parsing as inline JSON +func parseJSONOrFile(input string) (map[string]any, error) { + var data []byte + //nolint:gosec + if fileData, err := os.ReadFile(input); err == nil { + data = fileData + } else { + data = []byte(input) + } + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + return nil, err + } + return result, nil +} + +// validateJobConfig validates the fields of a job config map. When isUpdate is true, +// this is an update-job so not all fields are required. +func validateJobConfig(jobConfig, partConfig map[string]any, isUpdate bool) error { + // validate schedule + if schedule, ok := jobConfig["schedule"].(string); ok { + if err := validateJobSchedule(schedule); err != nil { + return err + } + } else if !isUpdate { + return errors.New("job config must include 'schedule' field (string)") + } + + // validate resource + if resource, ok := jobConfig["resource"].(string); ok { + if resource == "" { + return errors.New("'resource' field must be a non-empty string") + } + if !resourceExistsInConfig(partConfig, resource) { + return fmt.Errorf("resource %q not found in part config", resource) + } + } else if !isUpdate { + return errors.New("job config must include 'resource' field (string)") + } + + // validate method + if method, ok := jobConfig["method"].(string); ok { + if method == "" { + return errors.New("'method' field must be a non-empty string") + } + } else if !isUpdate { + return errors.New("job config must include 'method' field (string)") + } + + // Validate command is a JSON object (map) if provided. + if command, ok := jobConfig["command"]; ok { + if _, ok := command.(map[string]any); !ok { + return errors.New("'command' field must be a JSON object") + } + } + + // validate log configuration + if logConfig, ok := jobConfig["log_configuration"].(map[string]any); ok { + if level, ok := logConfig["level"].(string); ok { + validLevels := map[string]bool{ + "debug": true, "info": true, "warn": true, "warning": true, "error": true, + } + if !validLevels[strings.ToLower(level)] { + return fmt.Errorf("log_configuration level must be one of: debug, info, warn, warning, error; got %q", level) + } + } + } + + return nil +} + +// resourceExistsInConfig checks if a resource name exists in the part's components or services. +func resourceExistsInConfig(config map[string]any, name string) bool { + for _, key := range []string{"components", "services"} { + if arr, ok := config[key].([]any); ok { + for _, item := range arr { + if m, ok := item.(map[string]any); ok { + if m["name"] == name { + return true + } + } + } + } + } + return false +} + +func validateJobSchedule(schedule string) error { + if strings.ToLower(schedule) == "continuous" { + return nil + } + + intErr := validateInterval(schedule) + if intErr == nil { + return nil + } + + cronErr := validateCronExpression(schedule) + if cronErr == nil { + return nil + } + + return errors.Errorf( + "invalid schedule %q: not a valid interval (%v) or cron expression (%v)", + schedule, intErr, cronErr, + ) +} + +func validateInterval(interval string) error { + d, err := time.ParseDuration(interval) + if err != nil { + return err + } + if d <= 0 { + return errors.New("interval must be a positive duration") + } + return nil +} + +func validateCronExpression(schedule string) error { + withSeconds := len(strings.Fields(schedule)) >= 6 + if withSeconds { + p := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + if _, err := p.Parse(schedule); err != nil { + return err + } + } else { + if _, err := cron.ParseStandard(schedule); err != nil { + return err + } + } + return nil +} + +type machinesPartAddJobArgs struct { + Part string + Machine string + Location string + Organization string + Attributes string +} + +func machinesPartAddJobAction(c *cli.Context, args machinesPartAddJobArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + var jobConfig map[string]any + var part *apppb.RobotPart + + // If no attributes are provided, run the interactive huh flow. + if args.Attributes == "" { + // first, get part id through flag or prompt + if args.Part == "" { + var partID string + partForm := huh.NewForm( + huh.NewGroup( + huh.NewInput(). + Title("Part ID:"). + Description("Run 'viam machines list --all --organization=' to see all machines with their part-ids"). + Validate(func(s string) error { + if strings.TrimSpace(s) == "" { + return errors.New("part ID cannot be empty") + } + return nil + }). + Value(&partID), + ), + ) + if err := partForm.Run(); err != nil { + return err + } + partID = strings.TrimSpace(partID) + if partID == "" { + return errors.New("part ID cannot be empty") + } + + // Look up the part by ID and store it so we can use its config below. + resp, err := client.getRobotPart(partID) + if err != nil { + return errors.Wrapf(err, "part ID %q not found", partID) + } + part = resp.Part + args.Part = partID + } else { + var err error + part, err = client.robotPart(args.Organization, args.Location, args.Machine, args.Part) + if err != nil { + return err + } + } + + // 2. Build interactive form from the part config. + confMap := part.RobotConfig.AsMap() + var resourceOpts []huh.Option[string] + for _, key := range []string{"components", "services"} { + resources, err := resourcesFromPartConfig(confMap, key) + if err != nil { + return err + } + for _, r := range resources { + if n, ok := r["name"].(string); ok && n != "" { + resourceOpts = append(resourceOpts, huh.NewOption(n, n)) + } + } + } + if len(resourceOpts) == 0 { + return errors.New("This machine contains no components or services") + } + + // 3. Create the form and run it + var name, resource, method, commandStr, logLevel, scheduleType string + form := huh.NewForm(huh.NewGroup( + huh.NewNote().Title("Add a job to a part"), + huh.NewInput().Title("Set a job name:").Value(&name). + Validate(func(s string) error { + if strings.TrimSpace(s) == "" { + return errors.New("job name cannot be empty") + } + return nil + }), + huh.NewSelect[string]().Title("Select a resource:").Options(resourceOpts...).Value(&resource), + huh.NewInput().Title("Set a method:").Value(&method). + Validate(func(s string) error { + if strings.TrimSpace(s) == "" { + return errors.New("method cannot be empty") + } + return nil + }), + huh.NewInput(). + Title("If using DoCommand, set a command in JSON format (leave empty otherwise):"). + Placeholder("{}"). + Value(&commandStr). + Validate(func(s string) error { + if strings.TrimSpace(s) == "" { + return nil + } + var cmd map[string]any + if err := json.Unmarshal([]byte(s), &cmd); err != nil { + return errors.Wrap(err, "invalid JSON object") + } + return nil + }), + huh.NewSelect[string](). + Title("Set the log threshold:"). + Options( + huh.NewOption("Debug", "debug"), + huh.NewOption("Info", "info"), + huh.NewOption("Warn", "warn"), + huh.NewOption("Error", "error"), + ). + Value(&logLevel), + huh.NewSelect[string](). + Title("Set the schedule type:"). + Options( + huh.NewOption("Interval", "interval"), + huh.NewOption("Cron", "cron"), + huh.NewOption("Continuous", "continuous"), + ). + Value(&scheduleType), + )) + if err := form.Run(); err != nil { + return err + } + + // 4. last page form loads based on what type of schedule is selected + var schedule string + switch scheduleType { + case "interval": + var intervalStr string + form2 := huh.NewForm( + huh.NewGroup( + huh.NewInput(). + Title("Set the interval:"). + Description("Valid intervals look like 10s, 1m, 1h1m, etc. (Go duration format)."). + Validate(validateInterval). + Value(&intervalStr), + ), + ) + if err := form2.Run(); err != nil { + return err + } + schedule = intervalStr + case "cron": + var cronExpr string + form2 := huh.NewForm( + huh.NewGroup( + huh.NewInput(). + Title("Cron expression:"). + Description("Valid cron expressions look like 0 0 * * * for daily, */5 * * * * * for every 5 seconds, etc..."). + Validate(validateCronExpression). + Value(&cronExpr), + ), + ) + if err := form2.Run(); err != nil { + return err + } + schedule = cronExpr + default: + schedule = "continuous" + } + + // 5. Build the jobConfig map from the interactive inputs. + jobConfig = map[string]any{ + "name": name, "schedule": schedule, "resource": resource, "method": method, + } + + if method == "DoCommand" { + if strings.TrimSpace(commandStr) == "" { + jobConfig["command"] = map[string]any{} + } else { + var cmd map[string]any + if err := json.Unmarshal([]byte(commandStr), &cmd); err != nil { + return errors.Wrapf(err, "invalid command JSON") + } + jobConfig["command"] = cmd + } + } + if logLevel != "" { + jobConfig["log_configuration"] = map[string]any{"level": logLevel} + } + } else { + // Non-interactive path: attributes and part are required flags. + jobConfig, err = parseJSONOrFile(args.Attributes) + if err != nil { + return errors.Wrap(err, "failed to parse job config") + } + + partStr := strings.TrimSpace(args.Part) + if partStr == "" { + return errors.New("part is required when using --attributes; specify --part (or --part-id/--part-name)") + } + part, err = client.robotPart(args.Organization, args.Location, args.Machine, partStr) + if err != nil { + return err + } + } + + // Validate required fields and format + name, ok := jobConfig["name"].(string) + if !ok || name == "" { + return errors.New("job config must include 'name' field (string)") + } + + config := part.RobotConfig.AsMap() + if err := validateJobConfig(jobConfig, config, false); err != nil { + return err + } + + // Get existing jobs array or create new one + var jobs []any + if existingJobs, ok := config["jobs"]; ok { + if arr, ok := existingJobs.([]any); ok { + jobs = arr + } + } + + // Check if job with same name exists + for _, j := range jobs { + if jobMap, ok := j.(map[string]any); ok { + if jobMap["name"] == name { + return fmt.Errorf("job with name %s already exists on part %s", name, part.Name) + } + } + } + + jobs = append(jobs, jobConfig) + config["jobs"] = jobs + + if err := client.updateRobotPart(part, config); err != nil { + return err + } + + printf(c.App.Writer, "successfully added job %s to part %s", name, part.Name) + return nil +} + +type machinesPartUpdateJobArgs struct { + Part string + Machine string + Location string + Organization string + Name string + Attributes string +} + +func machinesPartUpdateJobAction(c *cli.Context, args machinesPartUpdateJobArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + part, err := client.robotPart(args.Organization, args.Location, args.Machine, args.Part) + if err != nil { + return err + } + + newJobConfig, err := parseJSONOrFile(args.Attributes) + if err != nil { + return errors.Wrap(err, "failed to parse job config") + } + + config := part.RobotConfig.AsMap() + if err := validateJobConfig(newJobConfig, config, true); err != nil { + return err + } + + var jobs []any + if existingJobs, ok := config["jobs"]; ok { + if arr, ok := existingJobs.([]any); ok { + jobs = arr + } + } + + // Find and update the job + found := false + for i, j := range jobs { + if jobMap, ok := j.(map[string]any); ok { + if jobMap["name"] == args.Name { + found = true + // Merge the new config into existing job, keeping the name + for k, v := range newJobConfig { + jobMap[k] = v + } + jobMap["name"] = args.Name // Ensure name doesn't change + jobs[i] = jobMap + break + } + } + } + + if !found { + return fmt.Errorf("job %s not found on part %s", args.Name, part.Name) + } + + config["jobs"] = jobs + + if err := client.updateRobotPart(part, config); err != nil { + return err + } + + printf(c.App.Writer, "successfully updated job %s on part %s", args.Name, part.Name) + return nil +} + +type machinesPartDeleteJobArgs struct { + Part string + Machine string + Location string + Organization string + Name string +} + +func machinesPartDeleteJobAction(c *cli.Context, args machinesPartDeleteJobArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + part, err := client.robotPart(args.Organization, args.Location, args.Machine, args.Part) + if err != nil { + return err + } + + config := part.RobotConfig.AsMap() + + var jobs []any + if existingJobs, ok := config["jobs"]; ok { + if arr, ok := existingJobs.([]any); ok { + jobs = arr + } + } + + // Filter out the job + var newJobs []any + found := false + for _, j := range jobs { + if jobMap, ok := j.(map[string]any); ok { + if jobMap["name"] != args.Name { + newJobs = append(newJobs, j) + } else { + found = true + } + } else { + newJobs = append(newJobs, j) + } + } + + if !found { + return fmt.Errorf("job %s not found on part %s", args.Name, part.Name) + } + + config["jobs"] = newJobs + + if err := client.updateRobotPart(part, config); err != nil { + return err + } + + printf(c.App.Writer, "successfully deleted job %s from part %s", args.Name, part.Name) + return nil +} + type robotsPartStatusArgs struct { Organization string Location string diff --git a/go.mod b/go.mod index 4f9f04ba9d0..6df4be3e97b 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( github.com/prometheus/procfs v0.15.1 github.com/pterm/pterm v0.12.82 github.com/rhysd/actionlint v1.7.8 + github.com/robfig/cron/v3 v3.0.1 github.com/rs/cors v1.11.1 github.com/samber/lo v1.51.0 github.com/sergi/go-diff v1.4.0 @@ -291,7 +292,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/rivo/uniseg v0.4.7 // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect