Skip to content

Commit

Permalink
sketch of generic heartbeat service
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickhuie19 committed Dec 17, 2024
1 parent bbe318c commit 2593009
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
54 changes: 54 additions & 0 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package loop

import (
"context"
"fmt"
"os"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

const HeartbeatSeconds = 1

// NewStartedServer returns a started Server.
// The caller is responsible for calling Server.Stop().
func NewStartedServer(loggerName string) (*Server, error) {
Expand Down Expand Up @@ -48,6 +54,7 @@ type Server struct {
Logger logger.SugaredLogger
promServer *PromServer
checker *services.HealthChecker
heartbeat *services.Heartbeat
}

func newServer(loggerName string) (*Server, error) {
Expand All @@ -62,6 +69,48 @@ func newServer(loggerName string) (*Server, error) {
}
lggr = logger.Named(lggr, loggerName)
s.Logger = logger.Sugared(lggr)

var gauge metric.Int64Gauge
var count metric.Int64Counter
var cme custmsg.Labeler

heartbeat := services.NewHeartbeat(
s.Logger,
HeartbeatSeconds*time.Second,
func(ctx context.Context) error {
// Setup beholder resources
gauge, err = beholder.GetMeter().Int64Gauge("heartbeat")
if err != nil {
return err
}
count, err = beholder.GetMeter().Int64Counter("heartbeat_count")
if err != nil {
return err
}

cme = custmsg.NewLabeler()
return nil
},
func(engCtx context.Context) {
// TODO allow override of tracer provider into engine for beholder
_, innerSpan := beholder.GetTracer().Start(engCtx, "heartbeat.beat")
defer innerSpan.End()

gauge.Record(engCtx, 1)
count.Add(engCtx, 1)

err = cme.Emit(engCtx, "heartbeat")
if err != nil {
// TODO this is the server logger, not the engine logger
s.Logger.Errorw("heartbeat emit failed", "err", err)
}
},
func() error {
return nil
},
)
s.heartbeat = &heartbeat

return s, nil
}

Expand Down Expand Up @@ -132,6 +181,10 @@ func (s *Server) start() error {
return fmt.Errorf("error starting health checker: %w", err)
}

if err := s.heartbeat.Start(context.TODO()); err != nil {
return fmt.Errorf("error starting heartbeat: %w", err)
}

return nil
}

Expand All @@ -151,4 +204,5 @@ func (s *Server) Stop() {
if err := s.Logger.Sync(); err != nil {
fmt.Println("Failed to sync logger:", err)
}
s.heartbeat.Close()
}
59 changes: 59 additions & 0 deletions pkg/services/heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package services

import (
"context"
"fmt"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// Heartbeat is a usage of Engine for application specific heartbeats,
// used in the core node and for loops. It accepts a named logger,
// a beat, a setup func to initialize resources used on each beat,
// a beat function to define the behavior on each beat, and a close func
// for resource teardown
type Heartbeat struct {
Service
eng *Engine

beat time.Duration
lggr logger.Logger
}

func NewHeartbeat(
lggr logger.Logger,
beat time.Duration,
setupFn func(ctx context.Context) error,
beatFn func(bCtx context.Context),
closeFn func() error,
) Heartbeat {
h := Heartbeat{
beat: beat,
lggr: lggr,
}
startFn := func(ctx context.Context) error {
err := setupFn(ctx)
if err != nil {
return fmt.Errorf("setting up heartbeat: %w", err)
}

// consistent tick period
constantTickFn := func() time.Duration {
return h.beat
}

// TODO allow for override of tracer provider in engine
// TODO wrap beatFn in engine trace
h.eng.GoTick(timeutil.NewTicker(constantTickFn), beatFn)
return nil
}

h.Service, h.eng = Config{
Name: fmt.Sprintf("%s.%s", lggr.Name(), "heartbeat"),
Start: startFn,
Close: closeFn,
}.NewServiceEngine(lggr)
return h
}

0 comments on commit 2593009

Please sign in to comment.