From c4c74390de7190b098600110ff940930a54169e8 Mon Sep 17 00:00:00 2001 From: Nick Mitchell Date: Mon, 18 Nov 2024 12:59:41 -0500 Subject: [PATCH] feat: update worker watcher to "pack" i.e. leverage as many cores as the pod/machine provides Also adds a `--pack` build option ```go // Run k concurrent tasks; if k=0 and machine has N cores, then k=N Pack int `yaml:",omitempty"` ``` Fixes #25 Signed-off-by: Nick Mitchell --- cmd/options/build.go | 2 ++ cmd/subcommands/component/worker/run.go | 4 ++++ pkg/build/options.go | 3 +++ pkg/fe/transformer/api/workerpool/lower.go | 3 ++- pkg/runtime/worker/options.go | 3 +++ pkg/runtime/worker/run.go | 2 +- pkg/runtime/worker/watcher.go | 8 +++++++- tests/tests/python-language-pdf2parquet/settings.sh | 3 ++- 8 files changed, 24 insertions(+), 4 deletions(-) diff --git a/cmd/options/build.go b/cmd/options/build.go index 1bba95df..8cfce1bb 100644 --- a/cmd/options/build.go +++ b/cmd/options/build.go @@ -37,6 +37,8 @@ func AddBuildOptions(cmd *cobra.Command) (*build.Options, error) { cmd.Flags().StringToStringVarP(&options.Env, "env", "e", options.Env, "Set environment variables") + cmd.Flags().IntVar(&options.Pack, "pack", options.Pack, "Run k concurrent tasks; if k=0 and machine has N cores, then k=N") + AddTargetOptionsTo(cmd, &options) AddLogOptionsTo(cmd, &options) return &options, nil diff --git a/cmd/subcommands/component/worker/run.go b/cmd/subcommands/component/worker/run.go index 1614f211..ee255db3 100644 --- a/cmd/subcommands/component/worker/run.go +++ b/cmd/subcommands/component/worker/run.go @@ -19,6 +19,9 @@ func Run() *cobra.Command { Args: cobra.MatchAll(cobra.OnlyValidArgs), } + pack := 0 + cmd.Flags().IntVar(&pack, "pack", pack, "Run k concurrent tasks; if k=0 and machine has N cores, then k=N") + var step int cmd.Flags().IntVar(&step, "step", step, "Which step are we part of") cmd.MarkFlagRequired("step") @@ -51,6 +54,7 @@ func Run() *cobra.Command { } return worker.Run(context.Background(), args, worker.Options{ + Pack: pack, CallingConvention: ccOpts.CallingConvention, StartupDelay: startupDelay, PollingInterval: pollingInterval, diff --git a/pkg/build/options.go b/pkg/build/options.go index fc970aca..5de96a92 100644 --- a/pkg/build/options.go +++ b/pkg/build/options.go @@ -40,6 +40,9 @@ type Options struct { ImageID string `yaml:"imageID,omitempty"` CreateNamespace bool `yaml:"createNamespace,omitempty"` Workers int `yaml:",omitempty"` + + // Run k concurrent tasks; if k=0 and machine has N cores, then k=N + Pack int `yaml:",omitempty"` } //go:embed buildOptions.json diff --git a/pkg/fe/transformer/api/workerpool/lower.go b/pkg/fe/transformer/api/workerpool/lower.go index c564d449..4186b1be 100644 --- a/pkg/fe/transformer/api/workerpool/lower.go +++ b/pkg/fe/transformer/api/workerpool/lower.go @@ -53,8 +53,9 @@ func Lower(buildName string, ctx llir.Context, app hlir.Application, pool hlir.W } app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop %s" EXIT -$LUNCHPAIL_EXE component worker run --delay %d --calling-convention %v %s -- %s`, +$LUNCHPAIL_EXE component worker run --pack %d --delay %d --calling-convention %v %s -- %s`, queueArgs, + opts.Pack, startupDelay, callingConvention, queueArgs, diff --git a/pkg/runtime/worker/options.go b/pkg/runtime/worker/options.go index bd0afd17..cac5bcff 100644 --- a/pkg/runtime/worker/options.go +++ b/pkg/runtime/worker/options.go @@ -7,6 +7,9 @@ import ( ) type Options struct { + // Run k concurrent tasks; if k=0 and machine has N cores, then k=N + Pack int + hlir.CallingConvention queue.RunContext StartupDelay int diff --git a/pkg/runtime/worker/run.go b/pkg/runtime/worker/run.go index 2235c97e..f556adb3 100644 --- a/pkg/runtime/worker/run.go +++ b/pkg/runtime/worker/run.go @@ -17,7 +17,7 @@ func printenv() { func Run(ctx context.Context, handler []string, opts Options) error { if opts.LogOptions.Verbose { - fmt.Fprintf(os.Stderr, "Worker starting up run=%s step=%d pool=%s worker=%s\n", opts.RunContext.RunName, opts.RunContext.Step, opts.RunContext.PoolName, opts.RunContext.WorkerName) + fmt.Fprintf(os.Stderr, "Worker starting up run=%s pack=%d step=%d pool=%s worker=%s\n", opts.RunContext.RunName, opts.Pack, opts.RunContext.Step, opts.RunContext.PoolName, opts.RunContext.WorkerName) printenv() } diff --git a/pkg/runtime/worker/watcher.go b/pkg/runtime/worker/watcher.go index 9ce91d86..e562d2e9 100644 --- a/pkg/runtime/worker/watcher.go +++ b/pkg/runtime/worker/watcher.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "time" "golang.org/x/sync/errgroup" @@ -49,7 +50,12 @@ func startWatch(ctx context.Context, handler []string, client s3.S3Client, opts // i.e. concurrent processing of tasks in a single Lunchpail // worker (see #25) group, gctx := errgroup.WithContext(ctx) - group.SetLimit(1) + pack := opts.Pack + if pack == 0 { + // see cmd/main.go on how this will be responsive to cgroup limits + pack = runtime.GOMAXPROCS(0) + } + group.SetLimit(pack) // Wait for a kill file and then cancel the watcher (that runs in the for{} loop below) cancellable, cancel := context.WithCancel(gctx) diff --git a/tests/tests/python-language-pdf2parquet/settings.sh b/tests/tests/python-language-pdf2parquet/settings.sh index 303a4402..69a0db8b 100644 --- a/tests/tests/python-language-pdf2parquet/settings.sh +++ b/tests/tests/python-language-pdf2parquet/settings.sh @@ -3,4 +3,5 @@ api=workqueue expected=("Done with nrows=1 nsuccess=1 nfail=0 nskip=0" "Done with nrows=2 nsuccess=2 nfail=0 nskip=0") NUM_DESIRED_OUTPUTS=0 -up_args='"$TEST_PATH"/pail/test-data/input/redp5110-ch1.pdf "$TEST_PATH"/pail/test-data/input/archive1.zip' +# --pack=1 because FileNotFoundError: [Errno 2] No such file or directory: '/home/runner/.EasyOCR//model/temp.zip' +up_args='--pack=1 "$TEST_PATH"/pail/test-data/input/redp5110-ch1.pdf "$TEST_PATH"/pail/test-data/input/archive1.zip'