Skip to content

Commit

Permalink
feat(workflows): tweaks to image and video jobs
Browse files Browse the repository at this point in the history
feat: tweaks to image and video jobs
  • Loading branch information
enricorotundo authored Apr 19, 2023
1 parent 1521be7 commit 9697b33
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 8 deletions.
7 changes: 6 additions & 1 deletion cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/spf13/cobra"
)

const (
defaultNumWorkers = 10
defaultMaxQueueSize = 1024
)

type runEFunc func(cmd *cobra.Command, args []string) error

func NewRunCommand() *cobra.Command {
Expand Down Expand Up @@ -47,7 +52,7 @@ func createRunCommand(appContext cli.AppContext) runEFunc {
ctx, cancelFunc := context.WithCancel(cmd.Context())
defer cancelFunc()
// Job Queue
jobQueue, err := queue.NewGenericQueue(ctx, 10, 1024)
jobQueue, err := queue.NewGenericQueue(ctx, defaultNumWorkers, defaultMaxQueueSize)
if err != nil {
return err
}
Expand Down
57 changes: 57 additions & 0 deletions cmd/run/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package run

import (
"context"
"os"
"testing"

"time"

"github.com/bacalhau-project/amplify/pkg/cli"
"github.com/bacalhau-project/amplify/pkg/config"
"github.com/bacalhau-project/amplify/pkg/executor"
"github.com/spf13/cobra"
"gotest.tools/assert"
)

func TestRunCommand(t *testing.T) {
tempFile := t.TempDir() + "/config.yaml"
err := os.WriteFile(tempFile, []byte(`jobs:
- id: my-foo-job
graph:
- id: my-foo-node
job_id: my-foo-job
inputs:
- root: true # Identifies that this is a root node
`), 0644)
assert.NilError(t, err)

appContext := cli.AppContext{
Config: &config.AppConfig{
ConfigPath: tempFile,
Port: 9999,
},
Executor: map[string]executor.Executor{"": &mockExecutor{}},
}

f := createRunCommand(appContext)
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
rootCmd := &cobra.Command{
Use: "root",
RunE: f,
}
rootCmd.SetContext(ctx)
err = f(rootCmd, []string{"bafybeibt4amyuwvxjgq6rynmchyplbu33nixl63sdcsm7g2nb2gt6vrixu"})
assert.NilError(t, err)
}

type mockExecutor struct{}

func (*mockExecutor) Execute(context.Context, interface{}) (executor.Result, error) {
return executor.Result{}, nil
}

func (*mockExecutor) Render(config.Job, []executor.ExecutorIOSpec, []executor.ExecutorIOSpec) (interface{}, error) {
return "", nil
}
71 changes: 64 additions & 7 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,35 @@ jobs:
- id: root-job
type: internal # This job will run on an internal executor. Internal job, doesn't leave Amplify.
internal_job_id: root-job # Link to internal job ID, must exist in the codebase
- id: image-resize-job # This job resizes images recursively
type: bacalhau
image: dpokidov/imagemagick:7.1.0-47-ubuntu
entrypoint:
- bash
- -c
- >-
for res in '10' '25' '50' '75';
do
mkdir -p /outputs/scale_${res}%
magick mogrify -format jpg -resize ${res}% -quality 100 -path /outputs/scale_${res}% $(find /inputs/image -type f | xargs)
done
- id: video-resize-job
type: bacalhau
image: linuxserver/ffmpeg:5.1.2
entrypoint:
- bash
- -c
- >-
for file in $(find /inputs/video -type f);
do
for res in '1080' '720' '480' '360' '240' '144';
do
extension="${file#*.}";
name="${file%%.*}";
output_file="/outputs/scaled_${res}_$(basename $name $extension).mp4";
ffmpeg -y -i $file -vcodec libx264 -vf "scale=240:-2,setsar=1" $output_file ;
done
done
# Amplify Work Graph specification
# Each item in the list is a node in the execution graph. A single request
Expand Down Expand Up @@ -64,14 +93,42 @@ graph:
# This is an "input", but it doesn't have a path, and so
# isn't mounted in the container. It is only used to look at the
# stdout for the predicate below.
predicate: '.*"Content-Type":"image\/jpeg".*' # A regex that must match the stdout of the node_id
predicate: '.*' # A regex that must match the stdout of the node_id
output_id: default
path: /inputs/root
- node_id: metadata-node # ID of the step to get data from
output_id: default # ID of the output to get data from
path: /inputs/metadata
#predicate: '.*"Content-Type":"image\/jpeg".*' # A regex that must match the stdout of the node_id
predicate: '.*' # A regex that must match the stdout of the node_id
outputs:
- path: /outputs
- id: tree-node
job_id: tree-job
# - id: tree-node
# job_id: tree-job
# inputs:
# - node_id: metadata-node
# path: /inputs/metadata
# - node_id: merge-node
# path: /inputs/merge
# # No outputs with this job, just prints to stdout
# outputs:
# - id: default
# path: /outputs
- id: image-resize-node
job_id: image-resize-job
inputs:
- node_id: metadata-node
path: /inputs/metadata
- node_id: merge-node
path: /inputs/merge
# No outputs with this job, just prints to stdout
predicate: '.*image\/.*'
- node_id: root-node
path: /inputs/image/
outputs:
- path: /outputs
- id: video-resize-node
job_id: video-resize-job
inputs:
- node_id: metadata-node
predicate: '.*video\/.*'
- node_id: root-node
path: /inputs/video/
outputs:
- path: /outputs

0 comments on commit 9697b33

Please sign in to comment.