Skip to content

Commit

Permalink
Enqueue heartbeats in priority queue for checking
Browse files Browse the repository at this point in the history
Enqueues (or re-enqueues) heartbeats from nodes into a priority queue
as they arrive.
  • Loading branch information
rossjones committed Mar 26, 2024
1 parent f2a7d5a commit c0bdeca
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/compute/executor_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *ExecutorBuffer) Run(ctx context.Context, localExecutionState store.Loca
execution.AllocateResources(execution.Job.Task().Name, *added)
}

s.queuedTasks.Enqueue(newBufferTask(localExecutionState), execution.Job.Priority)
s.queuedTasks.Enqueue(newBufferTask(localExecutionState), int64(execution.Job.Priority))
s.deque()
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lib/collections/hashed_priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (q *HashedPriorityQueue[K, T]) Contains(id K) bool {

// Enqueue will add the item specified by `data` to the queue with
// the priority given by `priority`.
func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int) {
func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) {
q.mu.Lock()
defer q.mu.Unlock()

Expand Down
10 changes: 5 additions & 5 deletions pkg/lib/collections/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
type PriorityQueueInterface[T any] interface {
// Enqueue will add the item specified by `data` to the queue with
// the priority given by `priority`.
Enqueue(data T, priority int)
Enqueue(data T, priority int64)

// Dequeue returns the next highest priority item, returning both
// the data Enqueued previously, and the priority with which it was
Expand Down Expand Up @@ -55,7 +55,7 @@ type PriorityQueue[T any] struct {
// the various dequeue methods
type QueueItem[T any] struct {
Value T
Priority int
Priority int64
}

// MatchingFunction can be used when 'iterating' the priority queue to find
Expand All @@ -73,7 +73,7 @@ func NewPriorityQueue[T any]() *PriorityQueue[T] {

// Enqueue will add the item specified by `data` to the queue with
// the priority given by `priority`.
func (pq *PriorityQueue[T]) Enqueue(data T, priority int) {
func (pq *PriorityQueue[T]) Enqueue(data T, priority int64) {
pq.mu.Lock()
defer pq.mu.Unlock()

Expand All @@ -82,7 +82,7 @@ func (pq *PriorityQueue[T]) Enqueue(data T, priority int) {

// enqueue is a lock-free version of Enqueue for internal use when a
// method already has a lock.
func (pq *PriorityQueue[T]) enqueue(data T, priority int) {
func (pq *PriorityQueue[T]) enqueue(data T, priority int64) {
heap.Push(
&pq.internalQueue,
&heapItem{
Expand Down Expand Up @@ -180,7 +180,7 @@ type queueHeap []*heapItem

type heapItem struct {
value any
priority int
priority int64
index int // The index for update
}

Expand Down
31 changes: 29 additions & 2 deletions pkg/lib/collections/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestPriorityQueueSuite(t *testing.T) {
func (s *PriorityQueueSuite) TestSimple() {
type testcase struct {
v string
p int
p int64
}
inputs := []testcase{
{"B", 2}, {"A", 3}, {"C", 1}, {"A", 3}, {"C", 1}, {"B", 2},
Expand All @@ -31,7 +31,34 @@ func (s *PriorityQueueSuite) TestSimple() {

pq := collections.NewPriorityQueue[string]()
for _, tc := range inputs {
pq.Enqueue(tc.v, tc.p)
pq.Enqueue(tc.v, int64(tc.p))
}

for _, tc := range expected {
qitem := pq.Dequeue()
s.Require().NotNil(qitem)
s.Require().Equal(tc.v, qitem.Value)
s.Require().Equal(tc.p, qitem.Priority)
}

s.Require().True(pq.IsEmpty())
}

func (s *PriorityQueueSuite) TestSimpleMin() {
type testcase struct {
v string
p int64
}
inputs := []testcase{
{"B", -2}, {"A", -3}, {"C", -1}, {"A", -3}, {"C", -1}, {"B", -2},
}
expected := []testcase{
{"C", -1}, {"C", -1}, {"B", -2}, {"B", -2}, {"A", -3}, {"A", -3},
}

pq := collections.NewPriorityQueue[string]()
for _, tc := range inputs {
pq.Enqueue(tc.v, int64(tc.p))
}

for _, tc := range expected {
Expand Down
46 changes: 44 additions & 2 deletions pkg/node/heartbeat/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package heartbeat

import (
"context"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
natsPubSub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub"
"github.com/bacalhau-project/bacalhau/pkg/pubsub"

Expand All @@ -12,6 +14,7 @@ import (

// HeartbeatServer holds the state used to receive heartbeat messages and
// track them in a priority queue.
type HeartbeatServer struct {
// subscription is the pub/sub handle typed for Heartbeat messages.
subscription *natsPubSub.PubSub[Heartbeat]
// pqueue stores heartbeats in a hashed priority queue keyed by a string;
// NewServer supplies a key function that returns the heartbeat's NodeID.
pqueue *collections.HashedPriorityQueue[string, Heartbeat]
}

func NewServer(conn *nats.Conn) (*HeartbeatServer, error) {
Expand All @@ -25,7 +28,13 @@ func NewServer(conn *nats.Conn) (*HeartbeatServer, error) {
return nil, err
}

return &HeartbeatServer{subscription: subscription}, nil
pqueue := collections.NewHashedPriorityQueue[string, Heartbeat](
func(h Heartbeat) string {
return h.NodeID
},
)

return &HeartbeatServer{subscription: subscription, pqueue: pqueue}, nil
}

func (h *HeartbeatServer) Start(ctx context.Context) error {
Expand All @@ -40,13 +49,46 @@ func (h *HeartbeatServer) Start(ctx context.Context) error {
log.Ctx(ctx).Info().Msg("Heartbeat server shutdown")
}(ctx)

// Background loop that wakes up every heartbeatQueueCheckFrequency until the
// context is cancelled. The per-tick check is not implemented yet.
go func(ctx context.Context) {
	ticker := time.NewTicker(heartbeatQueueCheckFrequency)
	// Stop the ticker when the goroutine exits so its resources are
	// released once the context is cancelled.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// We'll iterate over the priority queue and check if the last heartbeat
			// received from each node is older than 10 seconds. If it is, we'll
			// consider the node as dead.
		}
	}
}(ctx)

return nil
}

// Handle records a heartbeat received from a node. The heartbeat is enqueued
// in the server's priority queue with the receive time (UTC Unix seconds) as
// its priority; if the node already has an entry, that entry is removed and
// re-enqueued with the new timestamp. It always returns nil.
func (h *HeartbeatServer) Handle(ctx context.Context, message Heartbeat) error {
	log.Ctx(ctx).Trace().Msgf("heartbeat received from %s", message.NodeID)

	timestamp := time.Now().UTC().Unix()

	if h.pqueue.Contains(message.NodeID) {
		// If we think we already have a heartbeat from this node, we'll update the
		// timestamp of the entry so it is re-prioritized in the queue by dequeuing
		// and re-enqueuing it (this will ensure it is heapified correctly).
		result := h.pqueue.DequeueWhere(func(item Heartbeat) bool {
			return item.NodeID == message.NodeID
		})

		// Contains and DequeueWhere are two separate calls, so the entry may have
		// been removed in between (e.g. by a concurrent caller). Only dereference
		// the result when something was actually dequeued; otherwise fall through
		// and enqueue the incoming message as a fresh entry.
		// NOTE(review): assumes DequeueWhere returns nil when nothing matches —
		// confirm against the queue implementation.
		if result != nil {
			log.Ctx(ctx).Trace().Msgf("Re-enqueueing heartbeat from %s", message.NodeID)
			h.pqueue.Enqueue(result.Value, timestamp)
			return nil
		}
	}

	log.Ctx(ctx).Trace().Msgf("Enqueueing heartbeat from %s", message.NodeID)

	// We'll enqueue the heartbeat message with the current timestamp as its
	// priority, intending the oldest entry to be the first one checked.
	// NOTE(review): the priority queue tests dequeue larger priorities first
	// (-1 before -2 before -3), so a raw timestamp would surface the newest
	// heartbeat first. Confirm whether the priority should be negated.
	h.pqueue.Enqueue(message, timestamp)

	return nil
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/node/heartbeat/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package heartbeat

const heartbeatTopic = "heartbeat"
import "time"

const (
// heartbeatTopic is the name used for heartbeat messages
// (presumably the pub/sub topic/subject — confirm against NewServer).
heartbeatTopic = "heartbeat"
// heartbeatQueueCheckFrequency is the interval of the ticker that drives
// the periodic heartbeat-queue check started in HeartbeatServer.Start.
heartbeatQueueCheckFrequency = 5 * time.Second
)

// Heartbeat represents a heartbeat message from a specific node.
// It contains the node ID and the sequence number of the heartbeat
Expand Down

0 comments on commit c0bdeca

Please sign in to comment.