From 9087f5e8daf9a3e693a726839f789e6590e7ce09 Mon Sep 17 00:00:00 2001 From: Lei Date: Wed, 11 Dec 2024 11:06:13 -0800 Subject: [PATCH 1/5] add cron trigger and readcontract action (#971) Signed-off-by: Lei --- .../readcontract/action_builders_generated.go | 90 ++++++++++++++ .../readcontract_action-schema.json | 58 +++++++++ .../readcontract_action_generated.go | 115 ++++++++++++++++++ .../readcontracttest/action_mock_generated.go | 27 ++++ .../triggers/cron/cron_trigger-schema.json | 43 +++++++ .../triggers/cron/cron_trigger_generated.go | 92 ++++++++++++++ .../cron/crontest/trigger_mock_generated.go | 17 +++ .../cron/trigger_builders_generated.go | 84 +++++++++++++ 8 files changed, 526 insertions(+) create mode 100644 pkg/capabilities/actions/readcontract/action_builders_generated.go create mode 100644 pkg/capabilities/actions/readcontract/readcontract_action-schema.json create mode 100644 pkg/capabilities/actions/readcontract/readcontract_action_generated.go create mode 100644 pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go create mode 100644 pkg/capabilities/triggers/cron/cron_trigger-schema.json create mode 100644 pkg/capabilities/triggers/cron/cron_trigger_generated.go create mode 100644 pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go create mode 100644 pkg/capabilities/triggers/cron/trigger_builders_generated.go diff --git a/pkg/capabilities/actions/readcontract/action_builders_generated.go b/pkg/capabilities/actions/readcontract/action_builders_generated.go new file mode 100644 index 0000000000..dad3f4d499 --- /dev/null +++ b/pkg/capabilities/actions/readcontract/action_builders_generated.go @@ -0,0 +1,90 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package readcontract + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +func (cfg Config) New(w *sdk.WorkflowSpecFactory, ref string, input ActionInput) OutputCap { + + def := sdk.StepDefinition{ + ID: "read-contract-action@1.0.0", Ref: ref, + Inputs: input.ToSteps(), + Config: map[string]any{ + "ContractAddress": cfg.ContractAddress, + "ContractName": cfg.ContractName, + "ContractReaderConfig": cfg.ContractReaderConfig, + "ReadIdentifier": cfg.ReadIdentifier, + }, + CapabilityType: capabilities.CapabilityTypeAction, + } + + step := sdk.Step[Output]{Definition: def} + raw := step.AddTo(w) + return OutputWrapper(raw) +} + +// OutputWrapper allows access to field from an sdk.CapDefinition[Output] +func OutputWrapper(raw sdk.CapDefinition[Output]) OutputCap { + wrapped, ok := raw.(OutputCap) + if ok { + return wrapped + } + return &outputCap{CapDefinition: raw} +} + +type OutputCap interface { + sdk.CapDefinition[Output] + LatestValue() sdk.CapDefinition[any] + private() +} + +type outputCap struct { + sdk.CapDefinition[Output] +} + +func (*outputCap) private() {} +func (c *outputCap) LatestValue() sdk.CapDefinition[any] { + return sdk.AccessField[Output, any](c.CapDefinition, "LatestValue") +} + +func ConstantOutput(value Output) OutputCap { + return &outputCap{CapDefinition: sdk.ConstantDefinition(value)} +} + +func NewOutputFromFields( + latestValue sdk.CapDefinition[any]) OutputCap { + return &simpleOutput{ + CapDefinition: sdk.ComponentCapDefinition[Output]{ + "LatestValue": latestValue.Ref(), + }, + latestValue: latestValue, + } +} + +type simpleOutput struct { + sdk.CapDefinition[Output] + latestValue sdk.CapDefinition[any] +} + +func (c *simpleOutput) LatestValue() sdk.CapDefinition[any] { + return c.latestValue +} + +func (c *simpleOutput) private() {} + +type ActionInput struct { + ConfidenceLevel sdk.CapDefinition[string] + Params sdk.CapDefinition[InputParams] +} + +func (input ActionInput) ToSteps() sdk.StepInputs { + return sdk.StepInputs{ + Mapping: map[string]any{ + "ConfidenceLevel": input.ConfidenceLevel.Ref(), + "Params": input.Params.Ref(), + }, + } +} diff --git a/pkg/capabilities/actions/readcontract/readcontract_action-schema.json b/pkg/capabilities/actions/readcontract/readcontract_action-schema.json new file mode 100644 index 0000000000..de94037d6b --- /dev/null +++ b/pkg/capabilities/actions/readcontract/readcontract_action-schema.json @@ -0,0 +1,58 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/readcontract/read-contract-action@1.0.0", + "$defs": { + "Config": { + "type": "object", + "properties": { + "ContractReaderConfig": { + "type": "string" + }, + "ReadIdentifier": { + "type": "string" + }, + "ContractAddress": { + "type": "string" + }, + "ContractName": { + "type": "string" + } + }, + "required": ["ContractReaderConfig", "ReadIdentifier", "ContractAddress", "ContractName"] + }, + "Input": { + "type": "object", + "properties": { + "ConfidenceLevel": { + "type": "string" + }, + "Params": { + "type": "object", + "additionalProperties": true + } + }, + "required": ["ConfidenceLevel", "Params"] + }, + "Output": { + "type": "object", + "properties": { + "LatestValue": { + "type": ["object", "string", "boolean", "null", "array"] + } + }, + "required": ["LatestValue"] + } + }, + "type": "object", + "properties": { + "Config": { + "$ref": "#/$defs/Config" + }, + "Inputs": { + "$ref": "#/$defs/Input" + }, + "Outputs": { + "$ref": "#/$defs/Output" + } + } +} \ No newline at end of file diff --git a/pkg/capabilities/actions/readcontract/readcontract_action_generated.go b/pkg/capabilities/actions/readcontract/readcontract_action_generated.go new file mode 100644 index 0000000000..8f19f8da43 --- /dev/null +++ b/pkg/capabilities/actions/readcontract/readcontract_action_generated.go @@ -0,0 +1,115 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package readcontract + +import ( + "encoding/json" + "fmt" +) + +type Action struct { + // Config corresponds to the JSON schema field "Config". + Config *Config `json:"Config,omitempty" yaml:"Config,omitempty" mapstructure:"Config,omitempty"` + + // Inputs corresponds to the JSON schema field "Inputs". + Inputs *Input `json:"Inputs,omitempty" yaml:"Inputs,omitempty" mapstructure:"Inputs,omitempty"` + + // Outputs corresponds to the JSON schema field "Outputs". + Outputs *Output `json:"Outputs,omitempty" yaml:"Outputs,omitempty" mapstructure:"Outputs,omitempty"` +} + +type Config struct { + // ContractAddress corresponds to the JSON schema field "ContractAddress". + ContractAddress string `json:"ContractAddress" yaml:"ContractAddress" mapstructure:"ContractAddress"` + + // ContractName corresponds to the JSON schema field "ContractName". + ContractName string `json:"ContractName" yaml:"ContractName" mapstructure:"ContractName"` + + // ContractReaderConfig corresponds to the JSON schema field + // "ContractReaderConfig". + ContractReaderConfig string `json:"ContractReaderConfig" yaml:"ContractReaderConfig" mapstructure:"ContractReaderConfig"` + + // ReadIdentifier corresponds to the JSON schema field "ReadIdentifier". + ReadIdentifier string `json:"ReadIdentifier" yaml:"ReadIdentifier" mapstructure:"ReadIdentifier"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Config) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["ContractAddress"]; raw != nil && !ok { + return fmt.Errorf("field ContractAddress in Config: required") + } + if _, ok := raw["ContractName"]; raw != nil && !ok { + return fmt.Errorf("field ContractName in Config: required") + } + if _, ok := raw["ContractReaderConfig"]; raw != nil && !ok { + return fmt.Errorf("field ContractReaderConfig in Config: required") + } + if _, ok := raw["ReadIdentifier"]; raw != nil && !ok { + return fmt.Errorf("field ReadIdentifier in Config: required") + } + type Plain Config + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Config(plain) + return nil +} + +type Input struct { + // ConfidenceLevel corresponds to the JSON schema field "ConfidenceLevel". + ConfidenceLevel string `json:"ConfidenceLevel" yaml:"ConfidenceLevel" mapstructure:"ConfidenceLevel"` + + // Params corresponds to the JSON schema field "Params". + Params InputParams `json:"Params" yaml:"Params" mapstructure:"Params"` +} + +type InputParams map[string]interface{} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Input) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["ConfidenceLevel"]; raw != nil && !ok { + return fmt.Errorf("field ConfidenceLevel in Input: required") + } + if _, ok := raw["Params"]; raw != nil && !ok { + return fmt.Errorf("field Params in Input: required") + } + type Plain Input + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Input(plain) + return nil +} + +type Output struct { + // LatestValue corresponds to the JSON schema field "LatestValue". + LatestValue interface{} `json:"LatestValue" yaml:"LatestValue" mapstructure:"LatestValue"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Output) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["LatestValue"]; raw != nil && !ok { + return fmt.Errorf("field LatestValue in Output: required") + } + type Plain Output + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Output(plain) + return nil +} diff --git a/pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go b/pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go new file mode 100644 index 0000000000..ca5f1b321f --- /dev/null +++ b/pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go @@ -0,0 +1,27 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package readcontracttest + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/readcontract" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/testutils" +) + +// Action registers a new capability mock with the runner +// if another mock is registered for the same capability with for a step, it will take priority for that step. +func Action(runner *testutils.Runner, fn func(input readcontract.Input) (readcontract.Output, error)) *testutils.Mock[readcontract.Input, readcontract.Output] { + mock := testutils.MockCapability[readcontract.Input, readcontract.Output]("read-contract-action@1.0.0", fn) + runner.MockCapability("read-contract-action@1.0.0", nil, mock) + return mock +} + +// ActionForStep registers a new capability mock with the runner, but only for a given step. +// if another mock was registered for the same capability without a step, this mock will take priority for that step. +func ActionForStep(runner *testutils.Runner, step string, mockFn func(input readcontract.Input) (readcontract.Output, error)) *testutils.Mock[readcontract.Input, readcontract.Output] { + fn := mockFn + mock := testutils.MockCapability[readcontract.Input, readcontract.Output]("read-contract-action@1.0.0", fn) + runner.MockCapability("read-contract-action@1.0.0", &step, mock) + return mock +} diff --git a/pkg/capabilities/triggers/cron/cron_trigger-schema.json b/pkg/capabilities/triggers/cron/cron_trigger-schema.json new file mode 100644 index 0000000000..5710378a43 --- /dev/null +++ b/pkg/capabilities/triggers/cron/cron_trigger-schema.json @@ -0,0 +1,43 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/cron/cron-trigger@1.0.0", + "$defs": { + "Payload": { + "type": "object", + "properties": { + "ActualExecutionTime": { + "type": "string", + "description": "Time that cron trigger's task execution occurred (RFC3339Nano formatted)" + }, + "ScheduledExecutionTime": { + "type": "string", + "description": "Time that cron trigger's task execution had been scheduled to occur (RFC3339Nano formatted)" + } + }, + "required": ["ActualExecutionTime", "ScheduledExecutionTime"], + "additionalProperties": false + }, + "Config": { + "type": "object", + "properties": { + "schedule": { + "type": "string" + } + }, + "required": ["schedule"], + "additionalProperties": false + } + }, + "type": "object", + "properties": { + "config": { + "$ref": "#/$defs/Config" + }, + "outputs": { + "$ref": "#/$defs/Payload" + } + }, + "required": ["config", "outputs"], + "additionalProperties": false, + "description": "A trigger that uses a cron schedule to run periodically at fixed times, dates, or intervals." +} \ No newline at end of file diff --git a/pkg/capabilities/triggers/cron/cron_trigger_generated.go b/pkg/capabilities/triggers/cron/cron_trigger_generated.go new file mode 100644 index 0000000000..5c2e2a5c06 --- /dev/null +++ b/pkg/capabilities/triggers/cron/cron_trigger_generated.go @@ -0,0 +1,92 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package cron + +import ( + "encoding/json" + "fmt" +) + +type Config struct { + // Schedule corresponds to the JSON schema field "schedule". + Schedule string `json:"schedule" yaml:"schedule" mapstructure:"schedule"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Config) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["schedule"]; raw != nil && !ok { + return fmt.Errorf("field schedule in Config: required") + } + type Plain Config + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Config(plain) + return nil +} + +type Payload struct { + // Time that cron trigger's task execution occurred (RFC3339Nano formatted) + ActualExecutionTime string `json:"ActualExecutionTime" yaml:"ActualExecutionTime" mapstructure:"ActualExecutionTime"` + + // Time that cron trigger's task execution had been scheduled to occur + // (RFC3339Nano formatted) + ScheduledExecutionTime string `json:"ScheduledExecutionTime" yaml:"ScheduledExecutionTime" mapstructure:"ScheduledExecutionTime"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Payload) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["ActualExecutionTime"]; raw != nil && !ok { + return fmt.Errorf("field ActualExecutionTime in Payload: required") + } + if _, ok := raw["ScheduledExecutionTime"]; raw != nil && !ok { + return fmt.Errorf("field ScheduledExecutionTime in Payload: required") + } + type Plain Payload + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Payload(plain) + return nil +} + +// A trigger that uses a cron schedule to run periodically at fixed times, dates, +// or intervals. +type Trigger struct { + // Config corresponds to the JSON schema field "config". + Config Config `json:"config" yaml:"config" mapstructure:"config"` + + // Outputs corresponds to the JSON schema field "outputs". + Outputs Payload `json:"outputs" yaml:"outputs" mapstructure:"outputs"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Trigger) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["config"]; raw != nil && !ok { + return fmt.Errorf("field config in Trigger: required") + } + if _, ok := raw["outputs"]; raw != nil && !ok { + return fmt.Errorf("field outputs in Trigger: required") + } + type Plain Trigger + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Trigger(plain) + return nil +} diff --git a/pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go b/pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go new file mode 100644 index 0000000000..ad683b1d52 --- /dev/null +++ b/pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go @@ -0,0 +1,17 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package crontest + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/cron" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/testutils" +) + +// Trigger registers a new capability mock with the runner +func Trigger(runner *testutils.Runner, fn func() (cron.Payload, error)) *testutils.TriggerMock[cron.Payload] { + mock := testutils.MockTrigger[cron.Payload]("cron-trigger@1.0.0", fn) + runner.MockCapability("cron-trigger@1.0.0", nil, mock) + return mock +} diff --git a/pkg/capabilities/triggers/cron/trigger_builders_generated.go b/pkg/capabilities/triggers/cron/trigger_builders_generated.go new file mode 100644 index 0000000000..e84dac1764 --- /dev/null +++ b/pkg/capabilities/triggers/cron/trigger_builders_generated.go @@ -0,0 +1,84 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package cron + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +func (cfg Config) New(w *sdk.WorkflowSpecFactory) PayloadCap { + ref := "trigger" + def := sdk.StepDefinition{ + ID: "cron-trigger@1.0.0", Ref: ref, + Inputs: sdk.StepInputs{}, + Config: map[string]any{ + "schedule": cfg.Schedule, + }, + CapabilityType: capabilities.CapabilityTypeTrigger, + } + + step := sdk.Step[Payload]{Definition: def} + raw := step.AddTo(w) + return PayloadWrapper(raw) +} + +// PayloadWrapper allows access to field from an sdk.CapDefinition[Payload] +func PayloadWrapper(raw sdk.CapDefinition[Payload]) PayloadCap { + wrapped, ok := raw.(PayloadCap) + if ok { + return wrapped + } + return &payloadCap{CapDefinition: raw} +} + +type PayloadCap interface { + sdk.CapDefinition[Payload] + ActualExecutionTime() sdk.CapDefinition[string] + ScheduledExecutionTime() sdk.CapDefinition[string] + private() +} + +type payloadCap struct { + sdk.CapDefinition[Payload] +} + +func (*payloadCap) private() {} +func (c *payloadCap) ActualExecutionTime() sdk.CapDefinition[string] { + return sdk.AccessField[Payload, string](c.CapDefinition, "ActualExecutionTime") +} +func (c *payloadCap) ScheduledExecutionTime() sdk.CapDefinition[string] { + return sdk.AccessField[Payload, string](c.CapDefinition, "ScheduledExecutionTime") +} + +func ConstantPayload(value Payload) PayloadCap { + return &payloadCap{CapDefinition: sdk.ConstantDefinition(value)} +} + +func NewPayloadFromFields( + actualExecutionTime sdk.CapDefinition[string], + scheduledExecutionTime sdk.CapDefinition[string]) PayloadCap { + return &simplePayload{ + CapDefinition: sdk.ComponentCapDefinition[Payload]{ + "ActualExecutionTime": actualExecutionTime.Ref(), + "ScheduledExecutionTime": scheduledExecutionTime.Ref(), + }, + actualExecutionTime: actualExecutionTime, + scheduledExecutionTime: scheduledExecutionTime, + } +} + +type simplePayload struct { + sdk.CapDefinition[Payload] + actualExecutionTime sdk.CapDefinition[string] + scheduledExecutionTime sdk.CapDefinition[string] +} + +func (c *simplePayload) ActualExecutionTime() sdk.CapDefinition[string] { + return c.actualExecutionTime +} +func (c *simplePayload) ScheduledExecutionTime() sdk.CapDefinition[string] { + return c.scheduledExecutionTime +} + +func (c *simplePayload) private() {} From 0b03fa331a49577ad30b8b780e0bc8070bd58328 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Wed, 11 Dec 2024 20:22:25 +0100 Subject: [PATCH 2/5] BCFR-1086 finality violation (#966) * define finality violation error Signed-off-by: Dmytro Haidashenko * rename finality violation Signed-off-by: Dmytro Haidashenko * Test ContainsError Signed-off-by: Dmytro Haidashenko --------- Signed-off-by: Dmytro Haidashenko Co-authored-by: Domino Valdano --- pkg/services/health.go | 11 +++++++ pkg/services/health_test.go | 58 ++++++++++++++++++++++++++++++++++++ pkg/types/contract_reader.go | 8 +++++ 3 files changed, 77 insertions(+) create mode 100644 pkg/services/health_test.go diff --git a/pkg/services/health.go b/pkg/services/health.go index 7bdfb5113a..7108e53b61 100644 --- a/pkg/services/health.go +++ b/pkg/services/health.go @@ -257,3 +257,14 @@ func (c *HealthChecker) IsHealthy() (healthy bool, errors map[string]error) { return } + +// ContainsError - returns true if report contains targetErr +func ContainsError(report map[string]error, targetErr error) bool { + for _, err := range report { + if errors.Is(err, targetErr) { + return true + } + } + + return false +} diff --git a/pkg/services/health_test.go b/pkg/services/health_test.go new file mode 100644 index 0000000000..325d2cf20b --- /dev/null +++ b/pkg/services/health_test.go @@ -0,0 +1,58 @@ +package services + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestContainsError(t *testing.T) { + anError := errors.New("an error") + anotherError := errors.New("another error") + testCases := []struct { + Name string + Report map[string]error + Target error + ExpectedResult bool + }{ + { + Name: "nil map", + Report: nil, + Target: anError, + ExpectedResult: false, + }, + { + Name: "report contains service, but it's healthy", + Report: map[string]error{"service": nil}, + Target: anError, + ExpectedResult: false, + }, + { + Name: "service is not healthy, but it's not caused by target error", + Report: map[string]error{"service": anotherError}, + Target: anError, + ExpectedResult: false, + }, + { + Name: "service is not healthy and contains wrapper target", + Report: map[string]error{"service": fmt.Errorf("wrapped error: %w", anError)}, + Target: anError, + ExpectedResult: true, + }, + { + Name: "service is not healthy due to multiple errors including target", + Report: map[string]error{"service": errors.Join(anError, anotherError)}, + Target: anError, + ExpectedResult: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + actualResult := ContainsError(tc.Report, tc.Target) + assert.Equal(t, tc.ExpectedResult, actualResult) + }) + } +} diff --git a/pkg/types/contract_reader.go b/pkg/types/contract_reader.go index 206e77ae46..5d317de776 100644 --- a/pkg/types/contract_reader.go +++ b/pkg/types/contract_reader.go @@ -16,6 +16,7 @@ const ( ErrContractReaderConfigMissing = UnimplementedError("ContractReader entry missing from RelayConfig") ErrInternal = InternalError("internal error") ErrNotFound = NotFoundError("not found") + ErrFinalityViolated = InternalError("finality violated") ) // ContractReader defines essential read operations a chain should implement for reading contract values and events. @@ -70,6 +71,13 @@ type ContractReader interface { // The iterator returns a pair of key and sequence. QueryKeys(ctx context.Context, filters []ContractKeyFilter, limitAndSort query.LimitAndSort) (iter.Seq2[string, Sequence], error) + // HealthReport returns a full health report of the callee including its dependencies. + // Keys are based on Name(), with nil values when healthy or errors otherwise. + // Use CopyHealth to collect reports from sub-services. + // This should run very fast, so avoid doing computation and instead prefer reporting pre-calculated state. + // On finality violation report must contain at least one ErrFinalityViolation. + HealthReport() map[string]error + mustEmbedUnimplementedContractReader() } From 525a5610c8775f1566802ddec651f1383e155df1 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Thu, 12 Dec 2024 08:22:05 -0800 Subject: [PATCH 3/5] [CAPPL] Add mode quorum configuration option to Reduce Aggregator (#972) * Add 'majority' aggregation method to Reduce Aggregator * (refactor): Change implementation to 'ModeQuorum' * Only fill modeQuorum for method mode --- .../ocr3/aggregators/reduce_aggregator.go | 55 ++++++++++++++++--- .../consensus/ocr3/aggregators/reduce_test.go | 49 +++++++++++++++++ 2 files changed, 95 insertions(+), 9 deletions(-) diff --git a/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go b/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go index 2a3fb20205..7b331d37e5 100644 --- a/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go +++ b/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go @@ -31,14 +31,17 @@ const ( DEVIATION_TYPE_ANY = "any" // DEVIATION_TYPE_PERCENT is a numeric percentage difference DEVIATION_TYPE_PERCENT = "percent" - // DEVIATION_TYPE_ABSOLUTE is a numeric absolute difference + // DEVIATION_TYPE_ABSOLUTE is a numeric unsigned difference DEVIATION_TYPE_ABSOLUTE = "absolute" REPORT_FORMAT_MAP = "map" REPORT_FORMAT_ARRAY = "array" REPORT_FORMAT_VALUE = "value" + MODE_QUORUM_OCR = "ocr" + MODE_QUORUM_ANY = "any" DEFAULT_REPORT_FORMAT = REPORT_FORMAT_MAP DEFAULT_OUTPUT_FIELD_NAME = "Reports" + DEFAULT_MODE_QUORUM = MODE_QUORUM_ANY ) type ReduceAggConfig struct { @@ -70,8 +73,12 @@ type AggregationField struct { InputKey string `mapstructure:"inputKey" json:"inputKey"` // How the data set should be aggregated to a single value // * median - take the centermost value of the sorted data set of observations. can only be used on numeric types. not a true median, because no average if two middle values. - // * mode - take the most frequent value. if tied, use the "first". + // * mode - take the most frequent value. if tied, use the "first". use "ModeQuorom" to configure the minimum number of seen values. Method string `mapstructure:"method" json:"method" jsonschema:"enum=median,enum=mode" required:"true"` + // When using Method=mode, this will configure the minimum number of values that must be seen + // * ocr - (default) enforces that the number of matching values must be at least f+1, otherwise consensus fails + // * any - do not enforce any limit on the minimum viable count. this may result in unexpected answers if every observation is unique. + ModeQuorum string `mapstructure:"modeQuorum" json:"modeQuorum,omitempty" jsonschema:"enum=ocr,enum=any" default:"ocr"` // The key that the aggregated data is put under // If omitted, the InputKey will be used OutputKey string `mapstructure:"outputKey" json:"outputKey"` @@ -108,7 +115,7 @@ func (a *reduceAggregator) Aggregate(lggr logger.Logger, previousOutcome *types. return nil, fmt.Errorf("not enough observations provided %s, have %d want %d", field.InputKey, len(vals), 2*f+1) } - singleValue, err := reduce(field.Method, vals) + singleValue, err := reduce(field.Method, vals, f, field.ModeQuorum) if err != nil { return nil, fmt.Errorf("unable to reduce on method %s, err: %s", field.Method, err.Error()) } @@ -335,12 +342,20 @@ func (a *reduceAggregator) extractValues(lggr logger.Logger, observations map[oc return vals } -func reduce(method string, items []values.Value) (values.Value, error) { +func reduce(method string, items []values.Value, f int, modeQuorum string) (values.Value, error) { switch method { case AGGREGATION_METHOD_MEDIAN: return median(items) case AGGREGATION_METHOD_MODE: - return mode(items) + value, count, err := mode(items) + if err != nil { + return value, err + } + err = modeHasQuorum(modeQuorum, count, f) + if err != nil { + return value, err + } + return value, err default: // invariant, config should be validated return nil, fmt.Errorf("unsupported aggregation method %s", method) @@ -408,10 +423,10 @@ func toDecimal(item values.Value) (decimal.Decimal, error) { } } -func mode(items []values.Value) (values.Value, error) { +func mode(items []values.Value) (values.Value, int, error) { if len(items) == 0 { // invariant, as long as f > 0 there should be items - return nil, errors.New("items cannot be empty") + return nil, 0, errors.New("items cannot be empty") } counts := make(map[[32]byte]*counter) @@ -419,7 +434,7 @@ func mode(items []values.Value) (values.Value, error) { marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(item)) if err != nil { // invariant: values should always be able to be proto marshalled - return nil, err + return nil, 0, err } sha := sha256.Sum256(marshalled) elem, ok := counts[sha] @@ -449,7 +464,22 @@ func mode(items []values.Value) (values.Value, error) { // If more than one mode found, choose first - return modes[0], nil + return modes[0], maxCount, nil +} + +func modeHasQuorum(quorumType string, count int, f int) error { + switch quorumType { + case MODE_QUORUM_ANY: + return nil + case MODE_QUORUM_OCR: + if count < f+1 { + return fmt.Errorf("mode quorum not reached. have: %d, want: %d", count, f+1) + } + return nil + default: + // invariant, config should be validated + return fmt.Errorf("unsupported mode quorum %s", quorumType) + } } func deviation(method string, previousValue values.Value, nextValue values.Value) (decimal.Decimal, error) { @@ -561,6 +591,13 @@ func ParseConfigReduceAggregator(config values.Map) (ReduceAggConfig, error) { if len(field.Method) == 0 || !isOneOf(field.Method, []string{AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE}) { return ReduceAggConfig{}, fmt.Errorf("aggregation field must contain a method. options: [%s, %s]", AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE) } + if field.Method == AGGREGATION_METHOD_MODE && len(field.ModeQuorum) == 0 { + field.ModeQuorum = MODE_QUORUM_OCR + parsedConfig.Fields[i].ModeQuorum = MODE_QUORUM_OCR + } + if field.Method == AGGREGATION_METHOD_MODE && !isOneOf(field.ModeQuorum, []string{MODE_QUORUM_ANY, MODE_QUORUM_OCR}) { + return ReduceAggConfig{}, fmt.Errorf("mode quorum must be one of options: [%s, %s]", MODE_QUORUM_ANY, MODE_QUORUM_OCR) + } if len(field.DeviationString) > 0 && isOneOf(field.DeviationType, []string{DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) { return ReduceAggConfig{}, fmt.Errorf("aggregation field cannot have deviation with a deviation type of %s", field.DeviationType) } diff --git a/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go b/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go index f683ab105d..19d254191d 100644 --- a/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go +++ b/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go @@ -634,6 +634,25 @@ func TestReduceAggregator_Aggregate(t *testing.T) { return map[commontypes.OracleID][]values.Value{1: {mockValue}, 2: {mockValue}, 3: {mockValue}} }, }, + { + name: "reduce error mode with mode quorum of ocr", + previousOutcome: nil, + fields: []aggregators.AggregationField{ + { + Method: "mode", + ModeQuorum: "ocr", + OutputKey: "Price", + }, + }, + extraConfig: map[string]any{}, + observationsFactory: func() map[commontypes.OracleID][]values.Value { + mockValue, err := values.Wrap(true) + require.NoError(t, err) + mockValue2, err := values.Wrap(true) + require.NoError(t, err) + return map[commontypes.OracleID][]values.Value{1: {mockValue}, 2: {mockValue2}} + }, + }, } for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { @@ -825,6 +844,7 @@ func TestMedianAggregator_ParseConfig(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedId", Method: "mode", + ModeQuorum: "ocr", DeviationString: "1.1", Deviation: decimal.NewFromFloat(1.1), DeviationType: "absolute", @@ -1153,6 +1173,23 @@ func TestMedianAggregator_ParseConfig(t *testing.T) { return vMap }, }, + { + name: "invalid mode quorum", + configFactory: func() *values.Map { + vMap, err := values.NewMap(map[string]any{ + "fields": []aggregators.AggregationField{ + { + InputKey: "Price", + Method: "mode", + ModeQuorum: "invalid", + OutputKey: "Price", + }, + }, + }) + require.NoError(t, err) + return vMap + }, + }, } for _, tt := range cases { @@ -1233,6 +1270,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1278,6 +1316,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "BoolField", OutputKey: "BoolField", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1323,6 +1362,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1368,6 +1408,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "BoolField", OutputKey: "BoolField", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1413,6 +1454,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1458,6 +1500,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1503,6 +1546,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1548,6 +1592,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1593,6 +1638,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1638,6 +1684,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1683,6 +1730,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { @@ -1728,6 +1776,7 @@ func TestAggregateShouldReport(t *testing.T) { InputKey: "FeedID", OutputKey: "FeedID", Method: "mode", + ModeQuorum: "any", DeviationType: "any", }, { From 6a43e61b9d4990e98ca80a8155cfa5287c5d67b6 Mon Sep 17 00:00:00 2001 From: Cedric Date: Thu, 12 Dec 2024 16:39:58 +0000 Subject: [PATCH 4/5] [CAPPL-366/CAPPL-382] Miscellaneous fixes (#973) * [CAPPL-382] Normalize owner before comparing * [CAPPL-366] Add name to hash to generate workflowID --- pkg/workflows/secrets/secrets.go | 13 ++++++++++++- pkg/workflows/secrets/secrets_test.go | 13 ++++++++++++- pkg/workflows/utils.go | 22 +++++++++++++++++----- pkg/workflows/utils_test.go | 11 +++++++---- 4 files changed, 48 insertions(+), 11 deletions(-) diff --git a/pkg/workflows/secrets/secrets.go b/pkg/workflows/secrets/secrets.go index 443e2821a8..830512eba2 100644 --- a/pkg/workflows/secrets/secrets.go +++ b/pkg/workflows/secrets/secrets.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "strings" "golang.org/x/crypto/nacl/box" ) @@ -146,13 +147,23 @@ func DecryptSecretsForNode( return nil, err } - if payload.WorkflowOwner != workflowOwner { + if normalizeOwner(payload.WorkflowOwner) != normalizeOwner(workflowOwner) { return nil, fmt.Errorf("invalid secrets bundle: got owner %s, expected %s", payload.WorkflowOwner, workflowOwner) } return payload.Secrets, nil } +func normalizeOwner(owner string) string { + o := owner + if strings.HasPrefix(o, "0x") { + o = o[2:] + } + + o = strings.ToLower(o) + return o +} + func ValidateEncryptedSecrets(secretsData []byte, encryptionPublicKeys map[string][32]byte, workflowOwner string) error { var encryptedSecrets EncryptedSecretsResult err := json.Unmarshal(secretsData, &encryptedSecrets) diff --git a/pkg/workflows/secrets/secrets_test.go b/pkg/workflows/secrets/secrets_test.go index cf192b5b0c..bf346b69cf 100644 --- a/pkg/workflows/secrets/secrets_test.go +++ b/pkg/workflows/secrets/secrets_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -31,7 +32,7 @@ var ( "SECRET_A": {"one", "two", "three", "four"}, "SECRET_B": {"all"}, } - workflowOwner = "0x9ed925d8206a4f88a2f643b28b3035b315753cd6" + workflowOwner = "0xFbb30BD8E9D779044c3c30dd82e52a5FA1573388" config = SecretsConfig{ SecretsNames: map[string][]string{ "SECRET_A": {"ENV_VAR_A_FOR_NODE_ONE", "ENV_VAR_A_FOR_NODE_TWO", "ENV_VAR_A_FOR_NODE_THREE", "ENV_VAR_A_FOR_NODE_FOUR"}, @@ -162,6 +163,16 @@ func TestEncryptDecrypt(t *testing.T) { assert.ErrorContains(t, err, "invalid secrets bundle: got owner") }) + t.Run("owner without 0x prefix", func(st *testing.T) { + _, err = DecryptSecretsForNode(result, k, workflowOwner[2:]) + require.NoError(t, err) + }) + + t.Run("owner with lower casing", func(st *testing.T) { + _, err = DecryptSecretsForNode(result, k, strings.ToLower(workflowOwner)) + require.NoError(t, err) + }) + t.Run("key not in metadata", func(st *testing.T) { overriddenResult := EncryptedSecretsResult{ EncryptedSecrets: encryptedSecrets, diff --git a/pkg/workflows/utils.go b/pkg/workflows/utils.go index d7aae90b0c..250912c63e 100644 --- a/pkg/workflows/utils.go +++ b/pkg/workflows/utils.go @@ -21,7 +21,7 @@ func EncodeExecutionID(workflowID, eventID string) (string, error) { return hex.EncodeToString(s.Sum(nil)), nil } -func GenerateWorkflowIDFromStrings(owner string, workflow []byte, config []byte, secretsURL string) (string, error) { +func GenerateWorkflowIDFromStrings(owner string, name string, workflow []byte, config []byte, secretsURL string) (string, error) { ownerWithoutPrefix := owner if strings.HasPrefix(owner, "0x") { ownerWithoutPrefix = owner[2:] @@ -32,7 +32,7 @@ func GenerateWorkflowIDFromStrings(owner string, workflow []byte, config []byte, return "", err } - wid, err := GenerateWorkflowID(ownerb, workflow, config, secretsURL) + wid, err := GenerateWorkflowID(ownerb, name, workflow, config, secretsURL) if err != nil { return "", err } @@ -40,13 +40,21 @@ func GenerateWorkflowIDFromStrings(owner string, workflow []byte, config []byte, return hex.EncodeToString(wid[:]), nil } -func GenerateWorkflowID(owner []byte, workflow []byte, config []byte, secretsURL string) ([32]byte, error) { +var ( + versionByte = byte(0) +) + +func GenerateWorkflowID(owner []byte, name string, workflow []byte, config []byte, secretsURL string) ([32]byte, error) { s := sha256.New() _, err := s.Write(owner) if err != nil { return [32]byte{}, err } - _, err = s.Write([]byte(workflow)) + _, err = s.Write([]byte(name)) + if err != nil { + return [32]byte{}, err + } + _, err = s.Write(workflow) if err != nil { return [32]byte{}, err } @@ -58,5 +66,9 @@ func GenerateWorkflowID(owner []byte, workflow []byte, config []byte, secretsURL if err != nil { return [32]byte{}, err } - return [32]byte(s.Sum(nil)), nil + + sha := [32]byte(s.Sum(nil)) + sha[0] = versionByte + + return sha, nil } diff --git a/pkg/workflows/utils_test.go b/pkg/workflows/utils_test.go index 1fccc6839c..477807733b 100644 --- a/pkg/workflows/utils_test.go +++ b/pkg/workflows/utils_test.go @@ -43,23 +43,26 @@ func Test_EncodeExecutionID(t *testing.T) { func Test_GenerateWorkflowIDFromStrings(t *testing.T) { // With prefix owner := "0x26729408f179371be6433b9585d8427f121bfe82" - got, err := GenerateWorkflowIDFromStrings(owner, []byte("workflow"), []byte("config"), "http://mysecrets.com") + got, err := GenerateWorkflowIDFromStrings(owner, "porporpore", []byte("workflow"), []byte("config"), "http://mysecrets.com") require.NoError(t, err) assert.NotNil(t, got) + // Always starts with the version byte + assert.Equal(t, got[:2], hex.EncodeToString([]byte{versionByte})) + // Without prefix owner = "26729408f179371be6433b9585d8427f121bfe82" - got, err = GenerateWorkflowIDFromStrings(owner, []byte("workflow"), []byte("config"), "http://mysecrets.com") + got, err = GenerateWorkflowIDFromStrings(owner, "porporpore", []byte("workflow"), []byte("config"), "http://mysecrets.com") require.NoError(t, err) assert.NotNil(t, got) // Very short; empty but with a prefix owner = "0x" - got, err = GenerateWorkflowIDFromStrings(owner, []byte("workflow"), []byte("config"), "http://mysecrets.com") + got, err = GenerateWorkflowIDFromStrings(owner, "porporpore", []byte("workflow"), []byte("config"), "http://mysecrets.com") require.NoError(t, err) assert.NotNil(t, got) owner = "invalid" - _, err = GenerateWorkflowIDFromStrings(owner, []byte("workflow"), []byte("config"), "http://mysecrets.com") + _, err = GenerateWorkflowIDFromStrings(owner, "porporpore", []byte("workflow"), []byte("config"), "http://mysecrets.com") assert.ErrorContains(t, err, "encoding/hex") } From dbebc0fc753a6cb6955fb08e9d2f53d8e401ed24 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Fri, 13 Dec 2024 09:49:39 -0800 Subject: [PATCH 5/5] (feat): Add PreCodec modifier (#961) --- pkg/codec/config.go | 67 +++++++ pkg/codec/precodec.go | 113 +++++++++++ pkg/codec/precodec_test.go | 393 +++++++++++++++++++++++++++++++++++++ 3 files changed, 573 insertions(+) create mode 100644 pkg/codec/precodec.go create mode 100644 pkg/codec/precodec_test.go diff --git a/pkg/codec/config.go b/pkg/codec/config.go index 92c79ceaa1..21b6cca041 100644 --- a/pkg/codec/config.go +++ b/pkg/codec/config.go @@ -24,6 +24,7 @@ import ( // - epoch to time -> [EpochToTimeModifierConfig] // - address to string -> [AddressBytesToStringModifierConfig] // - field wrapper -> [WrapperModifierConfig] +// - precodec -> [PrecodecModifierConfig] type ModifiersConfig []ModifierConfig func (m *ModifiersConfig) UnmarshalJSON(data []byte) error { @@ -58,6 +59,8 @@ func (m *ModifiersConfig) UnmarshalJSON(data []byte) error { (*m)[i] = &AddressBytesToStringModifierConfig{} case ModifierWrapper: (*m)[i] = &WrapperModifierConfig{} + case ModifierPreCodec: + (*m)[i] = &PreCodecModifierConfig{} default: return fmt.Errorf("%w: unknown modifier type: %s", types.ErrInvalidConfig, mType) } @@ -84,6 +87,7 @@ func (m *ModifiersConfig) ToModifier(onChainHooks ...mapstructure.DecodeHookFunc type ModifierType string const ( + ModifierPreCodec ModifierType = "precodec" ModifierRename ModifierType = "rename" ModifierDrop ModifierType = "drop" ModifierHardCode ModifierType = "hard code" @@ -199,6 +203,69 @@ func (h *HardCodeModifierConfig) MarshalJSON() ([]byte, error) { }) } +// PreCodec creates a modifier that will transform data using a preliminary encoding/decoding step. +// 'Off-chain' values will be overwritten with the encoded data as a byte array. +// 'On-chain' values will be typed using the optimistic types from the codec. +// This is useful when wanting to move the data as generic bytes. +// +// Example: +// +// Based on this input struct: +// type example struct { +// A []B +// } +// +// type B struct { +// C string +// D string +// } +// +// And the fields config defined as: +// {"A": "string C, string D"} +// +// The codec config gives a map of strings (the values from fields config map) to implementation for encoding/decoding +// +// RemoteCodec { +// func (types.TypeProvider) CreateType(itemType string, forEncoding bool) (any, error) +// func (types.Decoder) Decode(ctx context.Context, raw []byte, into any, itemType string) error +// func (types.Encoder) Encode(ctx context.Context, item any, itemType string) ([]byte, error) +// func (types.Decoder) GetMaxDecodingSize(ctx context.Context, n int, itemType string) (int, error) +// func (types.Encoder) GetMaxEncodingSize(ctx context.Context, n int, itemType string) (int, error) +// } +// +// {"string C, string D": RemoteCodec} +// +// Result: +// type example struct { +// A [][]bytes +// } +// +// Where []bytes are the encoded input struct B +type PreCodecModifierConfig struct { + // A map of a path of properties to encoding scheme. + // If the path leads to an array, encoding will occur on every entry. + // + // Example: "a.b" -> "uint256 Value" + Fields map[string]string + // Codecs is skipped in JSON serialization, it will be injected later. + // The map should be keyed using the value from "Fields" to a corresponding Codec that can encode/decode for it + // This allows encoding and decoding implementations to be handled outside of the modifier. + // + // Example: "uint256 Value" -> a chain specific encoder for "uint256 Value" + Codecs map[string]types.RemoteCodec `json:"-"` +} + +func (c *PreCodecModifierConfig) ToModifier(_ ...mapstructure.DecodeHookFunc) (Modifier, error) { + return NewPreCodec(c.Fields, c.Codecs) +} + +func (c *PreCodecModifierConfig) MarshalJSON() ([]byte, error) { + return json.Marshal(&modifierMarshaller[PreCodecModifierConfig]{ + Type: ModifierPreCodec, + T: c, + }) +} + // EpochToTimeModifierConfig is used to convert epoch seconds as uint64 fields on-chain to time.Time type EpochToTimeModifierConfig struct { Fields []string diff --git a/pkg/codec/precodec.go b/pkg/codec/precodec.go new file mode 100644 index 0000000000..447f97dcb7 --- /dev/null +++ b/pkg/codec/precodec.go @@ -0,0 +1,113 @@ +package codec + +import ( + "context" + "fmt" + "reflect" + + "github.com/go-viper/mapstructure/v2" + "github.com/smartcontractkit/chainlink-common/pkg/types" +) + +// PreCodec creates a modifier that will run a preliminary encoding/decoding step. +// This is useful when wanting to move nested data as generic bytes. +func NewPreCodec(fields map[string]string, codecs map[string]types.RemoteCodec) (Modifier, error) { + m := &preCodec{ + modifierBase: modifierBase[string]{ + fields: fields, + onToOffChainType: map[reflect.Type]reflect.Type{}, + offToOnChainType: map[reflect.Type]reflect.Type{}, + }, + codecs: codecs, + } + + // validate that there is a codec for each unique type definition + for _, typeDef := range fields { + if _, ok := m.codecs[typeDef]; ok { + continue + } + return nil, fmt.Errorf("codec not supplied for: %s", typeDef) + } + + m.modifyFieldForInput = func(_ string, field *reflect.StructField, _ string, typeDef string) error { + if field.Type != reflect.SliceOf(reflect.TypeFor[uint8]()) { + return fmt.Errorf("can only decode []byte from on-chain: %s", field.Type) + } + + codec, ok := m.codecs[typeDef] + if !ok || codec == nil { + return fmt.Errorf("codec not found for type definition: '%s'", typeDef) + } + + newType, err := codec.CreateType("", false) + if err != nil { + return err + } + field.Type = reflect.TypeOf(newType) + + return nil + } + + return m, nil +} + +type preCodec struct { + modifierBase[string] + codecs map[string]types.RemoteCodec +} + +func (pc *preCodec) TransformToOffChain(onChainValue any, _ string) (any, error) { + allHooks := make([]mapstructure.DecodeHookFunc, 1) + allHooks[0] = hardCodeManyHook + + return transformWithMaps(onChainValue, pc.onToOffChainType, pc.fields, pc.decodeFieldMapAction, allHooks...) +} + +func (pc *preCodec) decodeFieldMapAction(extractMap map[string]any, key string, typeDef string) error { + _, exists := extractMap[key] + if !exists { + return fmt.Errorf("field %s does not exist", key) + } + + codec, ok := pc.codecs[typeDef] + if !ok || codec == nil { + return fmt.Errorf("codec not found for type definition: '%s'", typeDef) + } + + to, err := codec.CreateType("", false) + if err != nil { + return err + } + err = codec.Decode(context.Background(), extractMap[key].([]byte), &to, "") + if err != nil { + return err + } + extractMap[key] = to + return nil +} + +func (pc *preCodec) TransformToOnChain(offChainValue any, _ string) (any, error) { + allHooks := make([]mapstructure.DecodeHookFunc, 1) + allHooks[0] = hardCodeManyHook + + return transformWithMaps(offChainValue, pc.offToOnChainType, pc.fields, pc.encodeFieldMapAction, allHooks...) +} + +func (pc *preCodec) encodeFieldMapAction(extractMap map[string]any, key string, typeDef string) error { + _, exists := extractMap[key] + if !exists { + return fmt.Errorf("field %s does not exist", key) + } + + codec, ok := pc.codecs[typeDef] + if !ok || codec == nil { + return fmt.Errorf("codec not found for type definition: '%s'", typeDef) + } + + encoded, err := codec.Encode(context.Background(), extractMap[key], "") + if err != nil { + return err + } + extractMap[key] = encoded + return nil +} diff --git a/pkg/codec/precodec_test.go b/pkg/codec/precodec_test.go new file mode 100644 index 0000000000..9911ee8774 --- /dev/null +++ b/pkg/codec/precodec_test.go @@ -0,0 +1,393 @@ +package codec_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/codec" + "github.com/smartcontractkit/chainlink-common/pkg/types" +) + +var _ types.RemoteCodec = &ExampleCodec{} + +type ExampleCodec struct { + offChainType any +} + +func (ec ExampleCodec) Encode(_ context.Context, item any, _ string) ([]byte, error) { + return json.Marshal(item) +} + +func (ec ExampleCodec) GetMaxEncodingSize(_ context.Context, n int, _ string) (int, error) { + // not used in the example + return math.MaxInt32, nil +} + +func (ec ExampleCodec) Decode(_ context.Context, raw []byte, into any, _ string) error { + err := json.Unmarshal(raw, into) + if err != nil { + return fmt.Errorf("%w: %w", types.ErrInvalidType, err) + } + return nil +} + +func (ec ExampleCodec) GetMaxDecodingSize(ctx context.Context, n int, _ string) (int, error) { + // not used in the example + return math.MaxInt32, nil +} + +func (ec ExampleCodec) CreateType(_ string, _ bool) (any, error) { + // parameters here are unused in the example, but can be used to determine what type to expect. + // this allows remote execution to know how to decode the incoming message + // and for [codec.NewModifierCodec] to know what type to expect for intermediate phases. + return ec.offChainType, nil +} + +type testStructOff struct { + Ask int + Bid int +} + +type testStructOn struct { + Ask []byte + Bid int +} + +type nestedTestStructOn struct { + Report []byte + FeedID [32]byte + Timestamp int64 +} + +type deepNestedTestStructOn struct { + Reports []nestedTestStructOn +} + +const ( + TestStructOffDef = "uint256 Ask, uint256 Bid" +) + +func TestPreCodec(t *testing.T) { + t.Parallel() + + preCodec, err := codec.NewPreCodec( + map[string]string{"Ask": "uint256"}, + map[string]types.RemoteCodec{"uint256": ExampleCodec{offChainType: int(0)}}, + ) + require.NoError(t, err) + + nestedPreCodec, err := codec.NewPreCodec( + map[string]string{"Report": TestStructOffDef}, + map[string]types.RemoteCodec{TestStructOffDef: ExampleCodec{offChainType: testStructOff{}}}, + ) + require.NoError(t, err) + + deepNestedPreCodec, err := codec.NewPreCodec( + map[string]string{"Reports.Report": TestStructOffDef}, + map[string]types.RemoteCodec{TestStructOffDef: ExampleCodec{offChainType: testStructOff{}}}, + ) + require.NoError(t, err) + + invalidPreCodec, err := codec.NewPreCodec( + map[string]string{"Unknown": TestStructOffDef}, + map[string]types.RemoteCodec{TestStructOffDef: ExampleCodec{offChainType: testStructOff{}}}, + ) + require.NoError(t, err) + + t.Run("NOK codec not supplied", func(t *testing.T) { + _, err := codec.NewPreCodec( + map[string]string{"Unknown": TestStructOffDef}, + map[string]types.RemoteCodec{"invalid def": ExampleCodec{offChainType: testStructOff{}}}, + ) + require.Error(t, err) + }) + + t.Run("RetypeToOffChain converts type to codec.CreateType type", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.TypeOf(testStructOn{}), "") + require.NoError(t, err) + require.Equal(t, 2, offChainType.NumField()) + field0 := offChainType.Field(0) + assert.Equal(t, "Ask", field0.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field0.Type) + field1 := offChainType.Field(1) + assert.Equal(t, "Bid", field1.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field1.Type) + }) + + t.Run("RetypeToOffChain works on pointers", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.PointerTo(reflect.TypeOf(testStructOn{})), "") + require.NoError(t, err) + assert.Equal(t, reflect.Ptr, offChainType.Kind()) + elem := offChainType.Elem() + require.Equal(t, 2, elem.NumField()) + field0 := elem.Field(0) + assert.Equal(t, "Ask", field0.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field0.Type) + field1 := elem.Field(1) + assert.Equal(t, "Bid", field1.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field1.Type) + }) + + t.Run("RetypeToOffChain works on slices", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.SliceOf(reflect.TypeOf(testStructOn{})), "") + require.NoError(t, err) + assert.Equal(t, reflect.Slice, offChainType.Kind()) + elem := offChainType.Elem() + require.Equal(t, 2, elem.NumField()) + field0 := elem.Field(0) + assert.Equal(t, "Ask", field0.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field0.Type) + field1 := elem.Field(1) + assert.Equal(t, "Bid", field1.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field1.Type) + }) + + t.Run("RetypeToOffChain works on arrays", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.ArrayOf(1, reflect.TypeOf(testStructOn{})), "") + require.NoError(t, err) + assert.Equal(t, reflect.Array, offChainType.Kind()) + elem := offChainType.Elem() + require.Equal(t, 2, elem.NumField()) + field0 := elem.Field(0) + assert.Equal(t, "Ask", field0.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field0.Type) + field1 := elem.Field(1) + assert.Equal(t, "Bid", field1.Name) + assert.Equal(t, reflect.TypeOf(int(0)), field1.Type) + }) + + t.Run("RetypeToOffChain converts nested type to codec.CreateType type", func(t *testing.T) { + offChainType, err := nestedPreCodec.RetypeToOffChain(reflect.TypeOf(nestedTestStructOn{}), "") + + require.NoError(t, err) + + require.Equal(t, 3, offChainType.NumField()) + field0 := offChainType.Field(0) + assert.Equal(t, "Report", field0.Name) + assert.Equal(t, reflect.TypeOf(testStructOff{}), field0.Type) + field1 := offChainType.Field(1) + assert.Equal(t, "FeedID", field1.Name) + assert.Equal(t, reflect.TypeOf([32]byte{}), field1.Type) + field2 := offChainType.Field(2) + assert.Equal(t, "Timestamp", field2.Name) + assert.Equal(t, reflect.TypeOf(int64(0)), field2.Type) + }) + + t.Run("RetypeToOffChain converts deep nested type to codec.CreateType type", func(t *testing.T) { + offChainType, err := deepNestedPreCodec.RetypeToOffChain(reflect.TypeOf(deepNestedTestStructOn{}), "") + + require.NoError(t, err) + + reports, exists := offChainType.FieldByName("Reports") + assert.True(t, exists) + report := reports.Type.Elem() + require.Equal(t, 3, report.NumField()) + field0 := report.Field(0) + assert.Equal(t, "Report", field0.Name) + assert.Equal(t, reflect.TypeOf(testStructOff{}), field0.Type) + field1 := report.Field(1) + assert.Equal(t, "FeedID", field1.Name) + assert.Equal(t, reflect.TypeOf([32]byte{}), field1.Type) + field2 := report.Field(2) + assert.Equal(t, "Timestamp", field2.Name) + assert.Equal(t, reflect.TypeOf(int64(0)), field2.Type) + }) + + t.Run("RetypeToOffChain only works on byte arrays", func(t *testing.T) { + _, err := preCodec.RetypeToOffChain(reflect.TypeOf(testStructOff{}), "") + require.Error(t, err) + assert.Equal(t, err.Error(), "can only decode []byte from on-chain: int") + }) + + t.Run("RetypeToOffChain only works with a valid path", func(t *testing.T) { + _, err := invalidPreCodec.RetypeToOffChain(reflect.TypeOf(testStructOn{}), "") + require.Error(t, err) + assert.Equal(t, err.Error(), "invalid type: cannot find Unknown") + }) + + t.Run("TransformToOnChain and TransformToOffChain returns error if input type was not from TransformToOnChain", func(t *testing.T) { + incorrectVal := struct{}{} + _, err := preCodec.TransformToOnChain(incorrectVal, "") + assert.True(t, errors.Is(err, types.ErrInvalidType)) + _, err = preCodec.TransformToOffChain(incorrectVal, "") + assert.True(t, errors.Is(err, types.ErrInvalidType)) + }) + + t.Run("TransformToOnChain and TransformToOffChain works on structs", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.TypeOf(testStructOn{}), "") + require.NoError(t, err) + iOffchain := reflect.Indirect(reflect.New(offChainType)) + iOffchain.FieldByName("Ask").SetInt(20) + iOffchain.FieldByName("Bid").SetInt(10) + + output, err := preCodec.TransformToOnChain(iOffchain.Interface(), "") + require.NoError(t, err) + + jsonEncoded, err := json.Marshal(20) + require.NoError(t, err) + expected := testStructOn{ + Ask: jsonEncoded, + Bid: 10, + } + assert.Equal(t, expected, output) + newInput, err := preCodec.TransformToOffChain(expected, "") + require.NoError(t, err) + assert.Equal(t, iOffchain.Interface(), newInput) + }) + + t.Run("TransformToOnChain and TransformToOffChain works on pointers", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.PointerTo(reflect.TypeOf(testStructOn{})), "") + require.NoError(t, err) + + rOffchain := reflect.New(offChainType.Elem()) + iOffchain := reflect.Indirect(rOffchain) + iOffchain.FieldByName("Ask").SetInt(20) + iOffchain.FieldByName("Bid").SetInt(10) + + output, err := preCodec.TransformToOnChain(rOffchain.Interface(), "") + require.NoError(t, err) + jsonEncoded, err := json.Marshal(20) + require.NoError(t, err) + expected := testStructOn{ + Ask: jsonEncoded, + Bid: 10, + } + assert.Equal(t, &expected, output) + newInput, err := preCodec.TransformToOffChain(expected, "") + require.NoError(t, err) + assert.Equal(t, iOffchain.Interface(), newInput) + }) + + t.Run("TransformToOnChain and TransformToOffChain works on slices", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.SliceOf(reflect.TypeOf(testStructOn{})), "") + require.NoError(t, err) + + iOffchain := reflect.MakeSlice(offChainType, 2, 2) + iElm := iOffchain.Index(0) + iElm.FieldByName("Ask").SetInt(20) + iElm.FieldByName("Bid").SetInt(10) + iElm2 := iOffchain.Index(1) + iElm2.FieldByName("Ask").SetInt(20) + iElm2.FieldByName("Bid").SetInt(30) + + output, err := preCodec.TransformToOnChain(iOffchain.Interface(), "") + require.NoError(t, err) + + jsonEncoded, err := json.Marshal(20) + require.NoError(t, err) + expected := []testStructOn{ + { + Ask: jsonEncoded, + Bid: 10, + }, + { + Ask: jsonEncoded, + Bid: 30, + }, + } + assert.Equal(t, expected, output) + newInput, err := preCodec.TransformToOffChain(expected, "") + require.NoError(t, err) + assert.Equal(t, iOffchain.Interface(), newInput) + }) + + t.Run("TransformToOnChain and TransformToOffChain works on arrays", func(t *testing.T) { + offChainType, err := preCodec.RetypeToOffChain(reflect.ArrayOf(2, reflect.TypeOf(testStructOn{})), "") + require.NoError(t, err) + + iOffchain := reflect.New(offChainType).Elem() + iElm := iOffchain.Index(0) + iElm.FieldByName("Ask").SetInt(20) + iElm.FieldByName("Bid").SetInt(10) + iElm2 := iOffchain.Index(1) + iElm2.FieldByName("Ask").SetInt(20) + iElm2.FieldByName("Bid").SetInt(30) + + output, err := preCodec.TransformToOnChain(iOffchain.Interface(), "") + require.NoError(t, err) + + jsonEncoded, err := json.Marshal(20) + require.NoError(t, err) + expected := [2]testStructOn{ + { + Ask: jsonEncoded, + Bid: 10, + }, + { + Ask: jsonEncoded, + Bid: 30, + }, + } + assert.Equal(t, expected, output) + newInput, err := preCodec.TransformToOffChain(expected, "") + require.NoError(t, err) + assert.Equal(t, iOffchain.Interface(), newInput) + }) + + t.Run("TransformToOnChain and TransformToOffChain works on nested fields", func(t *testing.T) { + offChainType, err := nestedPreCodec.RetypeToOffChain(reflect.TypeOf(nestedTestStructOn{}), "") + require.NoError(t, err) + + iOffchain := reflect.Indirect(reflect.New(offChainType)) + iReport := iOffchain.FieldByName("Report") + iReport.FieldByName("Ask").SetInt(20) + iReport.FieldByName("Bid").SetInt(10) + + output, err := nestedPreCodec.TransformToOnChain(iOffchain.Interface(), "") + require.NoError(t, err) + + report := testStructOff{ + Ask: 20, + Bid: 10, + } + jsonEncoded, err := json.Marshal(report) + require.NoError(t, err) + expected := nestedTestStructOn{ + Report: jsonEncoded, + } + assert.Equal(t, expected, output) + newInput, err := nestedPreCodec.TransformToOffChain(expected, "") + require.NoError(t, err) + assert.Equal(t, iOffchain.Interface(), newInput) + }) + + t.Run("TransformToOnChain and TransformToOffChain works on deeply nested fields", func(t *testing.T) { + offChainType, err := deepNestedPreCodec.RetypeToOffChain(reflect.TypeOf(deepNestedTestStructOn{}), "") + require.NoError(t, err) + + iOffchain := reflect.Indirect(reflect.New(offChainType)) + iReports := iOffchain.FieldByName("Reports") + iReports.Set(reflect.MakeSlice(iReports.Type(), 1, 1)) + iElm := iReports.Index(0) + iReport := iElm.FieldByName("Report") + iReport.FieldByName("Ask").SetInt(20) + iReport.FieldByName("Bid").SetInt(10) + + output, err := deepNestedPreCodec.TransformToOnChain(iOffchain.Interface(), "") + require.NoError(t, err) + + report := testStructOff{ + Ask: 20, + Bid: 10, + } + jsonEncoded, err := json.Marshal(report) + require.NoError(t, err) + expected := deepNestedTestStructOn{ + Reports: []nestedTestStructOn{ + {Report: jsonEncoded}, + }, + } + assert.Equal(t, expected, output) + newInput, err := deepNestedPreCodec.TransformToOffChain(expected, "") + require.NoError(t, err) + assert.Equal(t, iOffchain.Interface(), newInput) + }) +}