Skip to content

Commit

Permalink
feat: update worker watcher to "pack" i.e. leverage as many cores as …
Browse files Browse the repository at this point in the history
…the pod/machine provides

Also adds a `--pack` build option

```go
// Run k concurrent tasks; if k=0 and machine has N cores, then k=N
Pack int `yaml:",omitempty"`
```

Fixes #25

Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Nov 18, 2024
1 parent 550efb0 commit c4c7439
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cmd/options/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func AddBuildOptions(cmd *cobra.Command) (*build.Options, error) {

cmd.Flags().StringToStringVarP(&options.Env, "env", "e", options.Env, "Set environment variables")

cmd.Flags().IntVar(&options.Pack, "pack", options.Pack, "Run k concurrent tasks; if k=0 and machine has N cores, then k=N")

AddTargetOptionsTo(cmd, &options)
AddLogOptionsTo(cmd, &options)
return &options, nil
Expand Down
4 changes: 4 additions & 0 deletions cmd/subcommands/component/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func Run() *cobra.Command {
Args: cobra.MatchAll(cobra.OnlyValidArgs),
}

pack := 0
cmd.Flags().IntVar(&pack, "pack", pack, "Run k concurrent tasks; if k=0 and machine has N cores, then k=N")

var step int
cmd.Flags().IntVar(&step, "step", step, "Which step are we part of")
cmd.MarkFlagRequired("step")
Expand Down Expand Up @@ -51,6 +54,7 @@ func Run() *cobra.Command {
}

return worker.Run(context.Background(), args, worker.Options{
Pack: pack,
CallingConvention: ccOpts.CallingConvention,
StartupDelay: startupDelay,
PollingInterval: pollingInterval,
Expand Down
3 changes: 3 additions & 0 deletions pkg/build/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Options struct {
ImageID string `yaml:"imageID,omitempty"`
CreateNamespace bool `yaml:"createNamespace,omitempty"`
Workers int `yaml:",omitempty"`

// Run k concurrent tasks; if k=0 and machine has N cores, then k=N
Pack int `yaml:",omitempty"`
}

//go:embed buildOptions.json
Expand Down
3 changes: 2 additions & 1 deletion pkg/fe/transformer/api/workerpool/lower.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func Lower(buildName string, ctx llir.Context, app hlir.Application, pool hlir.W
}

app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop %s" EXIT
$LUNCHPAIL_EXE component worker run --delay %d --calling-convention %v %s -- %s`,
$LUNCHPAIL_EXE component worker run --pack %d --delay %d --calling-convention %v %s -- %s`,
queueArgs,
opts.Pack,
startupDelay,
callingConvention,
queueArgs,
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
)

type Options struct {
// Run k concurrent tasks; if k=0 and machine has N cores, then k=N
Pack int

hlir.CallingConvention
queue.RunContext
StartupDelay int
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func printenv() {

func Run(ctx context.Context, handler []string, opts Options) error {
if opts.LogOptions.Verbose {
fmt.Fprintf(os.Stderr, "Worker starting up run=%s step=%d pool=%s worker=%s\n", opts.RunContext.RunName, opts.RunContext.Step, opts.RunContext.PoolName, opts.RunContext.WorkerName)
fmt.Fprintf(os.Stderr, "Worker starting up run=%s pack=%d step=%d pool=%s worker=%s\n", opts.RunContext.RunName, opts.Pack, opts.RunContext.Step, opts.RunContext.PoolName, opts.RunContext.WorkerName)
printenv()
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/runtime/worker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -49,7 +50,12 @@ func startWatch(ctx context.Context, handler []string, client s3.S3Client, opts
// i.e. concurrent processing of tasks in a single Lunchpail
// worker (see #25)
group, gctx := errgroup.WithContext(ctx)
group.SetLimit(1)
pack := opts.Pack
if pack == 0 {
// see cmd/main.go for how GOMAXPROCS is made responsive to cgroup limits
pack = runtime.GOMAXPROCS(0)
}
group.SetLimit(pack)

// Wait for a kill file and then cancel the watcher (that runs in the for{} loop below)
cancellable, cancel := context.WithCancel(gctx)
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/python-language-pdf2parquet/settings.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ api=workqueue
expected=("Done with nrows=1 nsuccess=1 nfail=0 nskip=0" "Done with nrows=2 nsuccess=2 nfail=0 nskip=0")
NUM_DESIRED_OUTPUTS=0

up_args='"$TEST_PATH"/pail/test-data/input/redp5110-ch1.pdf "$TEST_PATH"/pail/test-data/input/archive1.zip'
# Use --pack=1 (one task at a time) because otherwise this test fails with: FileNotFoundError: [Errno 2] No such file or directory: '/home/runner/.EasyOCR//model/temp.zip'
up_args='--pack=1 "$TEST_PATH"/pail/test-data/input/redp5110-ch1.pdf "$TEST_PATH"/pail/test-data/input/archive1.zip'

0 comments on commit c4c7439

Please sign in to comment.