diff --git a/pkg/compute/executor_buffer.go b/pkg/compute/executor_buffer.go
index c089df54bd..c129a1a401 100644
--- a/pkg/compute/executor_buffer.go
+++ b/pkg/compute/executor_buffer.go
@@ -120,7 +120,7 @@ func (s *ExecutorBuffer) Run(ctx context.Context, localExecutionState store.Loca
 		execution.AllocateResources(execution.Job.Task().Name, *added)
 	}
 
-	s.queuedTasks.Enqueue(newBufferTask(localExecutionState), execution.Job.Priority)
+	s.queuedTasks.Enqueue(newBufferTask(localExecutionState), int64(execution.Job.Priority))
 	s.deque()
 	return err
 }
diff --git a/pkg/lib/collections/hashed_priority_queue.go b/pkg/lib/collections/hashed_priority_queue.go
index 3e68c3fa0a..e83fb6dbde 100644
--- a/pkg/lib/collections/hashed_priority_queue.go
+++ b/pkg/lib/collections/hashed_priority_queue.go
@@ -33,7 +33,7 @@ func (q *HashedPriorityQueue[K, T]) Contains(id K) bool {
 
 // Enqueue will add the item specified by `data` to the queue with the
 // the priority given by `priority`.
-func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int) {
+func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
diff --git a/pkg/lib/collections/priority_queue.go b/pkg/lib/collections/priority_queue.go
index 200c6227c7..72129a414e 100644
--- a/pkg/lib/collections/priority_queue.go
+++ b/pkg/lib/collections/priority_queue.go
@@ -20,7 +20,7 @@ var (
 type PriorityQueueInterface[T any] interface {
 	// Enqueue will add the item specified by `data` to the queue with the
 	// the priority given by `priority`.
-	Enqueue(data T, priority int)
+	Enqueue(data T, priority int64)
 
 	// Dequeue returns the next highest priority item, returning both
 	// the data Enqueued previously, and the priority with which it was
@@ -55,7 +55,7 @@ type PriorityQueue[T any] struct {
 // the various dequeue methods
 type QueueItem[T any] struct {
 	Value    T
-	Priority int
+	Priority int64
 }
 
 // MatchingFunction can be used when 'iterating' the priority queue to find
@@ -73,7 +73,7 @@ func NewPriorityQueue[T any]() *PriorityQueue[T] {
 
 // Enqueue will add the item specified by `data` to the queue with the
 // the priority given by `priority`.
-func (pq *PriorityQueue[T]) Enqueue(data T, priority int) {
+func (pq *PriorityQueue[T]) Enqueue(data T, priority int64) {
 	pq.mu.Lock()
 	defer pq.mu.Unlock()
 
@@ -82,7 +82,7 @@ func (pq *PriorityQueue[T]) Enqueue(data T, priority int) {
 
 // enqueue is a lock-free version of Enqueue for internal use when a
 // method already has a lock.
-func (pq *PriorityQueue[T]) enqueue(data T, priority int) {
+func (pq *PriorityQueue[T]) enqueue(data T, priority int64) {
 	heap.Push(
 		&pq.internalQueue,
 		&heapItem{
@@ -180,7 +180,7 @@ type queueHeap []*heapItem
 
 type heapItem struct {
 	value    any
-	priority int
+	priority int64
 	index    int // The index for update
 }
 
diff --git a/pkg/lib/collections/priority_queue_test.go b/pkg/lib/collections/priority_queue_test.go
index 1bd9ac8f14..d175cac002 100644
--- a/pkg/lib/collections/priority_queue_test.go
+++ b/pkg/lib/collections/priority_queue_test.go
@@ -20,7 +20,7 @@ func TestPriorityQueueSuite(t *testing.T) {
 func (s *PriorityQueueSuite) TestSimple() {
 	type testcase struct {
 		v string
-		p int
+		p int64
 	}
 	inputs := []testcase{
 		{"B", 2}, {"A", 3}, {"C", 1}, {"A", 3}, {"C", 1}, {"B", 2},
@@ -31,7 +31,34 @@ func (s *PriorityQueueSuite) TestSimple() {
 
 	pq := collections.NewPriorityQueue[string]()
 	for _, tc := range inputs {
 		pq.Enqueue(tc.v, tc.p)
+	}
+
+	for _, tc := range expected {
+		qitem := pq.Dequeue()
+		s.Require().NotNil(qitem)
+		s.Require().Equal(tc.v, qitem.Value)
+		s.Require().Equal(tc.p, qitem.Priority)
+	}
+
+	s.Require().True(pq.IsEmpty())
+}
+
+func (s *PriorityQueueSuite) TestSimpleMin() {
+	type testcase struct {
+		v string
+		p int64
+	}
+	inputs := []testcase{
+		{"B", -2}, {"A", -3}, {"C", -1}, {"A", -3}, {"C", -1}, {"B", -2},
+	}
+	expected := []testcase{
+		{"C", -1}, {"C", -1}, {"B", -2}, {"B", -2}, {"A", -3}, {"A", -3},
+	}
+
+	pq := collections.NewPriorityQueue[string]()
+	for _, tc := range inputs {
+		pq.Enqueue(tc.v, tc.p)
 	}
 
 	for _, tc := range expected {
diff --git a/pkg/node/heartbeat/server.go b/pkg/node/heartbeat/server.go
index d10e9eb392..e556b1c4a5 100644
--- a/pkg/node/heartbeat/server.go
+++ b/pkg/node/heartbeat/server.go
@@ -2,7 +2,9 @@ package heartbeat
 
 import (
 	"context"
+	"time"
 
+	"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
 	natsPubSub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub"
 	"github.com/bacalhau-project/bacalhau/pkg/pubsub"
 
@@ -12,6 +14,7 @@ import (
 
 type HeartbeatServer struct {
 	subscription *natsPubSub.PubSub[Heartbeat]
+	pqueue       *collections.HashedPriorityQueue[string, Heartbeat]
 }
 
 func NewServer(conn *nats.Conn) (*HeartbeatServer, error) {
@@ -25,7 +28,13 @@ func NewServer(conn *nats.Conn) (*HeartbeatServer, error) {
 		return nil, err
 	}
 
-	return &HeartbeatServer{subscription: subscription}, nil
+	pqueue := collections.NewHashedPriorityQueue[string, Heartbeat](
+		func(h Heartbeat) string {
+			return h.NodeID
+		},
+	)
+
+	return &HeartbeatServer{subscription: subscription, pqueue: pqueue}, nil
 }
 
 func (h *HeartbeatServer) Start(ctx context.Context) error {
@@ -40,13 +49,50 @@ func (h *HeartbeatServer) Start(ctx context.Context) error {
 		log.Ctx(ctx).Info().Msg("Heartbeat server shutdown")
 	}(ctx)
 
+	go func(ctx context.Context) {
+		ticker := time.NewTicker(heartbeatQueueCheckFrequency)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				// TODO: iterate over the priority queue and check if the last
+				// heartbeat received from each node is older than 10 seconds.
+				// If it is, we'll consider the node as dead.
+			}
+		}
+	}(ctx)
+
 	return nil
 }
 
 func (h *HeartbeatServer) Handle(ctx context.Context, message Heartbeat) error {
 	log.Ctx(ctx).Trace().Msgf("heartbeat received from %s", message.NodeID)
 
-	// TODO: Process the heartbeat (e.g. update the node's last seen time)
+	// The receive time (Unix seconds, UTC) drives the queue ordering.
+	timestamp := time.Now().UTC().Unix()
+
+	if h.pqueue.Contains(message.NodeID) {
+		// The queue should hold at most one entry per node, so drop the stale
+		// one before enqueuing again (dequeue + enqueue keeps the heap ordered).
+		// The dequeued item is deliberately discarded, never dereferenced: if
+		// the entry vanished between Contains and DequeueWhere there is simply
+		// nothing to drop, and the heartbeat is still recorded below.
+		h.pqueue.DequeueWhere(func(item Heartbeat) bool {
+			return item.NodeID == message.NodeID
+		})
+
+		log.Ctx(ctx).Trace().Msgf("Re-enqueueing heartbeat from %s", message.NodeID)
+	} else {
+		log.Ctx(ctx).Trace().Msgf("Enqueueing heartbeat from %s", message.NodeID)
+	}
+
+	// Store the heartbeat that was just received (not the stale copy) under
+	// the negated timestamp: the queue dequeues the numerically largest
+	// priority first, so negating makes the oldest heartbeat come out first.
+	h.pqueue.Enqueue(message, -timestamp)
 
 	return nil
 }
diff --git a/pkg/node/heartbeat/types.go b/pkg/node/heartbeat/types.go
index 6ded7a8234..678edf0fbd 100644
--- a/pkg/node/heartbeat/types.go
+++ b/pkg/node/heartbeat/types.go
@@ -1,6 +1,11 @@
 package heartbeat
 
-const heartbeatTopic = "heartbeat"
+import "time"
+
+const (
+	heartbeatTopic               = "heartbeat"
+	heartbeatQueueCheckFrequency = 5 * time.Second
+)
 
 // Heartbeat represents a heartbeat message from a specific node.
 // It contains the node ID and the sequence number of the heartbeat