Skip to content

Commit 5a28381

Browse files
committed
pkg/workflows/sdk: add WorkflowSpec.FormatChart for mermaid flowcharts
1 parent 34e8551 commit 5a28381

File tree

8 files changed

+356
-142
lines changed

8 files changed

+356
-142
lines changed

pkg/capabilities/capabilities.go

+21
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package capabilities
22

33
import (
4+
"cmp"
45
"context"
56
"fmt"
67
"regexp"
@@ -53,6 +54,26 @@ func (c CapabilityType) IsValid() error {
5354
return fmt.Errorf("invalid capability type: %s", c)
5455
}
5556

57+
func (c CapabilityType) cmpOrder() int {
58+
switch c {
59+
case CapabilityTypeTrigger:
60+
return 0
61+
case CapabilityTypeAction:
62+
return 1
63+
case CapabilityTypeConsensus:
64+
return 2
65+
case CapabilityTypeTarget:
66+
return 3
67+
case CapabilityTypeUnknown:
68+
return 4
69+
default:
70+
return 5
71+
}
72+
}
73+
func (c CapabilityType) Compare(c2 CapabilityType) int {
74+
return cmp.Compare(c.cmpOrder(), c2.cmpOrder())
75+
}
76+
5677
// CapabilityResponse is a struct for the Execute response of a capability.
5778
type CapabilityResponse struct {
5879
Value *values.Map

pkg/workflows/models_yaml_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var transformJSON = cmp.FilterValues(func(x, y []byte) bool {
4949
return out
5050
}))
5151

52-
func TestWorkflowSpecMarshalling(t *testing.T) {
52+
func TestWorkflowSpecYamlMarshalling(t *testing.T) {
5353
t.Parallel()
5454
fixtureReader := yamlFixtureReaderBytes(t, "marshalling")
5555

pkg/workflows/sdk/builder_test.go

+1-76
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/stretchr/testify/require"
88
"sigs.k8s.io/yaml"
99

10-
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1110
ocr3 "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/ocr3cap"
1211
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/targets/chainwriter"
1312
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams"
@@ -205,81 +204,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
205204
actual, err := factory.Spec()
206205
require.NoError(t, err)
207206

208-
expected := sdk.WorkflowSpec{
209-
Name: "notccipethsep",
210-
Owner: "0x00000000000000000000000000000000000000aa",
211-
Triggers: []sdk.StepDefinition{
212-
{
213-
214-
Ref: "trigger",
215-
Inputs: sdk.StepInputs{},
216-
Config: map[string]any{"maxFrequencyMs": 5000},
217-
CapabilityType: capabilities.CapabilityTypeTrigger,
218-
},
219-
},
220-
Actions: make([]sdk.StepDefinition, 0),
221-
Consensus: []sdk.StepDefinition{
222-
{
223-
224-
Ref: "data-feeds-report",
225-
Inputs: sdk.StepInputs{
226-
Mapping: map[string]any{"observations": []map[string]any{
227-
{
228-
"Metadata": map[string]any{
229-
"MinRequiredSignatures": 1,
230-
"Signers": []string{"$(trigger.outputs.Metadata.Signer)"},
231-
},
232-
"Payload": []map[string]any{
233-
{
234-
"BenchmarkPrice": "$(trigger.outputs.Payload.BuyPrice)",
235-
"FeedID": anyFakeFeedID,
236-
"FullReport": "$(trigger.outputs.Payload.FullReport)",
237-
"ObservationTimestamp": "$(trigger.outputs.Payload.ObservationTimestamp)",
238-
"ReportContext": "$(trigger.outputs.Payload.ReportContext)",
239-
"Signatures": []string{"$(trigger.outputs.Payload.Signature)"},
240-
},
241-
},
242-
"Timestamp": "$(trigger.outputs.Timestamp)",
243-
},
244-
}},
245-
},
246-
Config: map[string]any{
247-
"aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{
248-
AllowedPartialStaleness: "0.5",
249-
Feeds: map[string]ocr3.FeedValue{
250-
anyFakeFeedID: {
251-
Deviation: "0.5",
252-
Heartbeat: 3600,
253-
},
254-
},
255-
},
256-
"aggregation_method": "data_feeds",
257-
"encoder": "EVM",
258-
"encoder_config": ocr3.EncoderConfig{
259-
"Abi": "(bytes32 FeedID, uint224 Price, uint32 Timestamp)[] Reports",
260-
},
261-
"report_id": "0001",
262-
},
263-
CapabilityType: capabilities.CapabilityTypeConsensus,
264-
},
265-
},
266-
Targets: []sdk.StepDefinition{
267-
{
268-
269-
Inputs: sdk.StepInputs{
270-
Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"},
271-
},
272-
Config: map[string]any{
273-
"address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416",
274-
"deltaStage": "45s",
275-
"schedule": "oneAtATime",
276-
},
277-
CapabilityType: capabilities.CapabilityTypeTarget,
278-
},
279-
},
280-
}
281-
282-
testutils.AssertWorkflowSpec(t, expected, actual)
207+
testutils.AssertWorkflowSpec(t, NotStreamSepoliaWorkflowSpec, actual)
283208
})
284209

285210
t.Run("duplicate names causes errors", func(t *testing.T) {

pkg/workflows/sdk/compute_test.go

+1-64
Original file line numberDiff line numberDiff line change
@@ -39,71 +39,8 @@ func TestCompute(t *testing.T) {
3939

4040
spec, err2 := workflow.Spec()
4141
require.NoError(t, err2)
42-
expectedSpec := sdk.WorkflowSpec{
43-
Name: "name",
44-
Owner: "owner",
45-
Triggers: []sdk.StepDefinition{
46-
{
47-
48-
Ref: "trigger",
49-
Inputs: sdk.StepInputs{},
50-
Config: map[string]any{"maxFrequencyMs": 5000},
51-
CapabilityType: capabilities.CapabilityTypeTrigger,
52-
},
53-
},
54-
Actions: []sdk.StepDefinition{
55-
{
56-
57-
Ref: "Compute",
58-
Inputs: sdk.StepInputs{
59-
Mapping: map[string]any{"Arg0": "$(trigger.outputs)"},
60-
},
61-
Config: map[string]any{},
62-
CapabilityType: capabilities.CapabilityTypeAction,
63-
},
64-
},
65-
Consensus: []sdk.StepDefinition{
66-
{
67-
68-
Ref: "data-feeds-report",
69-
Inputs: sdk.StepInputs{
70-
Mapping: map[string]any{"observations": "$(Compute.outputs.Value)"},
71-
},
72-
Config: map[string]any{
73-
"aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{
74-
AllowedPartialStaleness: "false",
75-
Feeds: map[string]ocr3.FeedValue{
76-
anyFakeFeedID: {
77-
Deviation: "0.5",
78-
Heartbeat: 3600,
79-
},
80-
},
81-
},
82-
"aggregation_method": "data_feeds",
83-
"encoder": ocr3.EncoderEVM,
84-
"encoder_config": ocr3.EncoderConfig{},
85-
"report_id": "0001",
86-
},
87-
CapabilityType: capabilities.CapabilityTypeConsensus,
88-
},
89-
},
90-
Targets: []sdk.StepDefinition{
91-
{
92-
93-
Inputs: sdk.StepInputs{
94-
Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"},
95-
},
96-
Config: map[string]any{
97-
"address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416",
98-
"deltaStage": "45s",
99-
"schedule": "oneAtATime",
100-
},
101-
CapabilityType: capabilities.CapabilityTypeTarget,
102-
},
103-
},
104-
}
10542

106-
testutils.AssertWorkflowSpec(t, expectedSpec, spec)
43+
testutils.AssertWorkflowSpec(t, Test2WorkflowSpec, spec)
10744
})
10845

10946
t.Run("compute runs the function and returns the value", func(t *testing.T) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
```mermaid
2+
flowchart
3+
4+
trigger[\"trigger<br>(notstreams[at]1.0.0)"/]
5+
6+
data-feeds-report[["data-feeds-report<br>(offchain_reporting[at]1.0.0)"]]
7+
trigger -- Metadata.Signer --> data-feeds-report
8+
trigger -- Payload.BuyPrice --> data-feeds-report
9+
trigger -- Payload.FullReport --> data-feeds-report
10+
trigger -- Payload.ObservationTimestamp --> data-feeds-report
11+
trigger -- Payload.ReportContext --> data-feeds-report
12+
trigger -- Payload.Signature --> data-feeds-report
13+
trigger -- Timestamp --> data-feeds-report
14+
15+
unnamed2[/"(write_ethereum-testnet-sepolia[at]1.0.0)"\]
16+
data-feeds-report --> unnamed2
17+
18+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
```mermaid
2+
flowchart
3+
4+
trigger[\"trigger<br>(notstreams[at]1.0.0)"/]
5+
6+
Compute["Compute<br>(__internal__custom_compute[at]1.0.0)"]
7+
trigger --> Compute
8+
9+
data-feeds-report[["data-feeds-report<br>(offchain_reporting[at]1.0.0)"]]
10+
Compute -- Value --> data-feeds-report
11+
12+
unnamed3[/"(write_ethereum-testnet-sepolia[at]1.0.0)"\]
13+
data-feeds-report --> unnamed3
14+
15+
```

pkg/workflows/sdk/workflow_spec.go

+109-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,68 @@
11
package sdk
22

3-
import "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
3+
import (
4+
"cmp"
5+
"slices"
6+
"strings"
7+
"text/template"
8+
9+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
10+
)
411

512
type StepInputs struct {
613
OutputRef string
714
Mapping map[string]any
815
}
916

17+
type Output struct {
18+
Ref, Name string
19+
}
20+
21+
// Outputs returns only outputs from Mapping, indexed by Ref
22+
func (i *StepInputs) Outputs() []Output {
23+
os := outputs(i.Mapping)
24+
slices.SortFunc(os, func(a, b Output) int {
25+
return cmp.Or(
26+
cmp.Compare(a.Ref, b.Ref),
27+
cmp.Compare(a.Name, b.Name),
28+
)
29+
})
30+
return os
31+
}
32+
func outputs(m map[string]any) []Output {
33+
var os []Output
34+
addIfOutput := func(s string) {
35+
if strings.HasPrefix(s, "$(") {
36+
s = s[2 : len(s)-1] // trim $()
37+
if strings.HasSuffix(s, ".outputs") {
38+
os = append(os, Output{Ref: s[:len(s)-len(".outputs")]})
39+
}
40+
parts := strings.SplitN(s, ".outputs.", 2)
41+
if len(parts) != 2 {
42+
return
43+
}
44+
os = append(os, Output{Ref: parts[0], Name: parts[1]})
45+
}
46+
}
47+
for _, v := range m {
48+
switch t := v.(type) {
49+
case []map[string]any:
50+
for _, m := range t {
51+
os = append(os, outputs(m)...)
52+
}
53+
case map[string]any:
54+
os = append(os, outputs(t)...)
55+
case []string:
56+
for _, s := range t {
57+
addIfOutput(s)
58+
}
59+
case string:
60+
addIfOutput(t)
61+
}
62+
}
63+
return os
64+
}
65+
1066
// StepDefinition is the parsed representation of a step in a workflow.
1167
//
1268
// Within the workflow spec, they are called "Capability Properties".
@@ -35,3 +91,55 @@ func (w *WorkflowSpec) Steps() []StepDefinition {
3591
s = append(s, w.Targets...)
3692
return s
3793
}
94+
95+
func (w *WorkflowSpec) FormatChart() (string, error) {
96+
var sb strings.Builder
97+
steps := slices.Clone(w.Triggers)
98+
steps = append(steps, w.Steps()...)
99+
slices.SortFunc(steps, func(a, b StepDefinition) int {
100+
return cmp.Or(
101+
a.CapabilityType.Compare(b.CapabilityType),
102+
cmp.Compare(a.Ref, b.Ref),
103+
cmp.Compare(a.ID, b.ID),
104+
)
105+
})
106+
err := tmpl.Execute(&sb, steps)
107+
if err != nil {
108+
return "", err
109+
}
110+
return sb.String(), nil
111+
}
112+
113+
var tmpl = template.Must(template.New("").Funcs(map[string]any{
114+
"replace": strings.ReplaceAll,
115+
}).Parse(`flowchart
116+
{{ range $i, $step := . }}
117+
{{ $ref := .Ref -}}
118+
{{ $id := replace .ID "@" "[at]" -}}
119+
{{ $name := printf "%s<br>(%s)" .Ref $id -}}
120+
{{ if not .Ref -}}
121+
{{ $ref = printf "%s%d" "unnamed" $i -}}
122+
{{ $name = printf "(%s)" $id -}}
123+
{{ end -}}
124+
{{ if eq .CapabilityType "trigger" -}}
125+
{{ $ref }}[\"{{$name}}"/]
126+
{{ else if eq .CapabilityType "consensus" -}}
127+
{{ $ref }}[["{{$name}}"]]
128+
{{ else if eq .CapabilityType "target" -}}
129+
{{ $ref }}[/"{{$name}}"\]
130+
{{ else -}}
131+
{{ $ref }}["{{$name}}"]
132+
{{ end -}}
133+
{{ if .Inputs.OutputRef -}}
134+
{{ .Inputs.OutputRef }} --> {{ $step.Ref }}
135+
{{ else -}}
136+
{{ range $out := .Inputs.Outputs -}}
137+
{{ if $out.Name -}}
138+
{{ $out.Ref }} -- {{ $out.Name }} --> {{ $ref}}
139+
{{ else -}}
140+
{{ $out.Ref }} --> {{ $ref}}
141+
{{ end -}}
142+
{{ end -}}
143+
{{ end -}}
144+
{{ end -}}
145+
`))

0 commit comments

Comments
 (0)