Skip to content

Commit 2a277c9

Browse files
committed
pkg/workflows/sdk: add WorkflowSpecFactory.BeginSerial/BeginAsync
1 parent 911738d commit 2a277c9

File tree

5 files changed

+164
-5
lines changed

5 files changed

+164
-5
lines changed

pkg/workflows/sdk/builder.go

+37-1
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,22 @@ type WorkflowSpecFactory struct {
1515
emptyNames bool
1616
badCapTypes []string
1717
fns map[string]func(runtime Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error)
18+
serialMode bool
19+
prevRefs []string
1820
}
1921

2022
func (w *WorkflowSpecFactory) GetFn(name string) func(sdk Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
2123
return w.fns[name]
2224
}
2325

26+
func (w *WorkflowSpecFactory) BeginSerial() {
27+
w.serialMode = true
28+
}
29+
30+
func (w *WorkflowSpecFactory) BeginAsync() {
31+
w.serialMode = false
32+
}
33+
2434
type CapDefinition[O any] interface {
2535
capDefinition
2636
self() CapDefinition[O]
@@ -107,6 +117,15 @@ type NewWorkflowParams struct {
107117
Name string
108118
}
109119

120+
// NewSerialWorkflowSpecFactory returns a new WorkflowSpecFactory in Serial mode.
121+
// This is the same as calling NewWorkflowSpecFactory then WorkflowSpecFactory.BeginSerial.
122+
func NewSerialWorkflowSpecFactory(params NewWorkflowParams) *WorkflowSpecFactory {
123+
f := NewWorkflowSpecFactory(params)
124+
f.BeginSerial()
125+
return f
126+
}
127+
128+
// NewWorkflowSpecFactory returns a new NewWorkflowSpecFactory.
110129
func NewWorkflowSpecFactory(
111130
params NewWorkflowParams,
112131
) *WorkflowSpecFactory {
@@ -128,6 +147,16 @@ func NewWorkflowSpecFactory(
128147
// AddTo is meant to be called by generated code
129148
func (step *Step[O]) AddTo(w *WorkflowSpecFactory) CapDefinition[O] {
130149
stepDefinition := step.Definition
150+
151+
if w.serialMode {
152+
// ensure we depend on each previous step
153+
for _, prevRef := range w.prevRefs {
154+
if !stepDefinition.Inputs.HasRef(prevRef) {
155+
stepDefinition.Condition = fmt.Sprintf("$(%s.success)", prevRef)
156+
}
157+
}
158+
}
159+
131160
stepRef := stepDefinition.Ref
132161
if w.names[stepRef] && stepDefinition.CapabilityType != capabilities.CapabilityTypeTarget {
133162
w.duplicateNames[stepRef] = true
@@ -152,7 +181,14 @@ func (step *Step[O]) AddTo(w *WorkflowSpecFactory) CapDefinition[O] {
152181
w.badCapTypes = append(w.badCapTypes, stepDefinition.ID)
153182
}
154183

155-
return &capDefinitionImpl[O]{ref: fmt.Sprintf("$(%s.outputs)", step.Definition.Ref)}
184+
c := &capDefinitionImpl[O]{ref: fmt.Sprintf("$(%s.outputs)", step.Definition.Ref)}
185+
186+
if w.serialMode {
187+
w.prevRefs = []string{step.Definition.Ref}
188+
} else {
189+
w.prevRefs = append(w.prevRefs, step.Definition.Ref)
190+
}
191+
return c
156192
}
157193

158194
// AccessField is meant to be used by generated code
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
```mermaid
2+
flowchart
3+
4+
trigger[\"<b>trigger</b><br>trigger<br><i>(basic-test-trigger[at]1.0.0)</i>"/]
5+
6+
compute["<b>compute</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
7+
get-bar -- Value --> compute
8+
get-baz -- Value --> compute
9+
get-foo -- Value --> compute
10+
11+
get-bar["<b>get-bar</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
12+
get-foo -..-> get-bar
13+
trigger -- cool_output --> get-bar
14+
15+
get-baz["<b>get-baz</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
16+
get-bar -..-> get-baz
17+
trigger -- cool_output --> get-baz
18+
19+
get-foo["<b>get-foo</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
20+
trigger -- cool_output --> get-foo
21+
22+
consensus[["<b>consensus</b><br>consensus<br><i>(offchain_reporting[at]1.0.0)</i>"]]
23+
compute -- Value --> consensus
24+
25+
unnamed6[/"target<br><i>(id)</i>"\]
26+
consensus --> unnamed6
27+
28+
```

pkg/workflows/sdk/workflow_spec.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ func (os outputs) addOutput(s string) {
128128
//
129129
// Within the workflow spec, they are called "Capability Properties".
130130
type StepDefinition struct {
131-
ID string
132-
Ref string
133-
Inputs StepInputs
134-
Config map[string]any
131+
ID string
132+
Ref string
133+
Condition string
134+
Inputs StepInputs
135+
Config map[string]any
135136

136137
CapabilityType capabilities.CapabilityType
137138
}
@@ -193,6 +194,10 @@ var tmpl = template.Must(template.New("").Funcs(map[string]any{
193194
{{ else -}}
194195
{{ $ref }}["{{$name}}"]
195196
{{ end -}}
197+
{{ $condRef := parseRef .Condition -}}
198+
{{ if $condRef -}}
199+
{{ $condRef }} -..-> {{ $step.Ref }}
200+
{{ end -}}
196201
{{ if .Inputs.OutputRef -}}
197202
{{ .Inputs.OutputRef }} --> {{ $step.Ref }}
198203
{{ else -}}

pkg/workflows/sdk/workflow_spec_test.go

+87
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,13 @@ func TestWorkflowSpecFormatChart(t *testing.T) {
6868
{"notstreamssepolia", notStreamSepoliaWorkflowSpec},
6969
{"serial", serialWorkflowSpec},
7070
{"parallel", parallelWorkflowSpec},
71+
{"parallel_serialized", parallelSerializedWorkflowSpec},
7172
{"builder_parallel", buildSimpleWorkflowSpec(
7273
sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "test", Name: "parallel"}),
7374
).MustSpec(t)},
75+
{"builder_serial", buildSimpleWorkflowSpec(
76+
sdk.NewSerialWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "test", Name: "serial"}),
77+
).MustSpec(t)},
7478
} {
7579
t.Run(tt.name, func(t *testing.T) {
7680
requireEqualChart(t, tt.name, tt.workflow)
@@ -405,3 +409,86 @@ var parallelWorkflowSpec = sdk.WorkflowSpec{
405409
},
406410
},
407411
}
412+
413+
var parallelSerializedWorkflowSpec = sdk.WorkflowSpec{
414+
Name: "parallel-serialized",
415+
Owner: "owner",
416+
Triggers: []sdk.StepDefinition{
417+
{
418+
419+
Ref: "trigger-chain-event",
420+
Inputs: sdk.StepInputs{},
421+
Config: map[string]any{"maxFrequencyMs": 5000},
422+
CapabilityType: capabilities.CapabilityTypeTrigger,
423+
},
424+
},
425+
Actions: []sdk.StepDefinition{
426+
{
427+
428+
Ref: "get-foo",
429+
Inputs: sdk.StepInputs{
430+
Mapping: map[string]any{"Arg0": "$(trigger-chain-event.outputs)"},
431+
},
432+
CapabilityType: capabilities.CapabilityTypeAction,
433+
},
434+
{
435+
436+
Ref: "compute-foo",
437+
Inputs: sdk.StepInputs{
438+
Mapping: map[string]any{"Arg0": "$(get-foo.outputs)"},
439+
},
440+
CapabilityType: capabilities.CapabilityTypeAction,
441+
},
442+
{
443+
444+
Ref: "get-bar",
445+
Condition: "$(compute-foo.success)",
446+
Inputs: sdk.StepInputs{
447+
Mapping: map[string]any{"Arg0": "$(trigger-chain-event.outputs)"},
448+
},
449+
CapabilityType: capabilities.CapabilityTypeAction,
450+
},
451+
{
452+
453+
Ref: "compute-bar",
454+
Inputs: sdk.StepInputs{
455+
Mapping: map[string]any{"Arg0": "$(get-bar.outputs)"},
456+
},
457+
CapabilityType: capabilities.CapabilityTypeAction,
458+
},
459+
{
460+
461+
Ref: "read-token-price",
462+
Condition: "$(compute-bar.success)",
463+
Inputs: sdk.StepInputs{
464+
Mapping: map[string]any{"Arg0": "$(trigger-chain-event.outputs)"},
465+
},
466+
CapabilityType: capabilities.CapabilityTypeAction,
467+
},
468+
},
469+
Consensus: []sdk.StepDefinition{
470+
{
471+
472+
Ref: "data-feeds-report",
473+
Inputs: sdk.StepInputs{
474+
Mapping: map[string]any{
475+
"observations": []string{
476+
"$(compute-foo.outputs.Value)",
477+
"$(compute-bar.outputs.Value)",
478+
},
479+
"token_price": "$(read-token-price.outputs.Value)",
480+
},
481+
},
482+
CapabilityType: capabilities.CapabilityTypeConsensus,
483+
},
484+
},
485+
Targets: []sdk.StepDefinition{
486+
{
487+
488+
Inputs: sdk.StepInputs{
489+
Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"},
490+
},
491+
CapabilityType: capabilities.CapabilityTypeTarget,
492+
},
493+
},
494+
}

pkg/workflows/testdata/fixtures/workflows/marshalling/workflow_2_spec.json

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
{
66
77
"Ref": "report_data",
8+
"Condition": "",
89
"Inputs": {
910
"OutputRef": "",
1011
"Mapping": null
@@ -18,6 +19,7 @@
1819
{
1920
"ID": "trigger_test:aaShouldBeFirst_true:chain_ethereum:[email protected]",
2021
"Ref": "",
22+
"Condition": "",
2123
"Inputs": {
2224
"OutputRef": "",
2325
"Mapping": {
@@ -34,6 +36,7 @@
3436
{
3537
3638
"Ref": "",
39+
"Condition": "",
3740
"Inputs": {
3841
"OutputRef": "",
3942
"Mapping": {

0 commit comments

Comments
 (0)