Skip to content

Commit

Permalink
pkg/loop: plugins report health to host
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Jan 8, 2025
1 parent 564f79f commit cfd95c0
Show file tree
Hide file tree
Showing 80 changed files with 1,010 additions and 628 deletions.
7 changes: 7 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ func With(l Logger, keyvals ...interface{}) Logger {

// Named returns a logger with name 'n', if 'l' has a method `Named(string) L`, where L implements Logger, otherwise it returns l.
func Named(l Logger, n string) Logger {
l = named(l, n)
if testing.Testing() {
l.Debugf("New logger: %s", n)
}
return l
}
func named(l Logger, n string) Logger {
switch t := l.(type) {
case *logger:
return t.named(n)
Expand Down
21 changes: 14 additions & 7 deletions pkg/loop/ccip_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/reportingplugin/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand Down Expand Up @@ -43,10 +44,9 @@ func (p *CommitLoop) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server)
// GRPCClient implements [plugin.GRPCPlugin] and returns the pluginClient [types.CCIPCommitFactoryGenerator], updated with the new broker and conn.
func (p *CommitLoop) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error) {
if p.pluginClient == nil {
p.pluginClient = ccip.NewCommitLOOPClient(broker, p.BrokerConfig, conn)
} else {
p.pluginClient.Refresh(broker, conn)
p.pluginClient = ccip.NewCommitLOOPClient(p.BrokerConfig)
}
p.pluginClient.Refresh(broker, conn)

return types.CCIPCommitFactoryGenerator(p.pluginClient), nil
}
Expand All @@ -56,7 +56,10 @@ func (p *CommitLoop) ClientConfig() *plugin.ClientConfig {
HandshakeConfig: PluginCCIPCommitHandshakeConfig(),
Plugins: map[string]plugin.Plugin{CCIPCommitLOOPName: p},
}
return ManagedGRPCClientConfig(clientConfig, p.BrokerConfig)
if p.pluginClient == nil {
p.pluginClient = ccip.NewCommitLOOPClient(p.BrokerConfig)
}
return ManagedGRPCClientConfig(clientConfig, p.pluginClient.BrokerConfig)
}

var _ ocrtypes.ReportingPluginFactory = (*CommitFactoryService)(nil)
Expand All @@ -69,12 +72,16 @@ type CommitFactoryService struct {
// NewCommitService returns a new [*CommitFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewCommitService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.CCIPCommitProvider) *CommitFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, services.HealthReporter, error) {
plug, ok := instance.(types.CCIPCommitFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected CCIPCommitFactoryGenerator but got %T", instance)
return nil, nil, fmt.Errorf("expected CCIPCommitFactoryGenerator but got %T", instance)
}
factory, err := plug.NewCommitFactory(ctx, provider)
if err != nil {
return nil, nil, err
}
return plug.NewCommitFactory(ctx, provider)
return factory, plug, nil
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPCommitService")
Expand Down
10 changes: 6 additions & 4 deletions pkg/loop/ccip_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
func TestCommitService(t *testing.T) {
t.Parallel()

commit := loop.NewCommitService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
lggr := logger.Test(t)
commit := loop.NewCommitService(lggr, loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPCommitLOOPName, false, 0)
}, cciptest.CommitProvider)
}, cciptest.CommitProvider(lggr))

t.Run("service not nil", func(t *testing.T) {
require.NotPanics(t, func() { commit.Name() })
Expand Down Expand Up @@ -62,14 +63,15 @@ func TestCommitService(t *testing.T) {

func TestCommitService_recovery(t *testing.T) {
t.Parallel()
lggr := logger.Test(t)
var limit atomic.Int32
commit := loop.NewCommitService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
commit := loop.NewCommitService(lggr, loop.GRPCOpts{}, func() *exec.Cmd {
h := HelperProcessCommand{
Command: loop.CCIPCommitLOOPName,
Limit: int(limit.Add(1)),
}
return h.New()
}, cciptest.CommitProvider)
}, cciptest.CommitProvider(lggr))
servicetest.Run(t, commit)

reportingplugintest.RunFactory(t, commit)
Expand Down
21 changes: 14 additions & 7 deletions pkg/loop/ccip_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/reportingplugin/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand Down Expand Up @@ -43,10 +44,9 @@ func (p *ExecutionLoop) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Serve
// GRPCClient implements [plugin.GRPCPlugin] and returns the pluginClient [types.CCIPExecutionFactoryGenerator], updated with the new broker and conn.
func (p *ExecutionLoop) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error) {
if p.pluginClient == nil {
p.pluginClient = ccip.NewExecutionLOOPClient(broker, p.BrokerConfig, conn)
} else {
p.pluginClient.Refresh(broker, conn)
p.pluginClient = ccip.NewExecutionLOOPClient(p.BrokerConfig)
}
p.pluginClient.Refresh(broker, conn)

return types.CCIPExecutionFactoryGenerator(p.pluginClient), nil
}
Expand All @@ -56,7 +56,10 @@ func (p *ExecutionLoop) ClientConfig() *plugin.ClientConfig {
HandshakeConfig: PluginCCIPExecutionHandshakeConfig(),
Plugins: map[string]plugin.Plugin{CCIPExecutionLOOPName: p},
}
return ManagedGRPCClientConfig(clientConfig, p.BrokerConfig)
if p.pluginClient == nil {
p.pluginClient = ccip.NewExecutionLOOPClient(p.BrokerConfig)
}
return ManagedGRPCClientConfig(clientConfig, p.pluginClient.BrokerConfig)
}

var _ ocrtypes.ReportingPluginFactory = (*ExecutionFactoryService)(nil)
Expand All @@ -69,12 +72,16 @@ type ExecutionFactoryService struct {
// NewExecutionService returns a new [*ExecutionFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChain uint32, dstChain uint32, sourceTokenAddress string) *ExecutionFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, services.HealthReporter, error) {
plug, ok := instance.(types.CCIPExecutionFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected CCIPExecutionFactoryGenerator but got %T", instance)
return nil, nil, fmt.Errorf("expected CCIPExecutionFactoryGenerator but got %T", instance)
}
factory, err := plug.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(srcChain), int64(dstChain), sourceTokenAddress)
if err != nil {
return nil, nil, err
}
return plug.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(srcChain), int64(dstChain), sourceTokenAddress)
return factory, plug, nil
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPExecutionService")
Expand Down
8 changes: 5 additions & 3 deletions pkg/loop/ccip_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
func TestExecService(t *testing.T) {
t.Parallel()

lggr := logger.Test(t)
exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPExecutionLOOPName, false, 0)
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
}, cciptest.ExecutionProvider(lggr), cciptest.ExecutionProvider(lggr), 0, 0, "")
hook := exec.PluginService.XXXTestHook()
servicetest.Run(t, exec)

Expand Down Expand Up @@ -57,14 +58,15 @@ func TestExecService(t *testing.T) {

func TestExecService_recovery(t *testing.T) {
t.Parallel()
lggr := logger.Test(t)
var limit atomic.Int32
exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
exec := loop.NewExecutionService(lggr, loop.GRPCOpts{}, func() *exec.Cmd {
h := HelperProcessCommand{
Command: loop.CCIPExecutionLOOPName,
Limit: int(limit.Add(1)),
}
return h.New()
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
}, cciptest.ExecutionProvider(lggr), cciptest.ExecutionProvider(lggr), 0, 0, "")
servicetest.Run(t, exec)

reportingplugintest.RunFactory(t, exec)
Expand Down
1 change: 1 addition & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (e *EnvConfig) parse() error {
}

// ManagedGRPCClientConfig return a Managed plugin and set grpc config values from the BrokerConfig.
// The innermost relevant BrokerConfig should be used, to include any relevant services in the logger name.
// Note: managed plugins shutdown when the parent process exits. We may want to change this behavior in the future
// to enable host process restarts without restarting the plugin. To do that we would also need
// supply the appropriate ReattachConfig to the plugin.ClientConfig.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/capability"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/errorlog"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keyvalue"
Expand Down Expand Up @@ -44,12 +45,14 @@ type StandardCapabilitiesClient struct {

var _ StandardCapabilities = (*StandardCapabilitiesClient)(nil)

func NewStandardCapabilitiesClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *StandardCapabilitiesClient {
func NewStandardCapabilitiesClient(brokerCfg net.BrokerConfig) *StandardCapabilitiesClient {
brokerCfg.Logger = logger.Named(brokerCfg.Logger, "StandardCapabilitiesClient")
pc := goplugin.NewPluginClient(brokerCfg)
return &StandardCapabilitiesClient{
PluginClient: goplugin.NewPluginClient(brokerExt.Broker, brokerExt.BrokerConfig, conn),
ServiceClient: goplugin.NewServiceClient(brokerExt, conn),
StandardCapabilitiesClient: capabilitiespb.NewStandardCapabilitiesClient(conn),
BrokerExt: brokerExt,
PluginClient: pc,
ServiceClient: goplugin.NewServiceClient(pc.BrokerExt, pc),
StandardCapabilitiesClient: capabilitiespb.NewStandardCapabilitiesClient(pc),
BrokerExt: pc.BrokerExt,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/loop/internal/core/services/oraclefactory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type client struct {
serviceClient *goplugin.ServiceClient
}

func NewClient(log logger.Logger, broker *net.BrokerExt, conn grpc.ClientConnInterface) *client {
namedBroker := broker.WithName("OracleFactoryClient")
func NewClient(log logger.Logger, b *net.BrokerExt, conn grpc.ClientConnInterface) *client {
b = b.WithName("OracleFactoryClient")
return &client{
log: log,
broker: namedBroker,
serviceClient: goplugin.NewServiceClient(namedBroker, conn),
broker: b,
serviceClient: goplugin.NewServiceClient(b, conn),
grpc: oraclefactorypb.NewOracleFactoryClient(conn)}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type ReportingPluginFactoryClient struct {
}

func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ReportingPluginFactoryClient {
b = b.WithName("ReportingPluginProviderClient")
return &ReportingPluginFactoryClient{
BrokerExt: b.WithName("ReportingPluginProviderClient"),
BrokerExt: b,
ServiceClient: goplugin.NewServiceClient(b, cc),
grpc: pb.NewReportingPluginFactoryClient(cc),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type ReportingPluginServiceClient struct {
reportingPluginService pb.ReportingPluginServiceClient
}

func NewReportingPluginServiceClient(broker net.Broker, brokerCfg net.BrokerConfig, conn *grpc.ClientConn) *ReportingPluginServiceClient {
func NewReportingPluginServiceClient(brokerCfg net.BrokerConfig) *ReportingPluginServiceClient {
brokerCfg.Logger = logger.Named(brokerCfg.Logger, "ReportingPluginServiceClient")
pc := goplugin.NewPluginClient(broker, brokerCfg, conn)
pc := goplugin.NewPluginClient(brokerCfg)
return &ReportingPluginServiceClient{PluginClient: pc, reportingPluginService: pb.NewReportingPluginServiceClient(pc), ServiceClient: goplugin.NewServiceClient(pc.BrokerExt, pc)}
}

Expand Down Expand Up @@ -149,6 +149,7 @@ type reportingPluginServiceServer struct {
}

func RegisterReportingPluginServiceServer(server *grpc.Server, broker net.Broker, brokerCfg net.BrokerConfig, impl core.ReportingPluginClient) error {
pb.RegisterServiceServer(server, &goplugin.ServiceServer{Srv: impl})
pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg}, impl))
return nil
}
Expand Down Expand Up @@ -218,6 +219,11 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con
m.CloseAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes, keyValueStoreRes, relayerSetRes)
return nil, err
}
//TODO start factory - when to close?
if err = factory.Start(ctx); err != nil {
m.CloseAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes, keyValueStoreRes, relayerSetRes)
return nil, err
}

id, _, err := m.ServeNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &goplugin.ServiceServer{Srv: factory})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,29 @@ import (

"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
pipelinetest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/pipeline/test"
telemetrytest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/telemetry/test"
validationtest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/validation/test"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
mediantest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/relayer/pluginprovider/ext/median/test"
ocr2test "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/relayer/pluginprovider/ocr2/test"
reportingplugintest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/reportingplugin/test"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test"
testtypes "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test/types"
"github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

var MedianProviderServer = medianFactoryServer{
medianGeneratorConfig: medianGeneratorConfig{
medianProvider: mediantest.MedianProvider,
func MedianProviderServer(lggr logger.Logger) medianFactoryServer {
return newMedianFactoryServer(lggr, medianGeneratorConfig{
medianProvider: mediantest.MedianProvider(lggr),
pipeline: pipelinetest.PipelineRunner,
telemetry: telemetrytest.Telemetry,
validationService: validationtest.ValidationService,
},
})
}

const MedianID = "ocr2-reporting-plugin-with-median-provider"
Expand All @@ -38,7 +41,18 @@ type medianGeneratorConfig struct {
}

type medianFactoryServer struct {
services.Service
medianGeneratorConfig
factory types.ReportingPluginFactory
}

func newMedianFactoryServer(lggr logger.Logger, cfg medianGeneratorConfig) medianFactoryServer {
lggr = logger.Named(lggr, "medianFactoryServer")
return medianFactoryServer{
Service: test.NewStaticService(lggr),
medianGeneratorConfig: cfg,
factory: reportingplugintest.Factory(lggr),
}
}

var _ reportingplugins.ProviderServer[types.MedianProvider] = medianFactoryServer{}
Expand Down Expand Up @@ -69,23 +83,30 @@ func (s medianFactoryServer) NewReportingPluginFactory(ctx context.Context, conf
return nil, fmt.Errorf("failed to evaluate telemetry: %w", err)
}

return reportingplugintest.Factory, nil
return s.factory, nil
}

var AgnosticProviderServer = agnosticPluginFactoryServer{
provider: ocr2test.AgnosticPluginProvider,
pipelineRunner: pipelinetest.PipelineRunner,
telemetry: telemetrytest.Telemetry,
validationService: validationtest.ValidationService,
func AgnosticProviderServer(lggr logger.Logger) agnosticPluginFactoryServer {
lggr = logger.Named(lggr, "agnosticPluginFactoryServer")
return agnosticPluginFactoryServer{
Service: test.NewStaticService(lggr),
provider: ocr2test.AgnosticPluginProvider(lggr),
pipelineRunner: pipelinetest.PipelineRunner,
telemetry: telemetrytest.Telemetry,
validationService: validationtest.ValidationService,
factory: reportingplugintest.Factory(lggr),
}
}

var _ reportingplugins.ProviderServer[types.PluginProvider] = agnosticPluginFactoryServer{}

type agnosticPluginFactoryServer struct {
services.Service
provider testtypes.PluginProviderTester
pipelineRunner testtypes.PipelineEvaluator
telemetry testtypes.TelemetryEvaluator
validationService testtypes.ValidationEvaluator
factory types.ReportingPluginFactory
}

func (s agnosticPluginFactoryServer) NewValidationService(ctx context.Context) (core.ValidationService, error) {
Expand Down Expand Up @@ -114,5 +135,5 @@ func (s agnosticPluginFactoryServer) NewReportingPluginFactory(ctx context.Conte
return nil, fmt.Errorf("failed to evaluate telemetry: %w", err)
}

return reportingplugintest.Factory, nil
return s.factory, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type reportingPluginFactoryClient struct {
}

func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *reportingPluginFactoryClient {
return &reportingPluginFactoryClient{b.WithName("OCR3ReportingPluginProviderClient"), goplugin.NewServiceClient(b, cc), ocr3.NewReportingPluginFactoryClient(cc)}
b = b.WithName("OCR3ReportingPluginProviderClient")
return &reportingPluginFactoryClient{b, goplugin.NewServiceClient(b, cc), ocr3.NewReportingPluginFactoryClient(cc)}
}

func (r *reportingPluginFactoryClient) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
Expand Down
Loading

0 comments on commit cfd95c0

Please sign in to comment.