Skip to content

Commit

Permalink
pkg/loop: plugins report health to host
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Jan 8, 2025
1 parent 564f79f commit 913872e
Show file tree
Hide file tree
Showing 65 changed files with 908 additions and 547 deletions.
7 changes: 7 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ func With(l Logger, keyvals ...interface{}) Logger {

// Named returns a logger with name 'n', if 'l' has a method `Named(string) L`, where L implements Logger, otherwise it returns l.
func Named(l Logger, n string) Logger {
l = named(l, n)
if testing.Testing() {
l.Debugf("New logger: %s", n)
}
return l
}
func named(l Logger, n string) Logger {
switch t := l.(type) {
case *logger:
return t.named(n)
Expand Down
11 changes: 8 additions & 3 deletions pkg/loop/ccip_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/reportingplugin/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand Down Expand Up @@ -69,12 +70,16 @@ type CommitFactoryService struct {
// NewCommitService returns a new [*CommitFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewCommitService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.CCIPCommitProvider) *CommitFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, services.HealthReporter, error) {
plug, ok := instance.(types.CCIPCommitFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected CCIPCommitFactoryGenerator but got %T", instance)
return nil, nil, fmt.Errorf("expected CCIPCommitFactoryGenerator but got %T", instance)
}
return plug.NewCommitFactory(ctx, provider)
factory, err := plug.NewCommitFactory(ctx, provider)
if err != nil {
return nil, nil, err
}
return factory, plug, nil
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPCommitService")
Expand Down
10 changes: 6 additions & 4 deletions pkg/loop/ccip_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
func TestCommitService(t *testing.T) {
t.Parallel()

commit := loop.NewCommitService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
lggr := logger.Test(t)
commit := loop.NewCommitService(lggr, loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPCommitLOOPName, false, 0)
}, cciptest.CommitProvider)
}, cciptest.CommitProvider(lggr))

t.Run("service not nil", func(t *testing.T) {
require.NotPanics(t, func() { commit.Name() })
Expand Down Expand Up @@ -62,14 +63,15 @@ func TestCommitService(t *testing.T) {

func TestCommitService_recovery(t *testing.T) {
t.Parallel()
lggr := logger.Test(t)
var limit atomic.Int32
commit := loop.NewCommitService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
commit := loop.NewCommitService(lggr, loop.GRPCOpts{}, func() *exec.Cmd {
h := HelperProcessCommand{
Command: loop.CCIPCommitLOOPName,
Limit: int(limit.Add(1)),
}
return h.New()
}, cciptest.CommitProvider)
}, cciptest.CommitProvider(lggr))
servicetest.Run(t, commit)

reportingplugintest.RunFactory(t, commit)
Expand Down
11 changes: 8 additions & 3 deletions pkg/loop/ccip_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/reportingplugin/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand Down Expand Up @@ -69,12 +70,16 @@ type ExecutionFactoryService struct {
// NewExecutionService returns a new [*ExecutionFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChain uint32, dstChain uint32, sourceTokenAddress string) *ExecutionFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, services.HealthReporter, error) {
plug, ok := instance.(types.CCIPExecutionFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected CCIPExecutionFactoryGenerator but got %T", instance)
return nil, nil, fmt.Errorf("expected CCIPExecutionFactoryGenerator but got %T", instance)
}
return plug.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(srcChain), int64(dstChain), sourceTokenAddress)
factory, err := plug.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(srcChain), int64(dstChain), sourceTokenAddress)
if err != nil {
return nil, nil, err
}
return factory, plug, nil
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPExecutionService")
Expand Down
8 changes: 5 additions & 3 deletions pkg/loop/ccip_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
func TestExecService(t *testing.T) {
t.Parallel()

lggr := logger.Test(t)
exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPExecutionLOOPName, false, 0)
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
}, cciptest.ExecutionProvider(lggr), cciptest.ExecutionProvider(lggr), 0, 0, "")
hook := exec.PluginService.XXXTestHook()
servicetest.Run(t, exec)

Expand Down Expand Up @@ -57,14 +58,15 @@ func TestExecService(t *testing.T) {

func TestExecService_recovery(t *testing.T) {
t.Parallel()
lggr := logger.Test(t)
var limit atomic.Int32
exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
exec := loop.NewExecutionService(lggr, loop.GRPCOpts{}, func() *exec.Cmd {
h := HelperProcessCommand{
Command: loop.CCIPExecutionLOOPName,
Limit: int(limit.Add(1)),
}
return h.New()
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
}, cciptest.ExecutionProvider(lggr), cciptest.ExecutionProvider(lggr), 0, 0, "")
servicetest.Run(t, exec)

reportingplugintest.RunFactory(t, exec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type reportingPluginServiceServer struct {
}

func RegisterReportingPluginServiceServer(server *grpc.Server, broker net.Broker, brokerCfg net.BrokerConfig, impl core.ReportingPluginClient) error {
pb.RegisterServiceServer(server, &goplugin.ServiceServer{Srv: impl})
pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg}, impl))
return nil
}
Expand Down Expand Up @@ -218,6 +219,11 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con
m.CloseAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes, keyValueStoreRes, relayerSetRes)
return nil, err
}
//TODO start factory - when to close?
if err = factory.Start(ctx); err != nil {
m.CloseAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes, keyValueStoreRes, relayerSetRes)
return nil, err
}

id, _, err := m.ServeNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &goplugin.ServiceServer{Srv: factory})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,29 @@ import (

"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
pipelinetest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/pipeline/test"
telemetrytest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/telemetry/test"
validationtest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/validation/test"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
mediantest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/relayer/pluginprovider/ext/median/test"
ocr2test "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/relayer/pluginprovider/ocr2/test"
reportingplugintest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/reportingplugin/test"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test"
testtypes "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test/types"
"github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

var MedianProviderServer = medianFactoryServer{
medianGeneratorConfig: medianGeneratorConfig{
medianProvider: mediantest.MedianProvider,
func MedianProviderServer(lggr logger.Logger) medianFactoryServer {
return newMedianFactoryServer(lggr, medianGeneratorConfig{
medianProvider: mediantest.MedianProvider(lggr),
pipeline: pipelinetest.PipelineRunner,
telemetry: telemetrytest.Telemetry,
validationService: validationtest.ValidationService,
},
})
}

const MedianID = "ocr2-reporting-plugin-with-median-provider"
Expand All @@ -38,7 +41,18 @@ type medianGeneratorConfig struct {
}

type medianFactoryServer struct {
services.Service
medianGeneratorConfig
factory types.ReportingPluginFactory
}

func newMedianFactoryServer(lggr logger.Logger, cfg medianGeneratorConfig) medianFactoryServer {
lggr = logger.Named(lggr, "medianFactoryServer")
return medianFactoryServer{
Service: test.NewStaticService(lggr),
medianGeneratorConfig: cfg,
factory: reportingplugintest.Factory(lggr),
}
}

var _ reportingplugins.ProviderServer[types.MedianProvider] = medianFactoryServer{}
Expand Down Expand Up @@ -69,23 +83,30 @@ func (s medianFactoryServer) NewReportingPluginFactory(ctx context.Context, conf
return nil, fmt.Errorf("failed to evaluate telemetry: %w", err)
}

return reportingplugintest.Factory, nil
return s.factory, nil
}

var AgnosticProviderServer = agnosticPluginFactoryServer{
provider: ocr2test.AgnosticPluginProvider,
pipelineRunner: pipelinetest.PipelineRunner,
telemetry: telemetrytest.Telemetry,
validationService: validationtest.ValidationService,
func AgnosticProviderServer(lggr logger.Logger) agnosticPluginFactoryServer {
lggr = logger.Named(lggr, "agnosticPluginFactoryServer")
return agnosticPluginFactoryServer{
Service: test.NewStaticService(lggr),
provider: ocr2test.AgnosticPluginProvider(lggr),
pipelineRunner: pipelinetest.PipelineRunner,
telemetry: telemetrytest.Telemetry,
validationService: validationtest.ValidationService,
factory: reportingplugintest.Factory(lggr),
}
}

var _ reportingplugins.ProviderServer[types.PluginProvider] = agnosticPluginFactoryServer{}

type agnosticPluginFactoryServer struct {
services.Service
provider testtypes.PluginProviderTester
pipelineRunner testtypes.PipelineEvaluator
telemetry testtypes.TelemetryEvaluator
validationService testtypes.ValidationEvaluator
factory types.ReportingPluginFactory
}

func (s agnosticPluginFactoryServer) NewValidationService(ctx context.Context) (core.ValidationService, error) {
Expand Down Expand Up @@ -114,5 +135,5 @@ func (s agnosticPluginFactoryServer) NewReportingPluginFactory(ctx context.Conte
return nil, fmt.Errorf("failed to evaluate telemetry: %w", err)
}

return reportingplugintest.Factory, nil
return s.factory, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (m reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Cont
m.CloseAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes, capRegistryRes, relayerSetRes)
return nil, err
}
//TODO start factory - when to close?
if err = factory.Start(ctx); err != nil {
m.CloseAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes, keyValueStoreRes, relayerSetRes)
return nil, err
}

id, _, err := m.ServeNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &goplugin.ServiceServer{Srv: factory})
Expand All @@ -251,6 +256,7 @@ func (m reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Cont
}

func RegisterReportingPluginServiceServer(server *grpc.Server, broker net.Broker, brokerCfg net.BrokerConfig, impl core.OCR3ReportingPluginClient) error {
pb.RegisterServiceServer(server, &goplugin.ServiceServer{Srv: impl})
pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg}, impl))
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,43 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

var Factory = ocr3staticPluginFactory{
ReportingPluginConfig: ocr3reportingPluginConfig,
reportingPlugin: ReportingPlugin,
func Factory(lggr logger.Logger) ocr3StaticPluginFactory {
return newOCR3StatisPluginFactory(lggr, ocr3reportingPluginConfig, ReportingPlugin)
}

// OCR3
type ocr3staticPluginFactory struct {
type ocr3StaticPluginFactory struct {
services.Service
ocr3types.ReportingPluginConfig
reportingPlugin ocr3staticReportingPlugin
}

var _ core.OCR3ReportingPluginFactory = (*ocr3staticPluginFactory)(nil)
var _ core.OCR3ReportingPluginFactory = (*ocr3StaticPluginFactory)(nil)

func (o ocr3staticPluginFactory) Name() string { panic("implement me") }

func (o ocr3staticPluginFactory) Start(ctx context.Context) error { return nil }

func (o ocr3staticPluginFactory) Close() error { return nil }

func (o ocr3staticPluginFactory) Ready() error { panic("implement me") }

func (o ocr3staticPluginFactory) HealthReport() map[string]error { panic("implement me") }
func newOCR3StatisPluginFactory(lggr logger.Logger, cfg ocr3types.ReportingPluginConfig, rp ocr3staticReportingPlugin) ocr3StaticPluginFactory {
return ocr3StaticPluginFactory{
Service: test.NewStaticService(lggr),
ReportingPluginConfig: cfg,
reportingPlugin: rp,
}
}

func (o ocr3staticPluginFactory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
func (o ocr3StaticPluginFactory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
err := o.equalConfig(config)
if err != nil {
return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("config mismatch: %w", err)
}
return o.reportingPlugin, ocr3rpi, nil
}

func (o ocr3staticPluginFactory) equalConfig(other ocr3types.ReportingPluginConfig) error {
func (o ocr3StaticPluginFactory) equalConfig(other ocr3types.ReportingPluginConfig) error {
if other.ConfigDigest != o.ConfigDigest {
return fmt.Errorf("expected ConfigDigest %x but got %x", o.ConfigDigest, other.ConfigDigest)
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func (o ocr3staticPluginFactory) equalConfig(other ocr3types.ReportingPluginConf
}

func OCR3ReportingPluginFactory(t *testing.T, factory core.OCR3ReportingPluginFactory) {
expectedFactory := Factory
expectedFactory := Factory(logger.Test(t))
t.Run("OCR3ReportingPluginFactory", func(t *testing.T) {
ctx := tests.Context(t)
rp, gotRPI, err := factory.NewReportingPlugin(ctx, ocr3reportingPluginConfig)
Expand Down
Loading

0 comments on commit 913872e

Please sign in to comment.