Skip to content

Commit

Permalink
feat: Go-API for extending Variant command written in DSL
Browse files Browse the repository at this point in the history
  • Loading branch information
mumoshu committed Feb 4, 2020
1 parent 469ad97 commit 4bcd96a
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 97 deletions.
42 changes: 42 additions & 0 deletions pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package variant

import (
"bytes"
"errors"
"io"
"io/ioutil"
)

func Pipe() (func() (*bytes.Buffer, error), io.WriteCloser) {
r, w := io.Pipe()

out := &bytes.Buffer{}

outDone := make(chan error, 1)

go func() {
bs, err := ioutil.ReadAll(r)

if err != nil {
outDone <- err
return
}

if _, err := out.Write(bs); err != nil {
outDone <- err
return
}

outDone <- nil
}()

return func() (*bytes.Buffer, error) {
err := <-outDone

if !errors.Is(err, io.EOF) {
return nil, err
}

return out, nil
}, w
}
136 changes: 74 additions & 62 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,100 +419,112 @@ func (app *App) Run(cmd string, args map[string]interface{}, opts map[string]int
return app.run(nil, cmd, args, opts)
}

func (app *App) run(l *EventLogger, cmd string, args map[string]interface{}, opts map[string]interface{}) (*Result, error) {
func (app *App) Job(l *EventLogger, cmd string, args map[string]interface{}, opts map[string]interface{}) (func() (*Result, error), error) {
jobByName := app.JobByName
cc := app.Config

j, ok := jobByName[cmd]
if !ok {
j, ok = jobByName[""]
if !ok {
panic(fmt.Errorf("command %q not found", cmd))
return nil, fmt.Errorf("command %q not found", cmd)
}
}

jobCtx, err := createJobContext(cc, j, args, opts)
return func() (*Result, error) {
cc := app.Config

if err != nil {
app.PrintError(err)
return nil, err
}
jobCtx, err := createJobContext(cc, j, args, opts)

if l == nil {
l = NewEventLogger(cmd, args, opts)
}

if j.Log != nil && j.Log.Collects != nil && j.Log.Forwards != nil && len(j.Log.Forwards) > 0 {
var file string
if err != nil {
app.PrintError(err)
return nil, err
}

if j.Log.File.Range().Start != j.Log.File.Range().End {
if diags := gohcl2.DecodeExpression(j.Log.File, jobCtx, &file); diags.HasErrors() {
return nil, diags
}
if l == nil {
l = NewEventLogger(cmd, args, opts)
}

logCollector := app.newLogCollector(file, j, jobCtx)
unregister := l.Register(logCollector)
if j.Log != nil && j.Log.Collects != nil && j.Log.Forwards != nil && len(j.Log.Forwards) > 0 {
var file string

defer func() {
if err := unregister(); err != nil {
panic(err)
if j.Log.File.Range().Start != j.Log.File.Range().End {
if diags := gohcl2.DecodeExpression(j.Log.File, jobCtx, &file); diags.HasErrors() {
return nil, diags
}
}
}()
}

conf, err := app.getConfigs(jobCtx, cc, j, "config", func(j JobSpec) []Config { return j.Configs }, nil)
if err != nil {
return nil, err
}

jobCtx.Variables["conf"] = conf
logCollector := app.newLogCollector(file, j, jobCtx)
unregister := l.Register(logCollector)

secretRefsEvaluator, err := vals.New(vals.Options{CacheSize: 100})
if err != nil {
return nil, fmt.Errorf("failed to initialize vals: %v", err)
}

sec, err := app.getConfigs(jobCtx, cc, j, "secret", func(j JobSpec) []Config { return j.Secrets }, func(m map[string]interface{}) (map[string]interface{}, error) {
return secretRefsEvaluator.Eval(m)
})
defer func() {
if err := unregister(); err != nil {
panic(err)
}
}()
}

if err != nil {
return nil, err
}
conf, err := app.getConfigs(jobCtx, cc, j, "config", func(j JobSpec) []Config { return j.Configs }, nil)
if err != nil {
return nil, err
}

jobCtx.Variables["sec"] = sec
jobCtx.Variables["conf"] = conf

needs := map[string]cty.Value{}
secretRefsEvaluator, err := vals.New(vals.Options{CacheSize: 100})
if err != nil {
return nil, fmt.Errorf("failed to initialize vals: %v", err)
}

var concurrency int
sec, err := app.getConfigs(jobCtx, cc, j, "secret", func(j JobSpec) []Config { return j.Secrets }, func(m map[string]interface{}) (map[string]interface{}, error) {
return secretRefsEvaluator.Eval(m)
})

if !IsExpressionEmpty(j.Concurrency) {
if err := gohcl2.DecodeExpression(j.Concurrency, jobCtx, &concurrency); err != nil {
if err != nil {
return nil, err
}

if concurrency < 1 {
return nil, fmt.Errorf("concurrency less than 1 can not be set. If you wanted %d for a concurrency equals to the number of steps, is isn't a good idea. Some system has a relatively lower fd limit that can make your command fail only when there are too many steps. Always use static number of concurrency", concurrency)
jobCtx.Variables["sec"] = sec

needs := map[string]cty.Value{}

var concurrency int

if !IsExpressionEmpty(j.Concurrency) {
if err := gohcl2.DecodeExpression(j.Concurrency, jobCtx, &concurrency); err != nil {
return nil, err
}

if concurrency < 1 {
return nil, fmt.Errorf("concurrency less than 1 can not be set. If you wanted %d for a concurrency equals to the number of steps, is isn't a good idea. Some system has a relatively lower fd limit that can make your command fail only when there are too many steps. Always use static number of concurrency", concurrency)
}
} else {
concurrency = 1
}
} else {
concurrency = 1
}

{
res, err := app.execJobSteps(l, jobCtx, needs, j.Steps, concurrency)
if res != nil || err != nil {
return res, err
{
res, err := app.execJobSteps(l, jobCtx, needs, j.Steps, concurrency)
if res != nil || err != nil {
return res, err
}
}
}

{
r, err := app.execJob(l, j, jobCtx)
if r == nil && err == nil {
return nil, fmt.Errorf(NoRunMessage)
{
r, err := app.execJob(l, j, jobCtx)
if r == nil && err == nil {
return nil, fmt.Errorf(NoRunMessage)
}
return r, err
}
return r, err
}, nil
}

func (app *App) run(l *EventLogger, cmd string, args map[string]interface{}, opts map[string]interface{}) (*Result, error) {
jr, err := app.Job(l, cmd, args, opts)
if err != nil {
return nil, err
}

return jr()
}

func (app *App) WriteDiags(diagnostics hcl2.Diagnostics) {
Expand Down
124 changes: 111 additions & 13 deletions test/variant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"testing"

variant "github.com/mumoshu/variant2"
Expand Down Expand Up @@ -55,52 +56,149 @@ func TestNewFile(t *testing.T) {
t.Fatal(err)
}

stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}

if err := myapp.Run([]string{"app", "deploy", "--namespace=default"}, variant.RunOptions{
Stdout: stdout,
Stderr: stderr,
}); err != nil {
t.Fatal(err)
}

var verr variant.Error

var code int

if err != nil {
if ok := errors.As(err, &verr); ok {
code = verr.ExitCode
} else {
code = 1
}
} else {
code = 0
}

if code != 0 {
t.Errorf("unexpected code: %d", code)
}
}

func TestExtensionWithGo(t *testing.T) {
myapp, err := variant.Load("../examples/simple/simple.variant")
if err != nil {
t.Fatal(err)
}

// outWriter and errWriter are automatically closed by Variant core after the calling anonymous func
// to avoid leaking
myapp.Add(
variant.Job{
Name: "foo bar",
Description: "foobar",
Options: map[string]variant.Option{
Options: map[string]variant.Variable{
"namespace": {
Type: "string",
Type: variant.String,
Description: "namespace",
},
},
Parameters: map[string]variant.Variable{
"param1": {
Type: variant.String,
Description: "param1",
},
},
Run: func(ctx context.Context, s variant.State) error {
ns := s.Args["namespace"].(string)
j, err := myapp.Job("bar baz", variant.JobOptions{Args: map[string]interface{}{"namespace": ns}})
v, ok := s.Options["namespace"]

if !ok {
return fmt.Errorf("missing option %q", "namespace")
}

ns := v.(string)

out, stdoutW := variant.Pipe()
errs, stderrW := variant.Pipe()

defer s.Stdout.Close()
defer s.Stderr.Close()

subst := variant.State{
Parameters: map[string]interface{}{},
Options: map[string]interface{}{"namespace": ns},
Stdout: stdoutW,
Stderr: stderrW,
}
j, err := myapp.Job("app deploy", subst)

if err != nil {
return err
}

if err := j(ctx); err != nil {
return err
}

o, err := out()
if err != nil {
return err
}

if err := j.Run(ctx); err != nil {
if _, err := s.Stdout.Write([]byte("OUTPUT: " + o.String())); err != nil {
return err
}

if _, err := s.Stdout.Write([]byte("OUTPUT")); err != nil {
e, err := errs()
if err != nil {
return err
}

if _, err := s.Stderr.Write([]byte("ERROR: ")); err != nil {
if _, err := s.Stderr.Write([]byte("ERROR: " + e.String())); err != nil {
return err
}

return nil
},
})

stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}
getStdout, stdout := variant.Pipe()
getStderr, stderr := variant.Pipe()

if err := myapp.Run([]string{"app", "deploy", "--namespace=default"}, variant.RunOptions{
Stdout: stdout,
Stderr: stderr,
}); err != nil {
jr, err := myapp.Job("foo bar", variant.State{
Stdout: stdout,
Stderr: stderr,
Parameters: map[string]interface{}{},
Options: map[string]interface{}{"namespace": "default"},
})
if err != nil {
t.Fatal(err)
}

if err := jr(context.TODO()); err != nil {
t.Fatal(err)
}

outs, err := getStdout()
if err != nil {
t.Fatal(err)
}

outStr := outs.String()
if outStr != "<nil>" {
t.Errorf("unexpected stdout: got %q", outStr)
}

errs, err := getStderr()
if err != nil {
t.Fatal(err)
}

errStr := errs.String()
if errStr != "<nil>" {
t.Errorf("unexpected stderr: got %q", errStr)
}

var verr variant.Error

var code int
Expand Down
Loading

0 comments on commit 4bcd96a

Please sign in to comment.